You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/10 01:41:29 UTC
kylin git commit: KYLIN-2371 rename the prefix to ‘kylin.engine.spark-conf.’
Repository: kylin
Updated Branches:
refs/heads/master ee74a74e4 -> 00c8f31e6
KYLIN-2371 rename the prefix to ‘kylin.engine.spark-conf.’
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/00c8f31e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/00c8f31e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/00c8f31e
Branch: refs/heads/master
Commit: 00c8f31e6d235d38bb7919fa4ee5b12e53d840f6
Parents: ee74a74
Author: shaofengshi <sh...@apache.org>
Authored: Tue Jan 10 09:40:44 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Jan 10 09:40:44 2017 +0800
----------------------------------------------------------------------
build/conf/kylin-spark-conf.properties | 27 -----------
build/conf/kylin.properties | 48 ++++++++++++--------
build/deploy/spark-defaults.conf | 6 +++
build/script/download-spark.sh | 3 +-
.../apache/kylin/common/KylinConfigBase.java | 18 +-------
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 2 +-
.../kylin/engine/spark/SparkCubingByLayer.java | 10 ++--
.../kylin/engine/spark/SparkExecutable.java | 7 ++-
.../test_case_data/sandbox/kylin.properties | 19 ++++++--
9 files changed, 65 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/conf/kylin-spark-conf.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-spark-conf.properties b/build/conf/kylin-spark-conf.properties
deleted file mode 100644
index 143e7e4..0000000
--- a/build/conf/kylin-spark-conf.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-spark.yarn.submit.file.replication=1
-spark.yarn.executor.memoryOverhead=1024
-spark.yarn.driver.memoryOverhead=384
-spark.master=yarn
-spark.submit.deployMode=cluster
-spark.eventLog.enabled=true
-spark.yarn.scheduler.heartbeat.interval-ms=5000
-spark.yarn.preserve.staging.files=true
-spark.yarn.queue=default
-spark.yarn.containerLauncherMaxThreads=25
-spark.yarn.max.executor.failures=3
-spark.eventLog.dir=hdfs\:///kylin/spark-history
-spark.history.kerberos.enabled=true
-spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
-spark.history.ui.port=18080
-spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
-spark.executor.memory=4G
-spark.storage.memoryFraction=0.3
-spark.executor.cores=4
-spark.executor.instances=8
-spark.history.kerberos.keytab=none
-spark.history.kerberos.principal=none
-spark.driver.extraJavaOptions=-Dhdp.version=current
-spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-spark.executor.extraJavaOptions=-Dhdp.version=current
-#spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
-#spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 6efa423..196a711 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -127,24 +127,6 @@ kylin.engine.mr.max-reducer-number=500
kylin.engine.mr.mapper-input-rows=1000000
-
-### Spark Engine ###
-
-# Hadoop conf folder, will export this as "HADOOP_CONF_DIR" before run spark-submit
-#kylin.engine.spark.env.hadoop-conf-dir=/etc/hive/conf
-
-# Spark job submission properties file, default be $KYLIN_HOME/conf/kylin-spark-conf.properties
-#kylin.engine.spark.properties-file=
-
-# Estimate the RDD partition numbers
-kylin.engine.spark.rdd-partition-cut-mb=10
-
-# Minimal partition numbers of rdd
-kylin.engine.spark.min-partition=1
-
-# Max partition numbers of rdd
-kylin.engine.spark.max-partition=5000
-
### CUBE | DICTIONARY ###
# 'auto', 'inmem', 'layer' or 'random' for testing
@@ -204,3 +186,33 @@ kylin.security.saml.context-server-name=hostname
kylin.security.saml.context-server-port=443
kylin.security.saml.context-path=/kylin
+
+### Spark Engine Configs ###
+
+# Hadoop conf folder, will export this as "HADOOP_CONF_DIR" before run spark-submit
+#kylin.engine.spark.env.hadoop-conf-dir=/etc/hive/conf
+
+# Estimate the RDD partition numbers
+kylin.engine.spark.rdd-partition-cut-mb=10
+
+# Minimal partition numbers of rdd
+kylin.engine.spark.min-partition=1
+
+# Max partition numbers of rdd
+kylin.engine.spark.max-partition=5000
+
+### Spark conf (default is in spark/conf/spark-defaults.conf)
+kylin.engine.spark-conf.spark.master=yarn
+kylin.engine.spark-conf.spark.submit.deployMode=cluster
+kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=1024
+kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=384
+kylin.engine.spark-conf.spark.yarn.queue=default
+kylin.engine.spark-conf.spark.executor.memory=4G
+kylin.engine.spark-conf.spark.executor.cores=4
+kylin.engine.spark-conf.spark.executor.instances=8
+kylin.engine.spark-conf.spark.storage.memoryFraction=0.3
+kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
+## manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
+#kylin.engine.spark-conf.spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/deploy/spark-defaults.conf
----------------------------------------------------------------------
diff --git a/build/deploy/spark-defaults.conf b/build/deploy/spark-defaults.conf
new file mode 100644
index 0000000..36c0ab3
--- /dev/null
+++ b/build/deploy/spark-defaults.conf
@@ -0,0 +1,6 @@
+spark.yarn.submit.file.replication=1
+spark.eventLog.enabled=true
+spark.yarn.max.executor.failures=3
+spark.driver.extraJavaOptions=-Dhdp.version=current
+spark.yarn.am.extraJavaOptions=-Dhdp.version=current
+spark.executor.extraJavaOptions=-Dhdp.version=current
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/script/download-spark.sh
----------------------------------------------------------------------
diff --git a/build/script/download-spark.sh b/build/script/download-spark.sh
index dcbcbe7..ad9651d 100755
--- a/build/script/download-spark.sh
+++ b/build/script/download-spark.sh
@@ -48,5 +48,6 @@ mv build/spark-${spark_version}-bin-hadoop2.6 build/spark
rm -rf build/spark/lib/spark-examples-*
rm -rf build/spark/examples
rm -rf build/spark/data
-rm -rf build/spark/python
rm -rf build/spark/R
+
+cp build/deploy/spark-defaults.conf build/spark/conf/spark-defaults.conf
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 36ddbf4..04051b4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -696,7 +696,7 @@ abstract public class KylinConfigBase implements Serializable {
}
public Map<String, String> getSparkConfigOverride() {
- return getPropertiesByPrefix("kylin.engine.spark.config-override.");
+ return getPropertiesByPrefix("kylin.engine.spark-conf.");
}
public double getDefaultHadoopJobReducerInputMB() {
@@ -747,22 +747,6 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.engine.spark.env.hadoop-conf-dir", "");
}
- public String getSparkConfFile() {
- String conf = getOptional("kylin.engine.spark.properties-file", "conf/kylin-spark-conf.properties");
- File f = new File(conf);
- if (f.exists()) {
- return f.getAbsolutePath();
- } else {
- String home = getKylinHome();
- f = new File(home, conf);
- if (f.exists()) {
- return f.getAbsolutePath();
- }
- }
-
- throw new IllegalArgumentException("Spark conf properties file '" + conf + "' does not exist.");
- }
-
public String getSparkAdditionalJars() {
return getOptional("kylin.engine.spark.additional-jars", "");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index dd866bd..0f604e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -82,7 +82,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
// n dim cuboid steps
for (int i = 1; i <= maxLevel; i++) {
- result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i-1), i, jobId));
+ result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index c989dee..3a664fc 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -255,7 +255,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
// aggregate to calculate base cuboid
allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
- saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, sc.hadoopConfiguration());
+ Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
+ confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+
+ saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
// aggregate to ND cuboids
PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
@@ -267,7 +270,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
if (kylinConfig.isSparkSanityCheckEnabled() == true) {
sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
}
- saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, sc.hadoopConfiguration());
+ saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, confOverwrite);
allRDDs[level - 1].unpersist();
}
allRDDs[totalLevels - 1].unpersist();
@@ -285,8 +288,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
- conf.set("dfs.replication", "2");
- final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+ final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index d892060..733a472 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -76,7 +76,6 @@ public class SparkExecutable extends AbstractExecutable {
final KylinConfig config = context.getConfig();
Preconditions.checkNotNull(config.getSparkHome());
Preconditions.checkNotNull(config.getKylinJobJarPath());
- String sparkConf = config.getSparkConfFile();
String jars = this.getParam(JARS);
String hadoopConf = "/etc/hadoop/conf";
@@ -105,17 +104,17 @@ public class SparkExecutable extends AbstractExecutable {
}
StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry --properties-file %s ");
+ stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
Map<String, String> sparkConfs = config.getSparkConfigOverride();
for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
- stringBuilder.append(" --conf ").append(entry.getKey()).append("==").append(entry.getValue()).append(" ");
+ stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
}
stringBuilder.append("--files %s --jars %s %s %s");
try {
String cmd = String.format(stringBuilder.toString(),
- hadoopConf, config.getSparkHome(), sparkConf, hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
+ hadoopConf, config.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
logger.info("cmd:" + cmd);
final StringBuilder output = new StringBuilder();
CliCommandExecutor exec = new CliCommandExecutor();
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index d42e009..06f8e4b 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -160,6 +160,19 @@ kylin.source.hive.keep-flat-table=false
### Spark as Engine ###
kylin.engine.spark.env.hadoop-conf-dir=../examples/test_case_data/sandbox
-kylin.engine.spark.spark-home=/usr/local/spark
-kylin.engine.spark.properties-file=../examples/test_case_data/sandbox/kylin-spark-conf.properties
-kylin.engine.spark.sanity-check-enabled=false
\ No newline at end of file
+kylin.engine.spark.sanity-check-enabled=false
+
+### Spark conf overwrite for cube engine
+kylin.engine.spark-conf.spark.master=yarn
+kylin.engine.spark-conf.spark.submit.deployMode=client
+kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=512
+kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=384
+kylin.engine.spark-conf.spark.executor.memory=1G
+kylin.engine.spark-conf.spark.executor.cores=1
+kylin.engine.spark-conf.spark.executor.instances=1
+kylin.engine.spark-conf.spark.storage.memoryFraction=0.3
+kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
+#kylin.engine.spark-conf.spark.yarn.queue=default
+#kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec