You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/24 12:13:55 UTC

[kylin] branch master updated: [KYLIN-4940] Implement the step of "Extract Dictionary from Global Dictionary" for spark cubing engine

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5be135c  [KYLIN-4940] Implement the step of "Extract Dictionary from Global Dictionary" for spark cubing engine
5be135c is described below

commit 5be135c61c80b9241ac7632dec41cc7209b9d23a
Author: yangjiang <ya...@ebay.com>
AuthorDate: Tue Jan 19 11:34:35 2021 +0800

    [KYLIN-4940] Implement the step of "Extract Dictionary from Global Dictionary" for spark cubing engine
---
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |   4 +
 .../kylin/engine/spark/SparkCubingByLayer.java     | 174 ++++++++++++++++++++-
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  40 +++++
 3 files changed, 211 insertions(+), 7 deletions(-)

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 7d6a367..4b43318 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -201,6 +201,10 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
                 getSegmentMetadataUrl(seg.getConfig(), jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
+        if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+            sparkExecutable.setParam(SparkCubingByLayer.OPTION_SHRUNK_INPUT_PATH.getOpt(),
+                    getShrunkenDictionaryPath(jobId));
+        }
         sparkExecutable.setJobId(jobId);
 
         StringBuilder jars = new StringBuilder();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cbedc8b..09c68c9 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,21 +17,28 @@
 */
 package org.apache.kylin.engine.spark;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeDescManager;
@@ -44,6 +51,8 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.ShrunkenDictionary;
+import org.apache.kylin.dict.ShrunkenDictionaryBuilder;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.IMROutput2;
@@ -59,13 +68,19 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +106,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_SHRUNK_INPUT_PATH = OptionBuilder
+            .withArgName(BatchConstants.ARG_SHRUNKEN_DICT_PATH).hasArg().isRequired(false)
+            .withDescription("shrunken Dictionary Path").create(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
 
     private Options options;
 
@@ -102,6 +120,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_SHRUNK_INPUT_PATH);
     }
 
     @Override
@@ -117,6 +136,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String shrunkInputPath = optionsHelper.getOptionValue(OPTION_SHRUNK_INPUT_PATH);
+        logger.info("shrunkInputPath is {}", shrunkInputPath);
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
 
@@ -130,7 +151,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         JavaSparkContext sc = new JavaSparkContext(conf);
         sc.sc().addSparkListener(jobListener);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
+        SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob
+                .loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
+
+        if (shrunkInputPath != null)
+            sc.hadoopConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkInputPath);
+
         final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
         KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
@@ -164,9 +190,21 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = SparkUtil
-                .hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
-                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
+        logger.info("isShrunkenDictFromGlobalEnabled  {}  shrunkInputPath is {}",
+                cubeDesc.isShrunkenDictFromGlobalEnabled(), shrunkInputPath);
+
+        JavaRDD<String[]> recordInputRDD = null;
+
+        if (cubeDesc.isShrunkenDictFromGlobalEnabled() && shrunkInputPath != null) {
+            recordInputRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable).cache();
+            recordInputRDD
+                    .foreachPartition(new CreateShrunkenDictionary(cubeName, cubeDesc, cubeSegment, envConfig, sConf));
+            encodedBaseRDD = recordInputRDD.mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+        } else {
+            encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
+                    .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+        }
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
@@ -190,6 +228,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job, envConfig);
 
+        // use ShrunkenDictionary should unpersist recordInputRDD
+        if (recordInputRDD != null) {
+            recordInputRDD.unpersist();
+        }
+
         PairFlatMapFunction flatMapFunction = new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf);
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
@@ -286,9 +329,16 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                                     EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
                             long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
                             Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
-                            baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
-                                    AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
-                                    MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+                            String splitKey = String.valueOf(TaskContext.getPartitionId());
+                            try {
+                                baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+                                        AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+                                        MeasureIngester.create(cubeDesc.getMeasures()),
+                                        SparkUtil.getDictionaryMap(cubeSegment, splitKey, conf.get()));
+                            } catch (IOException e) {
+                                logger.error("Fail to get shrunk dict");
+                                e.printStackTrace();
+                            }
                             initialized = true;
                         }
                     }
@@ -466,4 +516,114 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return count;
     }
 
+    public static class CreateShrunkenDictionary implements VoidFunction<Iterator<String[]>> {
+        private String cubeName;
+        private CubeDesc cubeDesc;
+        private CubeSegment cubeSeg;
+
+        private KylinConfig kylinConfig;
+        private SerializableConfiguration scof;
+        private CubeJoinedFlatTableEnrich intermediateTableDesc;
+
+        private List<TblColRef> globalColumns;
+        private int[] globalColumnIndex;
+        private List<Set<String>> globalColumnValues;
+
+        private volatile transient boolean initialized = false;
+
+        private String splitKey;
+
+        public CreateShrunkenDictionary(String cubeName, CubeDesc cubeDesc, CubeSegment cubeSegment, KylinConfig kylinConfig,
+                                        SerializableConfiguration serializableConfiguration) {
+            this.cubeName = cubeName;
+            this.cubeDesc = cubeDesc;
+            this.cubeSeg = cubeSegment;
+            this.kylinConfig = kylinConfig;
+            this.scof = serializableConfiguration;
+            this.intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg),
+                    cubeDesc);
+        }
+
+        public void init() {
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                globalColumns = cubeDesc.getAllGlobalDictColumnsNeedBuilt();
+                globalColumnIndex = new int[globalColumns.size()];
+                globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size());
+
+                splitKey = String.valueOf(TaskContext.getPartitionId());
+
+                for (int i = 0; i < globalColumns.size(); i++) {
+                    TblColRef colRef = globalColumns.get(i);
+                    int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+                    globalColumnIndex[i] = columnIndexOnFlatTbl;
+                    globalColumnValues.add(Sets.<String>newHashSet());
+                }
+            }
+        }
+
+        @Override
+        public void call(Iterator<String[]> iter) throws Exception {
+
+            if (initialized == false) {
+                synchronized (CreateShrunkenDictionary.class) {
+                    if (initialized == false) {
+                        init();
+                        initialized = true;
+                    }
+                }
+            }
+            int count = 0;
+            while (iter.hasNext()) {
+                count++;
+                String[] rowArray = iter.next();
+                for (int i = 0; i < globalColumnIndex.length; i++) {
+                    String fieldValue = rowArray[globalColumnIndex[i]];
+                    if (fieldValue == null)
+                        continue;
+                    globalColumnValues.get(i).add(fieldValue);
+                }
+            }
+
+            FileSystem fs = FileSystem.get(scof.get());
+            Path outputDirBase = new Path(scof.get().get(BatchConstants.ARG_SHRUNKEN_DICT_PATH));
+
+            Map<TblColRef, Dictionary<String>> globalDictionaryMap = cubeSeg
+                    .buildGlobalDictionaryMap(globalColumns.size());
+
+            ShrunkenDictionary.StringValueSerializer strValueSerializer = new ShrunkenDictionary.StringValueSerializer();
+
+            for (int i = 0; i < globalColumns.size(); i++) {
+                List<String> colDistinctValues = Lists.newArrayList(globalColumnValues.get(i));
+                if (colDistinctValues.size() == 0) {
+                    continue;
+                }
+                // sort values to accelerate the encoding process by reducing the swapping of global dictionary slices
+                Collections.sort(colDistinctValues);
+
+                //only get one col dict
+                ShrunkenDictionaryBuilder<String> dictBuilder = new ShrunkenDictionaryBuilder<>(
+                        globalDictionaryMap.get(globalColumns.get(i)));
+
+                for (String colValue : colDistinctValues) {
+                    dictBuilder.addValue(colValue);
+                }
+
+                Dictionary<String> shrunkenDict = dictBuilder.build(strValueSerializer);
+
+                Path colDictDir = new Path(outputDirBase, globalColumns.get(i).getIdentity());
+
+                if (!fs.exists(colDictDir)) {
+                    fs.mkdirs(colDictDir);
+                }
+                Path shrunkenDictPath = new Path(colDictDir, splitKey);
+                try (DataOutputStream dos = fs.create(shrunkenDictPath)) {
+                    logger.info("Write Shrunken dictionary to {} success", shrunkenDictPath);
+                    shrunkenDict.write(dos);
+                }
+            }
+
+        }
+    }
+
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index b963252..d146c85 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -18,9 +18,12 @@
 
 package org.apache.kylin.engine.spark;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,14 +33,17 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.ShrunkenDictionary;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.SparkContext;
@@ -53,9 +59,13 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.hive.HiveUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SparkUtil {
 
+    private static final Logger logger = LoggerFactory.getLogger(SparkUtil.class);
+
     public static ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
         return (ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
@@ -188,4 +198,34 @@ public class SparkUtil {
         });
     }
 
+    public static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeSegment cubeSegment, String splitKey,
+                                                                      Configuration configuration) throws IOException {
+        Map<TblColRef, Dictionary<String>> dictionaryMap = cubeSegment.buildDictionaryMap();
+
+        String shrunkenDictPath = configuration.get(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
+        if (shrunkenDictPath == null) {
+            return dictionaryMap;
+        }
+
+        // replace global dictionary with shrunken dictionary if possible
+        FileSystem fs = FileSystem.get(configuration);
+        ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer();
+        for (TblColRef colRef : cubeSegment.getCubeDesc().getAllGlobalDictColumnsNeedBuilt()) {
+            Path colShrunkenDictDir = new Path(shrunkenDictPath, colRef.getIdentity());
+            Path colShrunkenDictPath = new Path(colShrunkenDictDir, splitKey);
+            if (!fs.exists(colShrunkenDictPath)) {
+                logger.warn("Shrunken dictionary for column " + colRef.getIdentity() + " in split " + splitKey
+                        + " does not exist!!!");
+                continue;
+            }
+            try (DataInputStream dis = fs.open(colShrunkenDictPath)) {
+                Dictionary<String> shrunkenDict = new ShrunkenDictionary(valueSerializer);
+                shrunkenDict.readFields(dis);
+                logger.info("Read Shrunken dictionary from {} success", colShrunkenDictPath);
+                dictionaryMap.put(colRef, shrunkenDict);
+            }
+        }
+
+        return dictionaryMap;
+    }
 }