You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kylin.apache.org by GitBox <gi...@apache.org> on 2018/08/28 14:22:49 UTC

[GitHub] shaofengshi closed pull request #213: KYLIN-3509 Allocate more memory for merge-dictionary step

shaofengshi closed pull request #213: KYLIN-3509 Allocate more memory for merge-dictionary step
URL: https://github.com/apache/kylin/pull/213
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the diff is supplied below:

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 58d9caacbc..4895bf0745 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1140,6 +1140,11 @@ public String getKylinJobMRLibDir() {
         return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
 
+    public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
+        return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
+    }
+
+
     public double getDefaultHadoopJobReducerInputMB() {
         return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
     }
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 23c0730afa..e505def0a3 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -312,6 +312,10 @@ kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 
+### Spark conf for specific job
+kylin.engine.spark-conf-mergedict.spark.executor.memory=6G
+kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
+
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
 #kylin.engine.spark-conf.spark.yarn.archive=hdfs://namenode:8020/kylin/spark/spark-libs.jar
 #kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 560293cf44..5735a80975 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -71,4 +71,6 @@ private ExecutableConstants() {
     public static final String STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE = "Update Lookup Snapshot Cache to Query Engine";
     public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE = "Take Snapshot to Metadata Store";
     public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE = "Update Cube Info";
+
+    public static final String SPARK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index d9027082b6..88a58ae5a2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -26,6 +26,7 @@
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
 import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,8 +79,8 @@ public MapReduceExecutable createMergeDictionaryStep(CubeSegment seg, String job
         MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
         mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
         StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX);
 
-        appendMapReduceParameters(cmd);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index 4487610885..eb67fefaa3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -89,6 +89,7 @@ public SparkExecutable createMergeDictionaryStep(CubeSegment seg, String jobID,
 
         sparkExecutable.setJobId(jobID);
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        sparkExecutable.setSparkConfigName(ExecutableConstants.SPARK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY);
 
         StringBuilder jars = new StringBuilder();
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 612239741f..dea820698e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -63,6 +63,7 @@
     private static final String JARS = "jars";
     private static final String JOB_ID = "jobId";
     private static final String COUNTER_SAVE_AS = "CounterSaveAs";
+    private static final String CONFIG_NAME = "configName";
 
     public void setClassName(String className) {
         this.setParam(CLASS_NAME, className);
@@ -84,6 +85,17 @@ public String getCounterSaveAs() {
         return getParam(COUNTER_SAVE_AS);
     }
 
+    /**
+     * set spark override conf for specific job
+     */
+    public void setSparkConfigName(String configName) {
+        this.setParam(CONFIG_NAME, configName);
+    }
+
+    public String getSparkConfigName() {
+        return getParam(CONFIG_NAME);
+    }
+
     private String formatArgs() {
         StringBuilder stringBuilder = new StringBuilder();
         for (Map.Entry<String, String> entry : getParams().entrySet()) {
@@ -92,7 +104,7 @@ private String formatArgs() {
             if (entry.getKey().equals(CLASS_NAME)) {
                 stringBuilder.insert(0, tmp);
             } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)
-                    || entry.getKey().equals(COUNTER_SAVE_AS)) {
+                    || entry.getKey().equals(COUNTER_SAVE_AS) || entry.getKey().equals(CONFIG_NAME)) {
                 // JARS is for spark-submit, not for app
                 continue;
             } else {
@@ -221,6 +233,13 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
             }
 
             Map<String, String> sparkConfs = config.getSparkConfigOverride();
+
+            String sparkConfigName = getSparkConfigName();
+            if (sparkConfigName != null) {
+                Map<String, String> sparkSpecificConfs = config.getSparkConfigOverrideWithSpecificName(sparkConfigName);
+                sparkConfs.putAll(sparkSpecificConfs);
+            }
+
             for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
                 stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue())
                         .append(" ");
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index edced8f1f3..e6a6bd6c9b 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -202,6 +202,10 @@ kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
 kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 
+### Spark conf for specific job
+kylin.engine.spark-conf-mergedict.spark.executor.memory=1G
+kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
+
 ### QUERY PUSH DOWN  ###
 #kylin.query.pushdown.runner-class-name=org.apache.kylin.query.adhoc.PushDownRunnerJdbcImpl
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services