Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/24 12:13:55 UTC
[kylin] branch master updated: [KYLIN-4940] Implement the step of "Extract Dictionary from Global Dictionary" for spark cubing engine
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 5be135c [KYLIN-4940] Implement the step of "Extract Dictionary from Global Dictionary" for spark cubing engine
5be135c is described below
commit 5be135c61c80b9241ac7632dec41cc7209b9d23a
Author: yangjiang <ya...@ebay.com>
AuthorDate: Tue Jan 19 11:34:35 2021 +0800
[KYLIN-4940] Implement the step of "Extract Dictionary from Global Dictionary" for spark cubing engine
---
.../engine/spark/SparkBatchCubingJobBuilder2.java | 4 +
.../kylin/engine/spark/SparkCubingByLayer.java | 174 ++++++++++++++++++++-
.../org/apache/kylin/engine/spark/SparkUtil.java | 40 +++++
3 files changed, 211 insertions(+), 7 deletions(-)
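In outline, the change has three parts: SparkBatchCubingJobBuilder2 passes a shrunken-dictionary working path to the Spark step when the cube enables it; SparkCubingByLayer caches the flat-table RDD, runs a first pass that collects each partition's distinct values for the global-dictionary columns and writes a per-partition "shrunken" slice of each global dictionary to HDFS, then encodes the base cuboid against those slices; and SparkUtil.getDictionaryMap substitutes the slices for the full global dictionaries on the executor side.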
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 7d6a367..4b43318 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -201,6 +201,10 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
getSegmentMetadataUrl(seg.getConfig(), jobId));
sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
+ if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+ sparkExecutable.setParam(SparkCubingByLayer.OPTION_SHRUNK_INPUT_PATH.getOpt(),
+ getShrunkenDictionaryPath(jobId));
+ }
sparkExecutable.setJobId(jobId);
StringBuilder jars = new StringBuilder();
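Note that getShrunkenDictionaryPath(jobId) lives in JobBuilderSupport and is not part of this diff; a minimal sketch of what such a helper typically resolves to, where the directory name is an assumption rather than something this commit shows:

    // hypothetical sketch of the JobBuilderSupport helper referenced above
    public String getShrunkenDictionaryPath(String jobId) {
        // e.g. {hdfs_working_dir}/{job_id}/{realization}/dictionary_shrunken
        return getRealizationRootPath(jobId) + "/dictionary_shrunken";
    }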
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cbedc8b..09c68c9 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,21 +17,28 @@
*/
package org.apache.kylin.engine.spark;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeDescManager;
@@ -44,6 +51,8 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.ShrunkenDictionary;
+import org.apache.kylin.dict.ShrunkenDictionaryBuilder;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.IMROutput2;
@@ -59,13 +68,19 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +106,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
.withDescription("Hive Intermediate Table").create("hiveTable");
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+ public static final Option OPTION_SHRUNK_INPUT_PATH = OptionBuilder
+ .withArgName(BatchConstants.ARG_SHRUNKEN_DICT_PATH).hasArg().isRequired(false)
+ .withDescription("shrunken Dictionary Path").create(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
private Options options;
@@ -102,6 +120,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_META_URL);
options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_SHRUNK_INPUT_PATH);
}
@Override
@@ -117,6 +136,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ String shrunkInputPath = optionsHelper.getOptionValue(OPTION_SHRUNK_INPUT_PATH);
+ logger.info("shrunkInputPath is {}", shrunkInputPath);
Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
@@ -130,7 +151,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
JavaSparkContext sc = new JavaSparkContext(conf);
sc.sc().addSparkListener(jobListener);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
- SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
+ SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob
+ .loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
+
+ if (shrunkInputPath != null)
+ sc.hadoopConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkInputPath);
+
final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
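The shrunken-dictionary path reaches the executors through the Hadoop Configuration that SerializableConfiguration wraps. A condensed sketch of the round trip, using the same key the diff uses (variable names as in the diff):

    // driver side: stash the path under a well-known key before wrapping the configuration
    sc.hadoopConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkInputPath);
    SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());

    // executor side, inside a serialized function: read the same key back
    String shrunkenDictPath = sConf.get().get(BatchConstants.ARG_SHRUNKEN_DICT_PATH);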
@@ -164,9 +190,21 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
- final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = SparkUtil
- .hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
- .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+ final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
+ logger.info("isShrunkenDictFromGlobalEnabled {} shrunkInputPath is {}",
+ cubeDesc.isShrunkenDictFromGlobalEnabled(), shrunkInputPath);
+
+ JavaRDD<String[]> recordInputRDD = null;
+
+ if (cubeDesc.isShrunkenDictFromGlobalEnabled() && shrunkInputPath != null) {
+ recordInputRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable).cache();
+ recordInputRDD
+ .foreachPartition(new CreateShrunkenDictionary(cubeName, cubeDesc, cubeSegment, envConfig, sConf));
+ encodedBaseRDD = recordInputRDD.mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+ } else {
+ encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
+ .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+ }
Long totalCount = 0L;
if (envConfig.isSparkSanityCheckEnabled()) {
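A note on the two-pass branch above: recordInputRDD is cached so the Hive flat table is scanned once rather than twice, and because mapToPair is a narrow transformation over the same RDD, the task encoding partition N reads exactly the rows whose distinct values went into the partition-N shrunken dictionary written by the foreachPartition pass. The unpersist a few hunks below releases the cache once the base cuboid has been saved.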
@@ -190,6 +228,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job, envConfig);
+ // recordInputRDD was cached for the shrunken-dictionary pass; release it once the base cuboid is saved
+ if (recordInputRDD != null) {
+ recordInputRDD.unpersist();
+ }
+
PairFlatMapFunction flatMapFunction = new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf);
// aggregate to ND cuboids
for (level = 1; level <= totalLevels; level++) {
@@ -286,9 +329,16 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
- baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
- AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
- MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+ String splitKey = String.valueOf(TaskContext.getPartitionId());
+ try {
+ baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+ AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+ MeasureIngester.create(cubeDesc.getMeasures()),
+ SparkUtil.getDictionaryMap(cubeSegment, splitKey, conf.get()));
+ } catch (IOException e) {
+ // pass the exception to the logger instead of printing a raw stack trace
+ logger.error("Failed to load shrunken dictionary", e);
+ }
initialized = true;
}
}
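Here splitKey is the encoding task's own partition id, which matches the file name CreateShrunkenDictionary (below) used when writing that partition's dictionaries, so every task loads only its own slice of each global dictionary.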
@@ -466,4 +516,114 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
return count;
}
+ public static class CreateShrunkenDictionary implements VoidFunction<Iterator<String[]>> {
+ private String cubeName;
+ private CubeDesc cubeDesc;
+ private CubeSegment cubeSeg;
+
+ private KylinConfig kylinConfig;
+ private SerializableConfiguration scof;
+ private CubeJoinedFlatTableEnrich intermediateTableDesc;
+
+ private List<TblColRef> globalColumns;
+ private int[] globalColumnIndex;
+ private List<Set<String>> globalColumnValues;
+
+ private volatile transient boolean initialized = false;
+
+ private String splitKey;
+
+ public CreateShrunkenDictionary(String cubeName, CubeDesc cubeDesc, CubeSegment cubeSegment, KylinConfig kylinConfig,
+ SerializableConfiguration serializableConfiguration) {
+ this.cubeName = cubeName;
+ this.cubeDesc = cubeDesc;
+ this.cubeSeg = cubeSegment;
+ this.kylinConfig = kylinConfig;
+ this.scof = serializableConfiguration;
+ this.intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg),
+ cubeDesc);
+ }
+
+ public void init() {
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kylinConfig)) {
+ globalColumns = cubeDesc.getAllGlobalDictColumnsNeedBuilt();
+ globalColumnIndex = new int[globalColumns.size()];
+ globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size());
+
+ splitKey = String.valueOf(TaskContext.getPartitionId());
+
+ for (int i = 0; i < globalColumns.size(); i++) {
+ TblColRef colRef = globalColumns.get(i);
+ int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+ globalColumnIndex[i] = columnIndexOnFlatTbl;
+ globalColumnValues.add(Sets.<String>newHashSet());
+ }
+ }
+ }
+
+ @Override
+ public void call(Iterator<String[]> iter) throws Exception {
+
+ if (!initialized) {
+ synchronized (CreateShrunkenDictionary.class) {
+ if (!initialized) {
+ init();
+ initialized = true;
+ }
+ }
+ }
+ int count = 0;
+ while (iter.hasNext()) {
+ count++;
+ String[] rowArray = iter.next();
+ for (int i = 0; i < globalColumnIndex.length; i++) {
+ String fieldValue = rowArray[globalColumnIndex[i]];
+ if (fieldValue == null)
+ continue;
+ globalColumnValues.get(i).add(fieldValue);
+ }
+ }
+
+ FileSystem fs = FileSystem.get(scof.get());
+ Path outputDirBase = new Path(scof.get().get(BatchConstants.ARG_SHRUNKEN_DICT_PATH));
+
+ Map<TblColRef, Dictionary<String>> globalDictionaryMap = cubeSeg
+ .buildGlobalDictionaryMap(globalColumns.size());
+
+ ShrunkenDictionary.StringValueSerializer strValueSerializer = new ShrunkenDictionary.StringValueSerializer();
+
+ for (int i = 0; i < globalColumns.size(); i++) {
+ List<String> colDistinctValues = Lists.newArrayList(globalColumnValues.get(i));
+ if (colDistinctValues.size() == 0) {
+ continue;
+ }
+ // sort values to accelerate the encoding process by reducing the swapping of global dictionary slices
+ Collections.sort(colDistinctValues);
+
+ // build the shrunken dictionary for this single column against its global dictionary
+ ShrunkenDictionaryBuilder<String> dictBuilder = new ShrunkenDictionaryBuilder<>(
+ globalDictionaryMap.get(globalColumns.get(i)));
+
+ for (String colValue : colDistinctValues) {
+ dictBuilder.addValue(colValue);
+ }
+
+ Dictionary<String> shrunkenDict = dictBuilder.build(strValueSerializer);
+
+ Path colDictDir = new Path(outputDirBase, globalColumns.get(i).getIdentity());
+
+ if (!fs.exists(colDictDir)) {
+ fs.mkdirs(colDictDir);
+ }
+ Path shrunkenDictPath = new Path(colDictDir, splitKey);
+ try (DataOutputStream dos = fs.create(shrunkenDictPath)) {
+ shrunkenDict.write(dos);
+ logger.info("Wrote shrunken dictionary to {}", shrunkenDictPath);
+ }
+ }
+
+ }
+ }
+
}
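Two details in CreateShrunkenDictionary are worth calling out. The init() guard is standard double-checked locking on a volatile flag, so initialization runs once per deserialized function instance even when tasks overlap inside one executor JVM. And each partition writes one file per global-dictionary column, keyed by partition id, giving an HDFS layout like the following (the column identity shown is illustrative):

    {shrunken_dict_path}/
        DEFAULT.TEST_KYLIN_FACT.SELLER_ID/    <- colRef.getIdentity()
            0                                 <- splitKey (partition id)
            1
            ...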
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index b963252..d146c85 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -18,9 +18,12 @@
package org.apache.kylin.engine.spark;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,14 +33,17 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.ShrunkenDictionary;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.SparkContext;
@@ -53,9 +59,13 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SparkUtil {
+ private static final Logger logger = LoggerFactory.getLogger(SparkUtil.class);
+
public static ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
return (ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
@@ -188,4 +198,34 @@ public class SparkUtil {
});
}
+ public static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeSegment cubeSegment, String splitKey,
+ Configuration configuration) throws IOException {
+ Map<TblColRef, Dictionary<String>> dictionaryMap = cubeSegment.buildDictionaryMap();
+
+ String shrunkenDictPath = configuration.get(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
+ if (shrunkenDictPath == null) {
+ return dictionaryMap;
+ }
+
+ // replace global dictionary with shrunken dictionary if possible
+ FileSystem fs = FileSystem.get(configuration);
+ ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer();
+ for (TblColRef colRef : cubeSegment.getCubeDesc().getAllGlobalDictColumnsNeedBuilt()) {
+ Path colShrunkenDictDir = new Path(shrunkenDictPath, colRef.getIdentity());
+ Path colShrunkenDictPath = new Path(colShrunkenDictDir, splitKey);
+ if (!fs.exists(colShrunkenDictPath)) {
+ logger.warn("Shrunken dictionary for column " + colRef.getIdentity() + " in split " + splitKey
+ + " does not exist!!!");
+ continue;
+ }
+ try (DataInputStream dis = fs.open(colShrunkenDictPath)) {
+ Dictionary<String> shrunkenDict = new ShrunkenDictionary(valueSerializer);
+ shrunkenDict.readFields(dis);
+ logger.info("Read Shrunken dictionary from {} success", colShrunkenDictPath);
+ dictionaryMap.put(colRef, shrunkenDict);
+ }
+ }
+
+ return dictionaryMap;
+ }
}
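Note the fallback semantics of getDictionaryMap: the map starts from cubeSegment.buildDictionaryMap(), so when a split's shrunken slice is missing, the full global dictionary stays in place and encoding still succeeds, just without the speed-up. For debugging, a written slice can be read back outside the job with the same serializer; a minimal sketch, assuming you know the slice path (the one below is illustrative):

    // standalone sketch: read one shrunken dictionary slice back from HDFS
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path slice = new Path("/kylin/dictionary_shrunken/DB.TABLE.COL/0"); // illustrative path
    ShrunkenDictionary.StringValueSerializer ser = new ShrunkenDictionary.StringValueSerializer();
    Dictionary<String> dict = new ShrunkenDictionary(ser);
    try (DataInputStream dis = fs.open(slice)) {
        dict.readFields(dis);
    }
    // dict now resolves ids and values against that partition's slice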