You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 02:54:31 UTC

[17/31] incubator-kylin git commit: KYLIN-953 Let MapReduceExecutable pick up patched hadoop config

KYLIN-953 Let MapReduceExecutable pick up patched hadoop config


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5949fe71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5949fe71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5949fe71

Branch: refs/heads/KYLIN-1112
Commit: 5949fe71e5bf38bc2eaf7d077c00013743efe2e1
Parents: 933e81f
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Nov 2 12:50:08 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 09:53:16 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/dict/DictionaryGenerator.java  |  2 +-
 .../org/apache/kylin/engine/mr/HadoopUtil.java  | 20 ++++++++++++++++----
 .../engine/mr/common/MapReduceExecutable.java   |  7 +++++--
 .../engine/mr/steps/MergeStatisticsStep.java    |  2 +-
 .../HiveToBaseCuboidMapperPerformanceTest.java  |  3 ++-
 .../engine/mr/steps/MergeCuboidJobTest.java     |  3 ++-
 .../kylin/engine/mr/steps/NDCuboidJobTest.java  |  3 ++-
 .../storage/hbase/util/CubeMigrationCLI.java    |  3 ++-
 .../storage/hbase/steps/CreateHTableTest.java   |  3 ++-
 .../hbase/steps/CubeHFileMapper2Test.java       |  3 ++-
 .../storage/hbase/steps/ITHdfsOpsTest.java      |  3 ++-
 .../steps/RangeKeyDistributionJobTest.java      |  3 ++-
 12 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index c0bff8a..db885f9 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -52,7 +52,7 @@ public class DictionaryGenerator {
         try {
             return KylinConfig.getInstanceFromEnv().getDictionaryMaxCardinality();
         } catch (Throwable e) {
-            return 2000000; // some test case does not KylinConfig setup properly
+            return 5000000; // some test case does not KylinConfig setup properly
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 862f9a0..9ce2bab 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,16 +40,27 @@ public class HadoopUtil {
 
     public static Configuration getCurrentConfiguration() {
         if (hadoopConfig.get() == null) {
-            Configuration conf = new Configuration();
+            Configuration conf = healSickConfig(new Configuration());
             
-            // why we have this hard code?
-            conf.set(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, "8");
-
             hadoopConfig.set(conf);
         }
         return hadoopConfig.get();
     }
 
+    private static Configuration healSickConfig(Configuration conf) {
+        // why we have this hard code?
+        conf.set(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, "8");
+
+        // https://issues.apache.org/jira/browse/KYLIN-953
+        if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
+            conf.set("hadoop.tmp.dir", "/tmp");
+        }
+        if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
+            conf.set("hbase.fs.tmp.dir", "/tmp");
+        }
+        return conf;
+    }
+
     public static FileSystem getFileSystem(String path) throws IOException {
         return FileSystem.get(makeURI(path), getCurrentConfiguration());
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 4fb3eed..ebb3900 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.RMHAUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -72,7 +73,8 @@ public class MapReduceExecutable extends AbstractExecutable {
                 return;
             }
             try {
-                Job job = new Cluster(new Configuration()).getJob(JobID.forName(mrJobId));
+                Configuration conf = HadoopUtil.getCurrentConfiguration();
+                Job job = new Cluster(conf).getJob(JobID.forName(mrJobId));
                 if (job.getJobState() == JobStatus.State.FAILED) {
                     //remove previous mr job info
                     super.onExecuteStart(executableContext);
@@ -101,7 +103,8 @@ public class MapReduceExecutable extends AbstractExecutable {
             Job job;
             final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
             if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
-                job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
+                Configuration conf = HadoopUtil.getCurrentConfiguration();
+                job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
                 logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed"));
             } else {
                 final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 67c4416..45282df 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -76,7 +76,7 @@ public class MergeStatisticsStep extends AbstractExecutable {
         final CubeInstance cube = mgr.getCube(getCubeName());
         final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
 
-        Configuration conf = new Configuration();
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
index fad521f..c1f50a8 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapperPerformanceTest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -45,7 +46,7 @@ public class HiveToBaseCuboidMapperPerformanceTest {
     @Ignore("convenient trial tool for dev")
     @Test
     public void test() throws IOException, InterruptedException {
-        Configuration hconf = new Configuration();
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
         HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
         Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
index 43038a0..ccaa027 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidJobTest.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,7 +38,7 @@ public class MergeCuboidJobTest extends LocalFileMetadataTestCase {
 
     @Before
     public void setup() throws Exception {
-        conf = new Configuration();
+        conf = HadoopUtil.getCurrentConfiguration();
         conf.set("fs.default.name", "file:///");
         conf.set("mapred.job.tracker", "local");
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
index c1b8ee3..2a74720 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidJobTest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,7 +37,7 @@ public class NDCuboidJobTest extends LocalFileMetadataTestCase {
 
     @Before
     public void setup() throws Exception {
-        conf = new Configuration();
+        conf = HadoopUtil.getCurrentConfiguration();
         conf.set("fs.default.name", "file:///");
         conf.set("mapred.job.tracker", "local");
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 5708514..aec1d5b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -44,6 +44,7 @@ import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IEngineAware;
@@ -118,7 +119,7 @@ public class CubeMigrationCLI {
         Configuration conf = HBaseConfiguration.create();
         hbaseAdmin = new HBaseAdmin(conf);
 
-        hdfsFS = FileSystem.get(new Configuration());
+        hdfsFS = FileSystem.get(HadoopUtil.getCurrentConfiguration());
 
         operations = new ArrayList<Opt>();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
index 091d182..c7f9f73 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
 import org.junit.After;
 import org.junit.Before;
@@ -39,7 +40,7 @@ public class CreateHTableTest extends LocalFileMetadataTestCase {
 
     @Before
     public void setup() throws Exception {
-        conf = new Configuration();
+        conf = HadoopUtil.getCurrentConfiguration();
         conf.set("fs.default.name", "file:///");
         conf.set("mapred.job.tracker", "local");
         this.createTestMetadata();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
index 81fa3a6..270928f 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.junit.After;
 import org.junit.Before;
@@ -71,7 +72,7 @@ public class CubeHFileMapper2Test extends LocalFileMetadataTestCase {
     @Test
     public void testBasic() throws Exception {
 
-        Configuration hconf = new Configuration();
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
         Context context = MockupMapContext.create(hconf, getTestConfig().getMetadataUrl(), cubeName, outKV);
 
         CubeHFileMapper mapper = new CubeHFileMapper();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHdfsOpsTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHdfsOpsTest.java
index 5db0697..25ac2c6 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHdfsOpsTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHdfsOpsTest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,7 +41,7 @@ public class ITHdfsOpsTest extends HBaseMetadataTestCase {
 
         this.createTestMetadata();
 
-        Configuration hconf = new Configuration();
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
 
         fileSystem = FileSystem.get(hconf);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5949fe71/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
index 793834a..05d0b08 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
 import org.junit.After;
 import org.junit.Before;
@@ -41,7 +42,7 @@ public class RangeKeyDistributionJobTest extends LocalFileMetadataTestCase {
 
     @Before
     public void setup() throws Exception {
-        conf = new Configuration();
+        conf = HadoopUtil.getCurrentConfiguration();
         conf.set("fs.default.name", "file:///");
         conf.set("mapred.job.tracker", "local");