You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/02 04:29:29 UTC

incubator-kylin git commit: KYLIN-953 Let MapReduceExecutable pick up patched hadoop config

Repository: incubator-kylin
Updated Branches:
  refs/heads/1.x-staging 0f9acf002 -> bff402f46


KYLIN-953 Let MapReduceExecutable pick up patched hadoop config


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bff402f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bff402f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bff402f4

Branch: refs/heads/1.x-staging
Commit: bff402f466ccbb93102330eace7c6904dfb1d242
Parents: 0f9acf0
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Nov 2 11:29:21 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Nov 2 11:29:21 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/HadoopUtil.java    | 26 ++++++++++++--------
 .../kylin/job/common/MapReduceExecutable.java   |  7 ++++--
 .../kylin/rest/service/PerformService.java      |  7 ++++--
 3 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bff402f4/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 2dc8be0..b815969 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -46,7 +46,7 @@ public class HadoopUtil {
 
     public static Configuration getCurrentConfiguration() {
         if (hadoopConfig.get() == null) {
-            hadoopConfig.set(new Configuration());
+            hadoopConfig.set(newConfiguration());
         }
         return hadoopConfig.get();
     }
@@ -89,8 +89,11 @@ public class HadoopUtil {
         }
     }
 
-    /**
-     */
+    public static Configuration newConfiguration() {
+        Configuration conf = new Configuration();
+        return healSickConfig(conf);
+    }
+    
     public static Configuration newHBaseConfiguration(String url) {
         Configuration conf = HBaseConfiguration.create(getCurrentConfiguration());
         
@@ -103,6 +106,16 @@ public class HadoopUtil {
             conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
         }
         
+        // reduce rpc retry
+        conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
+        conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
+        conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+        // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
+
+        return healSickConfig(conf);
+    }
+
+    private static Configuration healSickConfig(Configuration conf) {
         // https://issues.apache.org/jira/browse/KYLIN-953
         if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
             conf.set("hadoop.tmp.dir", "/tmp");
@@ -110,13 +123,6 @@ public class HadoopUtil {
         if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
             conf.set("hbase.fs.tmp.dir", "/tmp");
         }
-
-        // reduce rpc retry
-        conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
-        conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
-        conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
-        // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
-
         return conf;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bff402f4/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
index cbf453d..2220884 100644
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.RMHAUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -73,7 +74,8 @@ public class MapReduceExecutable extends AbstractExecutable {
                 return;
             }
             try {
-                Job job = new Cluster(new Configuration()).getJob(JobID.forName(mrJobId));
+                Configuration conf = HadoopUtil.getCurrentConfiguration();
+                Job job = new Cluster(conf).getJob(JobID.forName(mrJobId));
                 if (job.getJobState() == JobStatus.State.FAILED) {
                     //remove previous mr job info
                     super.onExecuteStart(executableContext);
@@ -102,7 +104,8 @@ public class MapReduceExecutable extends AbstractExecutable {
             Job job;
             final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
             if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
-                job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
+                Configuration conf = HadoopUtil.getCurrentConfiguration();
+                job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
                 logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed"));
             } else {
                 final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bff402f4/server/src/main/java/org/apache/kylin/rest/service/PerformService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/PerformService.java b/server/src/main/java/org/apache/kylin/rest/service/PerformService.java
index 8d6ed7b..e34aa1f 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/PerformService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/PerformService.java
@@ -22,11 +22,13 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.List;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -101,7 +103,7 @@ public class PerformService extends BasicService {
         List<String[]> allRows = null;
         CSVReader reader = null;
         FileSystem fs = null;
-        Configuration conf = new Configuration();
+        Configuration conf = HadoopUtil.newConfiguration();
 
         try {
             fs = FileSystem.newInstance(conf);
@@ -114,7 +116,8 @@ public class PerformService extends BasicService {
         } catch (IOException e) {
             logger.info("failed to read hdfs file:", e);
         } finally {
-            fs.close();
+            IOUtils.closeQuietly(reader);
+            IOUtils.closeQuietly(fs);
         }
         return allRows;
     }