Posted to commits@kylin.apache.org by sh...@apache.org on 2018/06/05 06:45:31 UTC
[kylin] branch master updated: KYLIN-3137 Spark cubing without hive
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 0fdf55a KYLIN-3137 Spark cubing without hive
0fdf55a is described below
commit 0fdf55abb2c709e04bf327d15299d695dce7b219
Author: shaofengshi <sh...@apache.org>
AuthorDate: Mon Jun 4 14:59:55 2018 +0800
KYLIN-3137 Spark cubing without hive
---
.../engine/spark/SparkBatchCubingJobBuilder2.java | 2 +
.../kylin/engine/spark/SparkCubingByLayer.java | 51 +++++++++++++++++-----
2 files changed, 43 insertions(+), 10 deletions(-)
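In short: SparkCubingByLayer no longer assumes the intermediate flat
table is a sequence file on HDFS. When the configured flat-table storage
format (envConfig.getFlatTableStorageFormat()) is SEQUENCEFILE, the old
path-based input is kept; for any other format the job reads the Hive
intermediate table directly through a SparkSession. A condensed sketch
of that dispatch, for illustration only (readFlatTable is a hypothetical
helper, and the field delimiter is passed in explicitly rather than
taken from BaseCuboidBuilder):

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class FlatTableInputSketch {

        static JavaRDD<String[]> readFlatTable(JavaSparkContext sc, String storageFormat,
                String inputPath, String hiveTable, String delimiter) {
            if ("SEQUENCEFILE".equalsIgnoreCase(storageFormat)) {
                // Old path: flat-table rows were serialized as delimited text
                // into a sequence file; split() treats the delimiter as a
                // regex, just as the committed code does.
                return sc.sequenceFile(inputPath, BytesWritable.class, Text.class)
                        .values()
                        .map(text -> text.toString().split(delimiter));
            }
            // New path: let Spark SQL read the Hive intermediate table
            // directly, then stringify each column (see the Row handling in
            // the diff below).
            SparkSession spark = SparkSession.builder()
                    .config(sc.getConf()).enableHiveSupport().getOrCreate();
            return spark.table(hiveTable).javaRDD().map(FlatTableInputSketch::toStrings);
        }

        static String[] toStrings(Row row) {
            String[] result = new String[row.size()];
            for (int i = 0; i < row.size(); i++) {
                Object o = row.get(i);
                result[i] = (o == null) ? null : o.toString(); // keep NULLs as null
            }
            return result;
        }
    }

Either branch yields the same element type (String[]), so the
base-cuboid encoding (EncodeBaseCuboid) and everything after it are
untouched by the input format.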
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 91690dd..6ce9b90 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -74,6 +74,8 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
+                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(),
                 tablePath);
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
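Note the builder now passes the fully qualified intermediate table name
(database.table) to the Spark job under the new hiveTable option, next
to the existing path-based input. On the job side this is parsed with
plain Commons CLI. A self-contained sketch of declaring and reading such
an argument, mirroring the OptionBuilder pattern used in
SparkCubingByLayer below (the class name and sample value are made up):

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.OptionBuilder;
    import org.apache.commons.cli.Options;

    public class HiveTableArgSketch {
        public static void main(String[] args) throws Exception {
            // Same declaration style as OPTION_INPUT_TABLE in the diff below.
            Option hiveTable = OptionBuilder.withArgName("hiveTable").hasArg()
                    .isRequired(true).withDescription("Hive Intermediate Table")
                    .create("hiveTable");
            Options options = new Options();
            options.addOption(hiveTable);
            // e.g. args = {"-hiveTable", "default.kylin_intermediate_my_cube"}
            CommandLine cmd = new GnuParser().parse(options, args);
            System.out.println(cmd.getOptionValue("hiveTable"));
        }
    }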
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index e6d478e..711c9ac 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.kylin.engine.spark;
 
+import static org.apache.kylin.engine.mr.common.BaseCuboidBuilder.SEQUENCEFILE_DELIMITER;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -73,14 +75,15 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.Tuple2;
 
-import static org.apache.kylin.engine.mr.common.BaseCuboidBuilder.SEQUENCEFILE_DELIMITER;
-
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
@@ -96,6 +99,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
.withDescription("HDFS metadata url").create("metaUrl");
public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
.isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+ .withDescription("Hive Intermediate Table").create("hiveTable");
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true)
.withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
@@ -103,6 +108,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     public SparkCubingByLayer() {
         options = new Options();
+        options.addOption(OPTION_INPUT_TABLE);
         options.addOption(OPTION_INPUT_PATH);
         options.addOption(OPTION_CUBE_NAME);
         options.addOption(OPTION_SEGMENT_ID);
@@ -118,6 +124,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
@@ -168,15 +175,39 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
- final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = sc
- .sequenceFile(inputPath, BytesWritable.class, Text.class).values()
- .map(new Function<Text, String[]>() {
- @Override
- public String[] call(Text text) throws Exception {
- String s = Bytes.toString(text.getBytes(), 0, text.getLength());
- return s.split(SEQUENCEFILE_DELIMITER);
+ boolean isSequenceFile = "SEQUENCEFILE".equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+ final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
+
+ if (isSequenceFile) {
+ encodedBaseRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+ .map(new Function<Text, String[]>() {
+ @Override
+ public String[] call(Text text) throws Exception {
+ String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+ return s.split(SEQUENCEFILE_DELIMITER);
+ }
+ }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+ } else {
+ SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
+ final Dataset intermediateTable = sparkSession.table(hiveTable);
+ encodedBaseRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+ @Override
+ public String[] call(Row row) throws Exception {
+ String[] result = new String[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ final Object o = row.get(i);
+ if (o != null) {
+ result[i] = o.toString();
+ } else {
+ result[i] = null;
+ }
}
- }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+ return result;
+ }
+ }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+
+ }
Long totalCount = 0L;
if (envConfig.isSparkSanityCheckEnabled()) {
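The else branch above turns every Hive row into a String[] before it
reaches EncodeBaseCuboid, so the encoding stage stays input-format
agnostic; NULL columns are kept as Java nulls rather than becoming the
literal string "null". A runnable toy demonstration of that conversion
against a local SparkSession (no Hive needed; all names are made up):

    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class RowConversionSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[1]").appName("row-conversion-sketch").getOrCreate();
            // Stand-in for the intermediate table: one row with a NULL column.
            Dataset<Row> df = spark.sql("SELECT 1 AS id, CAST(NULL AS STRING) AS name");
            String[] first = df.javaRDD().map(row -> {
                String[] result = new String[row.size()];
                for (int i = 0; i < row.size(); i++) {
                    Object o = row.get(i);
                    result[i] = (o == null) ? null : o.toString();
                }
                return result;
            }).first();
            // Prints "[1, null]"; the second element is a real null reference.
            System.out.println(Arrays.toString(first));
            spark.stop();
        }
    }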