Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/31 13:36:25 UTC

[kylin] 01/02: KYLIN-3137 Spark cubing refine

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3137
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f11aec308119426222a4bc9d9d6a28f551a16111
Author: shaofengshi <sh...@apache.org>
AuthorDate: Thu May 31 11:17:49 2018 +0800

    KYLIN-3137 Spark cubing refine
---
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |  1 +
 .../engine/spark/SparkBatchCubingJobBuilder2.java  | 11 +++--
 .../kylin/engine/spark/SparkCubingByLayer.java     | 49 +++++++++-------------
 .../apache/kylin/engine/spark/SparkExecutable.java |  5 ---
 4 files changed, 26 insertions(+), 40 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 40f1ac5..7cc7779 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -47,6 +47,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
     public static final String HIVE_NULL = "\\N";
+    public static final String SEQUENCEFILE_DELIMITER = "\\01";
     protected String cubeName;
     protected Cuboid baseCuboid;
     protected CubeDesc cubeDesc;
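
For reference, the new SEQUENCEFILE_DELIMITER constant is consumed as a regular expression: the Java string literal "\\01" denotes \01, which as a regex is an octal escape matching the \u0001 byte Hive writes by default between fields of the intermediate flat table. A minimal standalone sketch of that split behavior (the class name and sample values below are illustrative, not part of the patch):

    import java.util.Arrays;

    public class DelimiterSplitDemo {
        // Same literal as BaseCuboidBuilder.SEQUENCEFILE_DELIMITER: the regex \01,
        // an octal escape matching the \u0001 field separator used by Hive.
        static final String SEQUENCEFILE_DELIMITER = "\\01";

        public static void main(String[] args) {
            String row = "2012-01-01" + '\u0001' + "FP-GTC" + '\u0001' + "0";
            String[] fields = row.split(SEQUENCEFILE_DELIMITER);
            System.out.println(Arrays.toString(fields)); // [2012-01-01, FP-GTC, 0]
        }
    }
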
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 57d4fb0..91690dd 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.slf4j.Logger;
@@ -69,11 +70,12 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
 
     public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
             final String jobId, final String cuboidRootPath) {
-        IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
-                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(),
+                tablePath);
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
                 getSegmentMetadataUrl(seg.getConfig(), jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
@@ -81,9 +83,6 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
 
         StringBuilder jars = new StringBuilder();
 
-        StringUtil.appendWithSeparator(jars, findJar("com.yammer.metrics.core.Gauge", null)); // metrics-core.jar
-        StringUtil.appendWithSeparator(jars, findJar("com.google.common.collect.Maps", "guava")); //guava.jar
-
         StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
         sparkExecutable.setJars(jars.toString());
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
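
With this change the Spark step receives the flat table's HDFS directory instead of the Hive database.table name, and the extra metrics-core/guava jar lookups are dropped. JoinedFlatTable.getTableDir is assumed here to resolve that directory from the job working dir plus the flat table name; a rough sketch of that assumed layout (helper name and sample paths are hypothetical):

    public class FlatTableDirDemo {
        // Hypothetical helper mirroring what JoinedFlatTable.getTableDir is assumed
        // to return: the flat table's storage directory under the job working dir.
        static String flatTableDir(String jobWorkingDir, String flatTableName) {
            return jobWorkingDir + "/" + flatTableName;
        }

        public static void main(String[] args) {
            // e.g. hdfs:///kylin/kylin_metadata/kylin-<jobId>/kylin_intermediate_<cube>_<seg>
            System.out.println(flatTableDir("hdfs:///kylin/kylin_metadata/kylin-<jobId>",
                    "kylin_intermediate_<cube>_<seg>"));
        }
    }
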
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 76e7e22..e6d478e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -30,12 +30,14 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.Pair;
@@ -71,15 +73,14 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.Tuple2;
 
+import static org.apache.kylin.engine.mr.common.BaseCuboidBuilder.SEQUENCEFILE_DELIMITER;
+
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
@@ -95,14 +96,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .withDescription("HDFS metadata url").create("metaUrl");
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
             .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
-    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
-            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
 
     private Options options;
 
     public SparkCubingByLayer() {
         options = new Options();
-        options.addOption(OPTION_INPUT_TABLE);
+        options.addOption(OPTION_INPUT_PATH);
         options.addOption(OPTION_CUBE_NAME);
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
@@ -117,7 +118,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
-        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
@@ -145,6 +146,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
         final Job job = Job.getInstance(confOverwrite);
 
+        logger.info("RDD input path: {}", inputPath);
         logger.info("RDD Output path: {}", outputPath);
         setHadoopConf(job, cubeSegment, metaUrl);
 
@@ -166,12 +168,15 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
 
-        HiveContext sqlContext = new HiveContext(sc.sc());
-        final Dataset intermediateTable = sqlContext.table(hiveTable);
-
-        // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
-                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = sc
+                .sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                .map(new Function<Text, String[]>() {
+                    @Override
+                    public String[] call(Text text) throws Exception {
+                        String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                        return s.split(SEQUENCEFILE_DELIMITER);
+                    }
+                }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
@@ -269,7 +274,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
     }
 
-    static public class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
+    static public class EncodeBaseCuboid implements PairFunction<String[], ByteArray, Object[]> {
         private volatile transient boolean initialized = false;
         private BaseCuboidBuilder baseCuboidBuilder = null;
         private String cubeName;
@@ -285,7 +290,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         @Override
-        public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+        public Tuple2<ByteArray, Object[]> call(String[] rowArray) throws Exception {
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
@@ -304,25 +309,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                     }
                 }
             }
-            String[] rowArray = rowToArray(row);
             baseCuboidBuilder.resetAggrs();
             byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
             Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
             return new Tuple2<>(new ByteArray(rowKey), result);
         }
-
-        private String[] rowToArray(Row row) {
-            String[] result = new String[row.size()];
-            for (int i = 0; i < row.size(); i++) {
-                final Object o = row.get(i);
-                if (o != null) {
-                    result[i] = o.toString();
-                } else {
-                    result[i] = null;
-                }
-            }
-            return result;
-        }
     }
 
     static public class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
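
Put together, the new read path no longer needs a HiveContext: the job reads the intermediate flat table's SequenceFiles straight from HDFS and splits each record on the \u0001 delimiter before encoding the base cuboid. A self-contained sketch of that read pattern (app name, argument handling and the lambda form are illustrative; the patch itself uses an anonymous Function):

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ReadFlatTableDemo {
        public static void main(String[] args) {
            // args[0]: HDFS directory of the Hive intermediate flat table
            // (the value passed as OPTION_INPUT_PATH)
            SparkConf conf = new SparkConf().setAppName("ReadFlatTableDemo");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                JavaRDD<String[]> rows = sc
                        .sequenceFile(args[0], BytesWritable.class, Text.class)
                        .values()
                        // Decode only the valid bytes of the Text value, then split on
                        // Hive's \u0001 field separator ("\\01" is a regex octal escape).
                        .map(text -> new String(text.getBytes(), 0, text.getLength(),
                                StandardCharsets.UTF_8).split("\\01"));
                System.out.println("rows in flat table: " + rows.count());
            }
        }
    }

Reading from HDFS rather than through Hive is presumably also why the hive-site.xml check is removed from SparkExecutable in the hunk below.
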
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 69232ba..8de78c0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -112,11 +112,6 @@ public class SparkExecutable extends AbstractExecutable {
                     "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
         }
 
-        File hiveConfFile = new File(hadoopConf, "hive-site.xml");
-        if (!hiveConfFile.exists()) {
-            throw new RuntimeException("Cannot find hive-site.xml in kylin_hadoop_conf_dir: " + hadoopConf + //
-                    ". In order to enable spark cubing, you must set kylin.env.hadoop-conf-dir to a dir which contains at least core-site.xml, hdfs-site.xml, hive-site.xml, mapred-site.xml, yarn-site.xml");
-        }
         logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
 
         String jobJar = config.getKylinJobJarPath();
