Posted to commits@kylin.apache.org by sh...@apache.org on 2017/01/09 08:58:55 UTC

[22/25] kylin git commit: KYLIN-2371 Allow overwrite default spark conf at cube level

KYLIN-2371 Allow overwrite default spark conf at cube level


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ee74a74e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ee74a74e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ee74a74e

Branch: refs/heads/sparkcubing-rebase
Commit: ee74a74e4d17a45b3832912381220af39e71f3d5
Parents: 66bca9a
Author: shaofengshi <sh...@apache.org>
Authored: Mon Jan 9 15:23:22 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 9 16:58:11 2017 +0800

----------------------------------------------------------------------
 build/conf/kylin-spark-conf.properties          |  5 +--
 build/conf/kylin.properties                     |  6 ++--
 .../apache/kylin/common/KylinConfigBase.java    | 12 ++++---
 .../kylin/job/constant/ExecutableConstants.java |  1 +
 .../spark/SparkBatchCubingJobBuilder2.java      |  2 +-
 .../kylin/engine/spark/SparkCubingByLayer.java  |  7 ++--
 .../kylin/engine/spark/SparkExecutable.java     | 38 +++++++++++++++++---
 .../test_case_data/sandbox/kylin.properties     |  3 +-
 8 files changed, 55 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
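
With this change, any spark-submit conf can be overridden at the cube level
(or globally in kylin.properties) by prefixing the ordinary Spark key with
"kylin.engine.spark.config-override."; SparkExecutable turns each matching
entry into a "--conf" argument (see its hunk below). A sketch of what such an
override could look like, with illustrative values that are not part of this
commit:

    kylin.engine.spark.config-override.spark.executor.memory=4G
    kylin.engine.spark.config-override.spark.executor.instances=40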


http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/build/conf/kylin-spark-conf.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-spark-conf.properties b/build/conf/kylin-spark-conf.properties
index 5e6dafe..143e7e4 100644
--- a/build/conf/kylin-spark-conf.properties
+++ b/build/conf/kylin-spark-conf.properties
@@ -1,5 +1,5 @@
 spark.yarn.submit.file.replication=1
-spark.yarn.executor.memoryOverhead=200
+spark.yarn.executor.memoryOverhead=1024
 spark.yarn.driver.memoryOverhead=384
 spark.master=yarn
 spark.submit.deployMode=cluster
@@ -20,7 +20,8 @@ spark.executor.cores=4
 spark.executor.instances=8
 spark.history.kerberos.keytab=none
 spark.history.kerberos.principal=none
-#spark.yarn.jar=hdfs://namenode:8020/apps/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
 spark.driver.extraJavaOptions=-Dhdp.version=current
 spark.yarn.am.extraJavaOptions=-Dhdp.version=current
 spark.executor.extraJavaOptions=-Dhdp.version=current
+#spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+#spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 54430f0..6efa423 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -131,16 +131,16 @@ kylin.engine.mr.mapper-input-rows=1000000
 ### Spark Engine ###
 
 # Hadoop conf folder; this will be exported as "HADOOP_CONF_DIR" before running spark-submit
-kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf
+#kylin.engine.spark.env.hadoop-conf-dir=/etc/hive/conf
 
 # Spark job submission properties file; defaults to $KYLIN_HOME/conf/kylin-spark-conf.properties
 #kylin.engine.spark.properties-file=
 
 # Cut size (MB) used to estimate the number of RDD partitions
-kylin.engine.spark.rdd-partition-cut-mb=50
+kylin.engine.spark.rdd-partition-cut-mb=10
 
 # Minimum number of RDD partitions
-kylin.engine.spark.min-partition=10
+kylin.engine.spark.min-partition=1
 
 # Maximum number of RDD partitions
 kylin.engine.spark.max-partition=5000
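
The two lowered defaults above feed the per-layer partition estimate made by
SparkCubingByLayer.estimateRDDPartitionNum(), which this diff does not show.
A self-contained sketch of the calculation the three knobs imply, assuming the
estimator divides the estimated layer size by the cut size and clamps the
result into [min-partition, max-partition]:

    // PartitionEstimate.java: illustrative only, not the actual Kylin code.
    public class PartitionEstimate {
        static int estimate(double layerSizeMB, float cutMB, int min, int max) {
            int n = (int) Math.ceil(layerSizeMB / cutMB); // ~1 partition per cutMB
            return Math.max(min, Math.min(max, n));       // clamp into [min, max]
        }
        public static void main(String[] args) {
            // a 300 MB layer with cut-mb=10, min=1, max=5000 gives 30 partitions
            System.out.println(estimate(300.0, 10f, 1, 5000));
        }
    }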

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7d6ac2b..36ddbf4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -695,6 +695,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getPropertiesByPrefix("kylin.engine.mr.config-override.");
     }
 
+    public Map<String, String> getSparkConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.spark.config-override.");
+    }
+
     public double getDefaultHadoopJobReducerInputMB() {
         return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
     }
@@ -739,8 +743,8 @@ abstract public class KylinConfigBase implements Serializable {
     // ENGINE.SPARK
     // ============================================================================
 
-    public String getSparkHadoopConfDir() {
-        return getRequired("kylin.engine.spark.env.hadoop-conf-dir");
+    public String getHadoopConfDir() {
+        return getOptional("kylin.engine.spark.env.hadoop-conf-dir", "");
     }
 
     public String getSparkConfFile() {
@@ -764,7 +768,7 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     public float getSparkRDDPartitionCutMB() {
-        return Float.valueOf(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "200.0"));
+        return Float.valueOf(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
     }
 
 
@@ -773,7 +777,7 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     public int getSparkMaxPartition() {
-        return Integer.valueOf(getOptional("kylin.engine.spark.max-partition", "500"));
+        return Integer.valueOf(getOptional("kylin.engine.spark.max-partition", "5000"));
     }
 
     public boolean isSparkSanityCheckEnabled() {
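
getSparkConfigOverride() mirrors getMRConfigOverride() from the hunk above.
getPropertiesByPrefix() returns the matching entries with the prefix stripped,
so the map keys are plain Spark conf names; a self-contained sketch of that
key mapping (the stripping behavior is inferred from how the MR counterpart
is consumed, and the value is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class PrefixDemo {
        public static void main(String[] args) {
            final String prefix = "kylin.engine.spark.config-override.";
            Map<String, String> kylinProps = new HashMap<>();
            kylinProps.put(prefix + "spark.executor.memory", "4G"); // illustrative
            Map<String, String> sparkConfs = new HashMap<>();
            for (Map.Entry<String, String> e : kylinProps.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    // strip the prefix to recover the plain Spark conf key
                    sparkConfs.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            System.out.println(sparkConfs); // prints {spark.executor.memory=4G}
        }
    }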

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 11c7455..d7f6292 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -40,6 +40,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem";
+    public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark";
     public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid";
     public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
     public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 55e11c4..208a0c9 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -70,7 +70,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
         sparkExecutable.setJars(jars.toString());
 
-        sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE + " with Spark");
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
         return sparkExecutable;
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 93cce81..c989dee 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -144,7 +144,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
-        SparkConf conf = new SparkConf().setAppName("Cubing Application");
+        SparkConf conf = new SparkConf().setAppName("Cubing for: " + cubeName + ", segment " + segmentId);
         //serialization conf
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrationRequired", "true");
@@ -249,7 +249,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         final int totalLevels = cubeDesc.getBuildLevel();
-        JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels];
+        JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
         int partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
 
@@ -285,6 +285,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     }
 
     private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
+        conf.set("dfs.replication", "2"); // cuboid files are intermediate output, so a low replication saves HDFS space
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
                 rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
                     BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
@@ -403,7 +404,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         Long count = rdd.mapValues(new Function<Object[], Long>() {
             @Override
             public Long call(Object[] objects) throws Exception {
-                return (Long) objects[countMeasureIndex]; // assume the first measure is COUNT(*)
+                return (Long) objects[countMeasureIndex];
             }
         }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
             @Override
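
The allRDDs sizing change is an off-by-one fix: the "+ 1" suggests layered
cubing fills slot 0 with the base cuboid and slots 1..totalLevels with each
aggregation layer, so totalLevels + 1 slots are needed. A minimal, Spark-free
sketch of that loop shape (an illustration, not the actual Kylin code):

    public class LayeredCubing {
        public static void main(String[] args) {
            int totalLevels = 3;
            String[] allRDDs = new String[totalLevels + 1]; // slots 0..totalLevels
            allRDDs[0] = "base cuboid";
            for (int level = 1; level <= totalLevels; level++) {
                // each layer is derived from the one before it
                allRDDs[level] = "layer " + level + " from " + (level - 1);
            }
            System.out.println(String.join(" | ", allRDDs));
        }
    }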

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 644f73f..d892060 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -17,9 +17,10 @@
 */
 package org.apache.kylin.engine.spark;
 
-import java.io.IOException;
+import java.io.File;
 import java.util.Map;
 
+import jodd.util.StringUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.CliCommandExecutor;
@@ -78,14 +79,43 @@ public class SparkExecutable extends AbstractExecutable {
         String sparkConf = config.getSparkConfFile();
         String jars = this.getParam(JARS);
 
-        String jobJar = config.getKylinJobJarPath();
+        String hadoopConf = "/etc/hadoop/conf";
+        if (StringUtil.isNotEmpty(config.getHadoopConfDir())) {
+            hadoopConf = config.getHadoopConfDir();
+        } else {
+            String hiveConf = ClassLoader.getSystemClassLoader().getResource("hive-site.xml").getFile();
+            File hiveConfFile = new File(hiveConf);
+            if (hiveConfFile.exists()) {
+                logger.info("Located hive-site.xml in " + hiveConfFile);
+                hadoopConf = hiveConfFile.getParent();
+            }
+        }
+        logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+
+        String hbaseConf = ClassLoader.getSystemClassLoader().getResource("hbase-site.xml").getFile();
+        logger.info("Located hbase-site.xml on classpath: " + hbaseConf);
+        File hbaseConfFile = new File(hbaseConf);
+        if (!hbaseConfFile.exists()) {
+            throw new IllegalArgumentException("Couldn't find hbase-site.xml from classpath.");
+        }
 
+        String jobJar = config.getKylinJobJarPath();
         if (StringUtils.isEmpty(jars)) {
             jars = jobJar;
         }
 
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry --properties-file %s ");
+
+        Map<String, String> sparkConfs = config.getSparkConfigOverride();
+        for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
+            stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
+        }
+
+        stringBuilder.append("--files %s --jars %s %s %s");
         try {
-            String cmd = String.format("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --properties-file %s --jars %s %s %s", config.getSparkHadoopConfDir(), config.getSparkHome(), sparkConf, jars, jobJar, formatArgs());
+            String cmd = String.format(stringBuilder.toString(),
+                    hadoopConf, config.getSparkHome(), sparkConf, hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
             logger.info("cmd:" + cmd);
             final StringBuilder output = new StringBuilder();
             CliCommandExecutor exec = new CliCommandExecutor();
@@ -98,7 +128,7 @@ public class SparkExecutable extends AbstractExecutable {
                 }
             });
             return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.error("error run spark job:", e);
             return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
         }
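
Putting the SparkExecutable pieces together: the command now exports the
discovered HADOOP_CONF_DIR, ships hbase-site.xml to the cluster via --files,
and injects every config-override entry as a --conf flag. The composed command
looks roughly like the following, with illustrative paths and override value
that are not from this commit:

    export HADOOP_CONF_DIR=/etc/hadoop/conf && \
    /usr/local/spark/bin/spark-submit \
        --class org.apache.kylin.common.util.SparkEntry \
        --properties-file $KYLIN_HOME/conf/kylin-spark-conf.properties \
        --conf spark.executor.memory=4G \
        --files /etc/hbase/conf/hbase-site.xml \
        --jars kylin-job.jar kylin-job.jar <formatted args>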

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee74a74e/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index a011911..d42e009 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -156,10 +156,9 @@ kylin.server.query-metrics-percentiles-intervals=60, 360, 3600
 
 # Env DEV|QA|PROD
 kylin.env=DEV
-kylin.source.hive.keep-flat-table=true
+kylin.source.hive.keep-flat-table=false
 
 ### Spark as Engine ###
-#kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf
 kylin.engine.spark.env.hadoop-conf-dir=../examples/test_case_data/sandbox
 kylin.engine.spark.spark-home=/usr/local/spark
 kylin.engine.spark.properties-file=../examples/test_case_data/sandbox/kylin-spark-conf.properties