You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/09 08:58:55 UTC
[22/25] kylin git commit: KYLIN-2371 Allow overwrite default spark conf at cube level
KYLIN-2371 Allow overwrite default spark conf at cube level
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ee74a74e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ee74a74e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ee74a74e
Branch: refs/heads/sparkcubing-rebase
Commit: ee74a74e4d17a45b3832912381220af39e71f3d5
Parents: 66bca9a
Author: shaofengshi <sh...@apache.org>
Authored: Mon Jan 9 15:23:22 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 9 16:58:11 2017 +0800
----------------------------------------------------------------------
build/conf/kylin-spark-conf.properties | 5 +--
build/conf/kylin.properties | 6 ++--
.../apache/kylin/common/KylinConfigBase.java | 12 ++++---
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../spark/SparkBatchCubingJobBuilder2.java | 2 +-
.../kylin/engine/spark/SparkCubingByLayer.java | 7 ++--
.../kylin/engine/spark/SparkExecutable.java | 38 +++++++++++++++++---
.../test_case_data/sandbox/kylin.properties | 3 +-
8 files changed, 55 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/build/conf/kylin-spark-conf.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-spark-conf.properties b/build/conf/kylin-spark-conf.properties
index 5e6dafe..143e7e4 100644
--- a/build/conf/kylin-spark-conf.properties
+++ b/build/conf/kylin-spark-conf.properties
@@ -1,5 +1,5 @@
spark.yarn.submit.file.replication=1
-spark.yarn.executor.memoryOverhead=200
+spark.yarn.executor.memoryOverhead=1024
spark.yarn.driver.memoryOverhead=384
spark.master=yarn
spark.submit.deployMode=cluster
@@ -20,7 +20,8 @@ spark.executor.cores=4
spark.executor.instances=8
spark.history.kerberos.keytab=none
spark.history.kerberos.principal=none
-#spark.yarn.jar=hdfs://namenode:8020/apps/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
spark.driver.extraJavaOptions=-Dhdp.version=current
spark.yarn.am.extraJavaOptions=-Dhdp.version=current
spark.executor.extraJavaOptions=-Dhdp.version=current
+#spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+#spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 54430f0..6efa423 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -131,16 +131,16 @@ kylin.engine.mr.mapper-input-rows=1000000
### Spark Engine ###
# Hadoop conf folder, will export this as "HADOOP_CONF_DIR" before run spark-submit
-kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf
+#kylin.engine.spark.env.hadoop-conf-dir=/etc/hive/conf
# Spark job submission properties file, default be $KYLIN_HOME/conf/kylin-spark-conf.properties
#kylin.engine.spark.properties-file=
# Estimate the RDD partition numbers
-kylin.engine.spark.rdd-partition-cut-mb=50
+kylin.engine.spark.rdd-partition-cut-mb=10
# Minimal partition numbers of rdd
-kylin.engine.spark.min-partition=10
+kylin.engine.spark.min-partition=1
# Max partition numbers of rdd
kylin.engine.spark.max-partition=5000
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7d6ac2b..36ddbf4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -695,6 +695,10 @@ abstract public class KylinConfigBase implements Serializable {
return getPropertiesByPrefix("kylin.engine.mr.config-override.");
}
+ public Map<String, String> getSparkConfigOverride() {
+ return getPropertiesByPrefix("kylin.engine.spark.config-override.");
+ }
+
public double getDefaultHadoopJobReducerInputMB() {
return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
}
@@ -739,8 +743,8 @@ abstract public class KylinConfigBase implements Serializable {
// ENGINE.SPARK
// ============================================================================
- public String getSparkHadoopConfDir() {
- return getRequired("kylin.engine.spark.env.hadoop-conf-dir");
+ public String getHadoopConfDir() {
+ return getOptional("kylin.engine.spark.env.hadoop-conf-dir", "");
}
public String getSparkConfFile() {
@@ -764,7 +768,7 @@ abstract public class KylinConfigBase implements Serializable {
}
public float getSparkRDDPartitionCutMB() {
- return Float.valueOf(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "200.0"));
+ return Float.valueOf(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
}
@@ -773,7 +777,7 @@ abstract public class KylinConfigBase implements Serializable {
}
public int getSparkMaxPartition() {
- return Integer.valueOf(getOptional("kylin.engine.spark.max-partition", "500"));
+ return Integer.valueOf(getOptional("kylin.engine.spark.max-partition", "5000"));
}
public boolean isSparkSanityCheckEnabled() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 11c7455..d7f6292 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -40,6 +40,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid";
public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem";
+ public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark";
public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid";
public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 55e11c4..208a0c9 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -70,7 +70,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
sparkExecutable.setJars(jars.toString());
- sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE + " with Spark");
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
return sparkExecutable;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 93cce81..c989dee 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -144,7 +144,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
- SparkConf conf = new SparkConf().setAppName("Cubing Application");
+ SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + ", segment " + segmentId);
//serialization conf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrationRequired", "true");
@@ -249,7 +249,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
final int totalLevels = cubeDesc.getBuildLevel();
- JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels];
+ JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
int level = 0;
int partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
@@ -285,6 +285,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
+ conf.set("dfs.replication", "2");
final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
@@ -403,7 +404,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
Long count = rdd.mapValues(new Function<Object[], Long>() {
@Override
public Long call(Object[] objects) throws Exception {
- return (Long) objects[countMeasureIndex]; // assume the first measure is COUNT(*)
+ return (Long) objects[countMeasureIndex];
}
}).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 644f73f..d892060 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -17,9 +17,10 @@
*/
package org.apache.kylin.engine.spark;
-import java.io.IOException;
+import java.io.File;
import java.util.Map;
+import jodd.util.StringUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.CliCommandExecutor;
@@ -78,14 +79,43 @@ public class SparkExecutable extends AbstractExecutable {
String sparkConf = config.getSparkConfFile();
String jars = this.getParam(JARS);
- String jobJar = config.getKylinJobJarPath();
+ String hadoopConf = "/etc/hadoop/conf";
+ if (StringUtil.isNotEmpty(config.getHadoopConfDir())) {
+ hadoopConf = config.getHadoopConfDir();
+ } else {
+ String hiveConf = ClassLoader.getSystemClassLoader().getResource("hive-site.xml").getFile().toString();
+ File hiveConfFile = new File(hiveConf);
+ if (hiveConfFile.exists() == true) {
+ logger.info("Locate hive-site.xml in " + hiveConfFile);
+ hadoopConf = hiveConfFile.getParent();
+ }
+ }
+ logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+
+ String hbaseConf = ClassLoader.getSystemClassLoader().getResource("hbase-site.xml").getFile().toString();
+ logger.info("Get hbase-site.xml location from classpath: " + hbaseConf);
+ File hbaseConfFile = new File(hbaseConf);
+ if (hbaseConfFile.exists() == false) {
+ throw new IllegalArgumentException("Couldn't find hbase-site.xml from classpath.");
+ }
+ String jobJar = config.getKylinJobJarPath();
if (StringUtils.isEmpty(jars)) {
jars = jobJar;
}
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry --properties-file %s ");
+
+ Map<String, String> sparkConfs = config.getSparkConfigOverride();
+ for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
+ stringBuilder.append(" --conf ").append(entry.getKey()).append("==").append(entry.getValue()).append(" ");
+ }
+
+ stringBuilder.append("--files %s --jars %s %s %s");
try {
- String cmd = String.format("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --properties-file %s --jars %s %s %s", config.getSparkHadoopConfDir(), config.getSparkHome(), sparkConf, jars, jobJar, formatArgs());
+ String cmd = String.format(stringBuilder.toString(),
+ hadoopConf, config.getSparkHome(), sparkConf, hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
logger.info("cmd:" + cmd);
final StringBuilder output = new StringBuilder();
CliCommandExecutor exec = new CliCommandExecutor();
@@ -98,7 +128,7 @@ public class SparkExecutable extends AbstractExecutable {
}
});
return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("error run spark job:", e);
return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index a011911..d42e009 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -156,10 +156,9 @@ kylin.server.query-metrics-percentiles-intervals=60, 360, 3600
# Env DEV|QA|PROD
kylin.env=DEV
-kylin.source.hive.keep-flat-table=true
+kylin.source.hive.keep-flat-table=false
### Spark as Engine ###
-#kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf
kylin.engine.spark.env.hadoop-conf-dir=../examples/test_case_data/sandbox
kylin.engine.spark.spark-home=/usr/local/spark
kylin.engine.spark.properties-file=../examples/test_case_data/sandbox/kylin-spark-conf.properties