Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/10 01:41:29 UTC

kylin git commit: KYLIN-2371 rename the prefix to ‘kylin.engine.spark-conf.’

Repository: kylin
Updated Branches:
  refs/heads/master ee74a74e4 -> 00c8f31e6


KYLIN-2371 rename the prefix to ‘kylin.engine.spark-conf.’


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/00c8f31e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/00c8f31e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/00c8f31e

Branch: refs/heads/master
Commit: 00c8f31e6d235d38bb7919fa4ee5b12e53d840f6
Parents: ee74a74
Author: shaofengshi <sh...@apache.org>
Authored: Tue Jan 10 09:40:44 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Jan 10 09:40:44 2017 +0800

----------------------------------------------------------------------
 build/conf/kylin-spark-conf.properties          | 27 -----------
 build/conf/kylin.properties                     | 48 ++++++++++++--------
 build/deploy/spark-defaults.conf                |  6 +++
 build/script/download-spark.sh                  |  3 +-
 .../apache/kylin/common/KylinConfigBase.java    | 18 +-------
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  2 +-
 .../kylin/engine/spark/SparkCubingByLayer.java  | 10 ++--
 .../kylin/engine/spark/SparkExecutable.java     |  7 ++-
 .../test_case_data/sandbox/kylin.properties     | 19 ++++++--
 9 files changed, 65 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
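
The gist of the change: Spark settings that previously lived in a separate conf/kylin-spark-conf.properties file are now ordinary entries in kylin.properties, keyed by the new 'kylin.engine.spark-conf.' prefix. Kylin collects every property carrying that prefix, strips the prefix, and passes each remainder to spark-submit as a --conf flag. For example:

    # before: conf/kylin-spark-conf.properties
    spark.executor.memory=4G

    # after: conf/kylin.properties
    kylin.engine.spark-conf.spark.executor.memory=4G

The same prefix also replaces the older 'kylin.engine.spark.config-override.' prefix, leaving a single way to pass settings through to Spark.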


http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/conf/kylin-spark-conf.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-spark-conf.properties b/build/conf/kylin-spark-conf.properties
deleted file mode 100644
index 143e7e4..0000000
--- a/build/conf/kylin-spark-conf.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-spark.yarn.submit.file.replication=1
-spark.yarn.executor.memoryOverhead=1024
-spark.yarn.driver.memoryOverhead=384
-spark.master=yarn
-spark.submit.deployMode=cluster
-spark.eventLog.enabled=true
-spark.yarn.scheduler.heartbeat.interval-ms=5000
-spark.yarn.preserve.staging.files=true
-spark.yarn.queue=default
-spark.yarn.containerLauncherMaxThreads=25
-spark.yarn.max.executor.failures=3
-spark.eventLog.dir=hdfs\:///kylin/spark-history
-spark.history.kerberos.enabled=true
-spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
-spark.history.ui.port=18080
-spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
-spark.executor.memory=4G
-spark.storage.memoryFraction=0.3
-spark.executor.cores=4
-spark.executor.instances=8
-spark.history.kerberos.keytab=none
-spark.history.kerberos.principal=none
-spark.driver.extraJavaOptions=-Dhdp.version=current
-spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-spark.executor.extraJavaOptions=-Dhdp.version=current
-#spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
-#spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
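
Most of the file above is relocated rather than dropped: the per-job submission settings reappear in kylin.properties under the new prefix (next hunk), while the cluster-level defaults (file replication, event logging, executor-failure tolerance, and the -Dhdp.version java options) move to the new build/deploy/spark-defaults.conf. The remaining history-server, Kerberos, and YARN tuning entries are removed outright.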

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 6efa423..196a711 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -127,24 +127,6 @@ kylin.engine.mr.max-reducer-number=500
 
 kylin.engine.mr.mapper-input-rows=1000000
 
-
-### Spark Engine ###
-
-# Hadoop conf folder, will export this as "HADOOP_CONF_DIR" before run spark-submit
-#kylin.engine.spark.env.hadoop-conf-dir=/etc/hive/conf
-
-# Spark job submission properties file, default be $KYLIN_HOME/conf/kylin-spark-conf.properties
-#kylin.engine.spark.properties-file=
-
-# Estimate the RDD partition numbers
-kylin.engine.spark.rdd-partition-cut-mb=10
-
-# Minimal partition numbers of rdd
-kylin.engine.spark.min-partition=1
-
-# Max partition numbers of rdd
-kylin.engine.spark.max-partition=5000
-
 ### CUBE | DICTIONARY ###
 
 # 'auto', 'inmem', 'layer' or 'random' for testing
@@ -204,3 +186,33 @@ kylin.security.saml.context-server-name=hostname
 kylin.security.saml.context-server-port=443
 kylin.security.saml.context-path=/kylin
 
+
+### Spark Engine Configs ###
+
+# Hadoop conf folder, will export this as "HADOOP_CONF_DIR" before run spark-submit
+#kylin.engine.spark.env.hadoop-conf-dir=/etc/hive/conf
+
+# Estimate the RDD partition numbers
+kylin.engine.spark.rdd-partition-cut-mb=10
+
+# Minimal partition numbers of rdd
+kylin.engine.spark.min-partition=1
+
+# Max partition numbers of rdd
+kylin.engine.spark.max-partition=5000
+
+### Spark conf (default is in spark/conf/spark-defaults.conf)
+kylin.engine.spark-conf.spark.master=yarn
+kylin.engine.spark-conf.spark.submit.deployMode=cluster
+kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=1024
+kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=384
+kylin.engine.spark-conf.spark.yarn.queue=default
+kylin.engine.spark-conf.spark.executor.memory=4G
+kylin.engine.spark-conf.spark.executor.cores=4
+kylin.engine.spark-conf.spark.executor.instances=8
+kylin.engine.spark-conf.spark.storage.memoryFraction=0.3
+kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
+## manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
+#kylin.engine.spark-conf.spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/deploy/spark-defaults.conf
----------------------------------------------------------------------
diff --git a/build/deploy/spark-defaults.conf b/build/deploy/spark-defaults.conf
new file mode 100644
index 0000000..36c0ab3
--- /dev/null
+++ b/build/deploy/spark-defaults.conf
@@ -0,0 +1,6 @@
+spark.yarn.submit.file.replication=1
+spark.eventLog.enabled=true
+spark.yarn.max.executor.failures=3
+spark.driver.extraJavaOptions=-Dhdp.version=current
+spark.yarn.am.extraJavaOptions=-Dhdp.version=current
+spark.executor.extraJavaOptions=-Dhdp.version=current
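
These defaults ship with the Spark distribution that Kylin bundles (download-spark.sh copies this file into build/spark/conf, below). Spark's configuration precedence puts spark-submit --conf flags above spark-defaults.conf, so any kylin.engine.spark-conf.* entry in kylin.properties overrides the corresponding line here on a per-job basis.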

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/script/download-spark.sh
----------------------------------------------------------------------
diff --git a/build/script/download-spark.sh b/build/script/download-spark.sh
index dcbcbe7..ad9651d 100755
--- a/build/script/download-spark.sh
+++ b/build/script/download-spark.sh
@@ -48,5 +48,6 @@ mv build/spark-${spark_version}-bin-hadoop2.6 build/spark
 rm -rf build/spark/lib/spark-examples-*
 rm -rf build/spark/examples
 rm -rf build/spark/data
-rm -rf build/spark/python
 rm -rf build/spark/R
+
+cp build/deploy/spark-defaults.conf build/spark/conf/spark-defaults.conf
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 36ddbf4..04051b4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -696,7 +696,7 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     public Map<String, String> getSparkConfigOverride() {
-        return getPropertiesByPrefix("kylin.engine.spark.config-override.");
+        return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
 
     public double getDefaultHadoopJobReducerInputMB() {
@@ -747,22 +747,6 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.engine.spark.env.hadoop-conf-dir", "");
     }
 
-    public String getSparkConfFile() {
-        String conf = getOptional("kylin.engine.spark.properties-file", "conf/kylin-spark-conf.properties");
-        File f = new File(conf);
-        if (f.exists()) {
-            return f.getAbsolutePath();
-        } else {
-            String home = getKylinHome();
-            f = new File(home, conf);
-            if (f.exists()) {
-                return f.getAbsolutePath();
-            }
-        }
-
-        throw new IllegalArgumentException("Spark conf properties file '" + conf + "' does not exist.");
-    }
-
     public String getSparkAdditionalJars() {
         return getOptional("kylin.engine.spark.additional-jars", "");
     }
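
For context, getPropertiesByPrefix() is not shown in this diff; it gathers every property whose key starts with the given prefix and strips the prefix from the returned keys. A minimal sketch of the idea, assuming a java.util.Properties-backed store (the actual helper in KylinConfigBase may differ in details):

    // Hypothetical illustration: given kylin.engine.spark-conf.spark.executor.memory=4G,
    // the returned map contains spark.executor.memory -> 4G.
    // Needs: java.util.LinkedHashMap, java.util.Map, java.util.Properties
    static Map<String, String> propertiesByPrefix(Properties props, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                // strip the prefix so spark-submit sees plain spark.* keys
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }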

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index dd866bd..0f604e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -82,7 +82,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
         // n dim cuboid steps
         for (int i = 1; i <= maxLevel; i++) {
-            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i-1), i, jobId));
+            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
         }
     }
 

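The one-character change above fixes an off-by-one in the N-dimension cuboid steps: both the input and the output path were built from level i-1, so each step would have read from and written to the same directory. After the fix, the step for level i reads the level i-1 output and writes to the level i path, as intended.
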
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index c989dee..3a664fc 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -255,7 +255,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
-        saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, sc.hadoopConfiguration());
+        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
+        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+
+        saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
 
         // aggregate to ND cuboids
         PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
@@ -267,7 +270,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
              if (kylinConfig.isSparkSanityCheckEnabled() == true) {
                  sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
-            saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, sc.hadoopConfiguration());
+            saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, confOverwrite);
             allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels - 1].unpersist();
@@ -285,8 +288,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     }
 
     private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
-        conf.set("dfs.replication", "2");
-        final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+       final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
                 rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
                     BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
                     @Override
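
The refactoring in this hunk moves the dfs.replication=2 override (cuboid files are intermediate data, so reduced replication saves HDFS write traffic at little risk) out of saveToHDFS: rather than mutating the shared sc.hadoopConfiguration() on every call, the job now makes one defensive copy up front and sets the override on the copy, so the setting cannot leak into other code that shares the context's configuration.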

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index d892060..733a472 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -76,7 +76,6 @@ public class SparkExecutable extends AbstractExecutable {
         final KylinConfig config = context.getConfig();
         Preconditions.checkNotNull(config.getSparkHome());
         Preconditions.checkNotNull(config.getKylinJobJarPath());
-        String sparkConf = config.getSparkConfFile();
         String jars = this.getParam(JARS);
 
         String hadoopConf = "/etc/hadoop/conf";
@@ -105,17 +104,17 @@ public class SparkExecutable extends AbstractExecutable {
         }
 
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry --properties-file %s ");
+        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
 
         Map<String, String> sparkConfs = config.getSparkConfigOverride();
         for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
-            stringBuilder.append(" --conf ").append(entry.getKey()).append("==").append(entry.getValue()).append(" ");
+            stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
         }
 
         stringBuilder.append("--files %s --jars %s %s %s");
         try {
             String cmd = String.format(stringBuilder.toString(),
-                    hadoopConf, config.getSparkHome(), sparkConf, hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
+                    hadoopConf, config.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
             logger.info("cmd:" + cmd);
             final StringBuilder output = new StringBuilder();
             CliCommandExecutor exec = new CliCommandExecutor();
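
With the --properties-file argument gone, the submit command is assembled entirely from kylin.properties. Under the defaults added above it would look roughly like the following (paths and jar lists abbreviated for illustration; the --conf flags come from the loop over getSparkConfigOverride()):

    export HADOOP_CONF_DIR=/etc/hadoop/conf && /path/to/spark/bin/spark-submit \
        --class org.apache.kylin.common.util.SparkEntry \
        --conf spark.master=yarn --conf spark.executor.memory=4G ... \
        --files /path/to/hbase-site.xml --jars <jars> <job-jar> <args>

Note the hunk also corrects the --conf separator from '==' to '=': spark-submit splits each option on the first '=', so the old form would have produced values with a leading '='.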

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index d42e009..06f8e4b 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -160,6 +160,19 @@ kylin.source.hive.keep-flat-table=false
 
 ### Spark as Engine ###
 kylin.engine.spark.env.hadoop-conf-dir=../examples/test_case_data/sandbox
-kylin.engine.spark.spark-home=/usr/local/spark
-kylin.engine.spark.properties-file=../examples/test_case_data/sandbox/kylin-spark-conf.properties
-kylin.engine.spark.sanity-check-enabled=false
\ No newline at end of file
+kylin.engine.spark.sanity-check-enabled=false
+
+### Spark conf overwrite for cube engine
+kylin.engine.spark-conf.spark.master=yarn
+kylin.engine.spark-conf.spark.submit.deployMode=client
+kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=512
+kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=384
+kylin.engine.spark-conf.spark.executor.memory=1G
+kylin.engine.spark-conf.spark.executor.cores=1
+kylin.engine.spark-conf.spark.executor.instances=1
+kylin.engine.spark-conf.spark.storage.memoryFraction=0.3
+kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
+#kylin.engine.spark-conf.spark.yarn.queue=default
+#kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
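
The sandbox overrides mirror the production defaults but are scaled down to a single 1-core, 1 GB executor, and they use deployMode=client rather than cluster, so the Spark driver runs inside the submitting process, which is the usual choice for a single-node sandbox where driver logs should stay local.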