You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/03/11 08:05:06 UTC

kylin git commit: KYLIN-1226 apply in-mem conf only to in-mem mr job

Repository: kylin
Updated Branches:
  refs/heads/master 5e13bba08 -> 0232f17ed


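KYLIN-1226 apply in-mem conf only to in-mem mr job

Move the in-mem cubing mapper settings (mapreduce.map.java.opts,
mapreduce.map.memory.mb, mapreduce.task.io.sort.mb) out of the global
build/conf/kylin_job_conf.xml, where they affected every MR job. They now
live in the new Kylin property kylin.job.cubing.inmem.mrjob_conf_override,
which InMemCuboidJob applies only to its own job configuration. The default
is "mapreduce.map.java.opts=-Xmx2700m; mapreduce.map.memory.mb=3072;
mapreduce.task.io.sort.mb=200". Note that the sampling-percent property is
renamed from kylin.job.cubing.inMem.sampling.percent to
kylin.job.cubing.inmem.sampling.percent; existing settings under the old
name are no longer read.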


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0232f17e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0232f17e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0232f17e

Branch: refs/heads/master
Commit: 0232f17ed74a3354c7ffd6647d159a8581626861
Parents: 5e13bba
Author: Li Yang <li...@apache.org>
Authored: Fri Mar 11 15:03:40 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Mar 11 15:04:19 2016 +0800

----------------------------------------------------------------------
 build/conf/kylin_job_conf.xml                   | 14 -------------
 .../apache/kylin/common/KylinConfigBase.java    | 21 +++++++++++++++++++-
 .../kylin/engine/mr/steps/InMemCuboidJob.java   | 11 ++++++++++
 3 files changed, 31 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0232f17e/build/conf/kylin_job_conf.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin_job_conf.xml b/build/conf/kylin_job_conf.xml
index 052021c..3ecaeda 100644
--- a/build/conf/kylin_job_conf.xml
+++ b/build/conf/kylin_job_conf.xml
@@ -55,20 +55,6 @@
         <description>Block replication</description>
     </property>
 
-    <!--Properties for calculating cube by splits (in-mem), with which each Mapper need more mem to hold a full cube segment -->
-    <property>
-        <name>mapreduce.map.java.opts</name>
-        <value>-Xmx2500m</value>
-    </property>
-    <property>
-        <name>mapreduce.map.memory.mb</name>
-        <value>3072</value>
-    </property>
-    <property>
-        <name>mapreduce.task.io.sort.mb</name>
-        <value>200</value>
-    </property>
-    
     <property>
         <name>mapred.task.timeout</name>
         <value>3600000</value>

http://git-wip-us.apache.org/repos/asf/kylin/blob/0232f17e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 487f78e..6a47321 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
+import java.util.Map;
 import java.util.Properties;
 import java.util.SortedSet;
 import java.util.regex.Matcher;
@@ -33,6 +34,7 @@ import org.apache.kylin.common.util.CliCommandExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 @SuppressWarnings("serial")
@@ -476,11 +478,28 @@ public class KylinConfigBase implements Serializable {
     }
 
     public int getCubingInMemSamplingPercent() {
-        int percent = Integer.parseInt(this.getOptional("kylin.job.cubing.inMem.sampling.percent", "30"));
+        int percent = Integer.parseInt(this.getOptional("kylin.job.cubing.inmem.sampling.percent", "30"));
         percent = Math.max(percent, 1);
         percent = Math.min(percent, 100);
         return percent;
     }
+    
+    public Map<String, String> getCubingInMemMRJobConfOverride() {
+        String str = getOptional("kylin.job.cubing.inmem.mrjob_conf_override", //
+                "mapreduce.map.java.opts=-Xmx2700m;  mapreduce.map.memory.mb=3072;  mapreduce.task.io.sort.mb=200");
+        Map<String, String> result = Maps.newHashMap();
+        for (String pair : str.split(";")) {
+            int cut = pair.indexOf('=');
+            if (cut < 0)
+                continue;
+            String k = pair.substring(0, cut).trim();
+            String v = pair.substring(cut + 1).trim();
+            if (k.isEmpty() || v.isEmpty())
+                continue;
+            result.put(k, v);
+        }
+        return result;
+    }
 
     public String getHbaseDefaultCompressionCodec() {
         return getOptional("kylin.hbase.default.compression.codec", "");

http://git-wip-us.apache.org/repos/asf/kylin/blob/0232f17e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index d197252..2319451 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -20,8 +20,10 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -97,6 +99,9 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             logger.info("Starting: " + job.getJobName());
+            
+            // some special tuning for in-mem MR job
+            overrideJobConf(job.getConfiguration(), config);
 
             setJobClasspath(job);
 
@@ -143,6 +148,12 @@ public class InMemCuboidJob extends AbstractHadoopJob {
         }
     }
 
+    private void overrideJobConf(Configuration jobConf, KylinConfig kylinConfig) {
+        for (Entry<String, String> entry : kylinConfig.getCubingInMemMRJobConfOverride().entrySet()) {
+            jobConf.set(entry.getKey(), entry.getValue());
+        }
+    }
+
     private int calculateReducerNum(CubeSegment cubeSeg) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();