Posted to commits@kylin.apache.org by sh...@apache.org on 2018/06/05 06:45:31 UTC

[kylin] branch master updated: KYLIN-3137 Spark cubing without hive

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fdf55a  KYLIN-3137 Spark cubing without hive
0fdf55a is described below

commit 0fdf55abb2c709e04bf327d15299d695dce7b219
Author: shaofengshi <sh...@apache.org>
AuthorDate: Mon Jun 4 14:59:55 2018 +0800

    KYLIN-3137 Spark cubing without hive
---
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |  2 +
 .../kylin/engine/spark/SparkCubingByLayer.java     | 51 +++++++++++++++++-----
 2 files changed, 43 insertions(+), 10 deletions(-)
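
In short, this commit lets the Spark "by-layer" cubing step read the intermediate flat table either from its SequenceFile path on HDFS (the existing behaviour) or directly from Hive through a SparkSession, depending on the flat-table storage format in KylinConfig. A minimal sketch of that decision, not part of the commit, assuming KylinConfig.getInstanceFromEnv() and the getFlatTableStorageFormat() accessor that appears in the diff below (the property name in the comment is an assumption):

    // Sketch only, not part of the commit: the check that picks the input source,
    // mirroring the branch added to SparkCubingByLayer below.
    // getFlatTableStorageFormat() is assumed to be backed by
    // kylin.source.hive.flat-table-storage-format, defaulting to "SEQUENCEFILE".
    import org.apache.kylin.common.KylinConfig;

    public class FlatTableInputCheck {
        public static void main(String[] args) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            boolean isSequenceFile = "SEQUENCEFILE".equalsIgnoreCase(config.getFlatTableStorageFormat());
            System.out.println(isSequenceFile
                    ? "Read the flat table as a SequenceFile from its HDFS path (OPTION_INPUT_PATH)"
                    : "Read the flat table from Hive via SparkSession (OPTION_INPUT_TABLE)");
        }
    }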

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 91690dd..6ce9b90 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -74,6 +74,8 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
+                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(),
                 tablePath);
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index e6d478e..711c9ac 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,6 +17,8 @@
 */
 package org.apache.kylin.engine.spark;
 
+import static org.apache.kylin.engine.mr.common.BaseCuboidBuilder.SEQUENCEFILE_DELIMITER;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -73,14 +75,15 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.Tuple2;
 
-import static org.apache.kylin.engine.mr.common.BaseCuboidBuilder.SEQUENCEFILE_DELIMITER;
-
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
@@ -96,6 +99,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .withDescription("HDFS metadata url").create("metaUrl");
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
             .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true)
             .withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
 
@@ -103,6 +108,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     public SparkCubingByLayer() {
         options = new Options();
+        options.addOption(OPTION_INPUT_TABLE);
         options.addOption(OPTION_INPUT_PATH);
         options.addOption(OPTION_CUBE_NAME);
         options.addOption(OPTION_SEGMENT_ID);
@@ -118,6 +124,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
@@ -168,15 +175,39 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
 
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = sc
-                .sequenceFile(inputPath, BytesWritable.class, Text.class).values()
-                .map(new Function<Text, String[]>() {
-                    @Override
-                    public String[] call(Text text) throws Exception {
-                        String s = Bytes.toString(text.getBytes(), 0, text.getLength());
-                        return s.split(SEQUENCEFILE_DELIMITER);
+        boolean isSequenceFile = "SEQUENCEFILE".equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
+
+        if (isSequenceFile) {
+            encodedBaseRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                    .map(new Function<Text, String[]>() {
+                        @Override
+                        public String[] call(Text text) throws Exception {
+                            String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                            return s.split(SEQUENCEFILE_DELIMITER);
+                        }
+                    }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+        } else {
+            SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
+            final Dataset intermediateTable = sparkSession.table(hiveTable);
+            encodedBaseRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+                @Override
+                public String[] call(Row row) throws Exception {
+                    String[] result = new String[row.size()];
+                    for (int i = 0; i < row.size(); i++) {
+                        final Object o = row.get(i);
+                        if (o != null) {
+                            result[i] = o.toString();
+                        } else {
+                            result[i] = null;
+                        }
                     }
-                }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+                    return result;
+                }
+            }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+
+        }
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
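
As a standalone illustration of the new non-SequenceFile path, the sketch below (not part of the commit) reads a Hive table through a SparkSession with Hive support and turns each Row into the String[] record that EncodeBaseCuboid consumes; the table name "default.kylin_intermediate_example" is a placeholder:

    // Minimal sketch, assuming a Spark 2.x classpath with Hive support available on the cluster.
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class HiveFlatTableReadSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("HiveFlatTableReadSketch");
            SparkSession spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();

            // Read the intermediate flat table registered in the Hive metastore.
            Dataset<Row> flatTable = spark.table("default.kylin_intermediate_example");

            // Convert each Row to a null-safe String[], as the else-branch above does.
            JavaRDD<String[]> records = flatTable.javaRDD().map(new Function<Row, String[]>() {
                @Override
                public String[] call(Row row) throws Exception {
                    String[] result = new String[row.size()];
                    for (int i = 0; i < row.size(); i++) {
                        Object o = row.get(i);
                        result[i] = (o == null) ? null : o.toString();
                    }
                    return result;
                }
            });

            System.out.println("Rows read from Hive: " + records.count());
            spark.stop();
        }
    }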

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.