You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/31 13:36:25 UTC
[kylin] 01/02: KYLIN-3137 Spark cubing refine
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch KYLIN-3137
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit f11aec308119426222a4bc9d9d6a28f551a16111
Author: shaofengshi <sh...@apache.org>
AuthorDate: Thu May 31 11:17:49 2018 +0800
KYLIN-3137 Spark cubing refine
---
.../kylin/engine/mr/common/BaseCuboidBuilder.java | 1 +
.../engine/spark/SparkBatchCubingJobBuilder2.java | 11 +++--
.../kylin/engine/spark/SparkCubingByLayer.java | 49 +++++++++-------------
.../apache/kylin/engine/spark/SparkExecutable.java | 5 ---
4 files changed, 26 insertions(+), 40 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 40f1ac5..7cc7779 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -47,6 +47,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
public static final String HIVE_NULL = "\\N";
+ public static final String SEQUENCEFILE_DELIMITER = "\\01";
protected String cubeName;
protected Cuboid baseCuboid;
protected CubeDesc cubeDesc;
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 57d4fb0..91690dd 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.slf4j.Logger;
@@ -69,11 +70,12 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
final String jobId, final String cuboidRootPath) {
- IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
- sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
- seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+ sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(),
+ tablePath);
sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
getSegmentMetadataUrl(seg.getConfig(), jobId));
sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
@@ -81,9 +83,6 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
StringBuilder jars = new StringBuilder();
- StringUtil.appendWithSeparator(jars, findJar("com.yammer.metrics.core.Gauge", null)); // metrics-core.jar
- StringUtil.appendWithSeparator(jars, findJar("com.google.common.collect.Maps", "guava")); //guava.jar
-
StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
sparkExecutable.setJars(jars.toString());
sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 76e7e22..e6d478e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -30,12 +30,14 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
@@ -71,15 +73,14 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
+import static org.apache.kylin.engine.mr.common.BaseCuboidBuilder.SEQUENCEFILE_DELIMITER;
+
/**
* Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
*/
@@ -95,14 +96,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
.withDescription("HDFS metadata url").create("metaUrl");
public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
.isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
- public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
- .withDescription("Hive Intermediate Table").create("hiveTable");
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true)
+ .withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
private Options options;
public SparkCubingByLayer() {
options = new Options();
- options.addOption(OPTION_INPUT_TABLE);
+ options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_META_URL);
@@ -117,7 +118,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
- String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+ String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
@@ -145,6 +146,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
final Job job = Job.getInstance(confOverwrite);
+ logger.info("RDD input path: {}", inputPath);
logger.info("RDD Output path: {}", outputPath);
setHadoopConf(job, cubeSegment, metaUrl);
@@ -166,12 +168,15 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
- HiveContext sqlContext = new HiveContext(sc.sc());
- final Dataset intermediateTable = sqlContext.table(hiveTable);
-
- // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
- final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
- .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+ final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = sc
+ .sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+ .map(new Function<Text, String[]>() {
+ @Override
+ public String[] call(Text text) throws Exception {
+ String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+ return s.split(SEQUENCEFILE_DELIMITER);
+ }
+ }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
Long totalCount = 0L;
if (envConfig.isSparkSanityCheckEnabled()) {
@@ -269,7 +274,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
}
- static public class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
+ static public class EncodeBaseCuboid implements PairFunction<String[], ByteArray, Object[]> {
private volatile transient boolean initialized = false;
private BaseCuboidBuilder baseCuboidBuilder = null;
private String cubeName;
@@ -285,7 +290,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
@Override
- public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+ public Tuple2<ByteArray, Object[]> call(String[] rowArray) throws Exception {
if (initialized == false) {
synchronized (SparkCubingByLayer.class) {
if (initialized == false) {
@@ -304,25 +309,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
}
}
- String[] rowArray = rowToArray(row);
baseCuboidBuilder.resetAggrs();
byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
return new Tuple2<>(new ByteArray(rowKey), result);
}
-
- private String[] rowToArray(Row row) {
- String[] result = new String[row.size()];
- for (int i = 0; i < row.size(); i++) {
- final Object o = row.get(i);
- if (o != null) {
- result[i] = o.toString();
- } else {
- result[i] = null;
- }
- }
- return result;
- }
}
static public class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 69232ba..8de78c0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -112,11 +112,6 @@ public class SparkExecutable extends AbstractExecutable {
"kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
}
- File hiveConfFile = new File(hadoopConf, "hive-site.xml");
- if (!hiveConfFile.exists()) {
- throw new RuntimeException("Cannot find hive-site.xml in kylin_hadoop_conf_dir: " + hadoopConf + //
- ". In order to enable spark cubing, you must set kylin.env.hadoop-conf-dir to a dir which contains at least core-site.xml, hdfs-site.xml, hive-site.xml, mapred-site.xml, yarn-site.xml");
- }
logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
String jobJar = config.getKylinJobJarPath();
--
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.