Posted to commits@kylin.apache.org by sh...@apache.org on 2020/04/28 07:07:18 UTC

[kylin] branch master updated: KYLIN-3847 Flink cubing step : fact distinct

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b4db4a  KYLIN-3847 Flink cubing step : fact distinct
3b4db4a is described below

commit 3b4db4aacf425fe6e706efbd240116823435348a
Author: harveyyue <yw...@126.com>
AuthorDate: Tue Mar 10 13:47:56 2020 +0800

    KYLIN-3847 Flink cubing step : fact distinct
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  16 +-
 .../org/apache/kylin/common/util/StringUtil.java   |   3 +
 .../engine/flink/FlinkBatchCubingJobBuilder2.java  |  34 +-
 .../engine/flink/FlinkFactDistinctColumns.java     | 357 +++++++++++++
 .../engine/flink/HadoopMultipleOutputFormat.java   | 114 +++++
 .../engine/mr/steps/FactDistinctColumnsBase.java   | 552 +++++++++++++++++++++
 6 files changed, 1069 insertions(+), 7 deletions(-)
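
In short, this commit ports the "extract fact table distinct columns" step to the Flink engine: it adds a Flink batch job (FlinkFactDistinctColumns), a Flink-side HadoopMultipleOutputFormat that wraps Hadoop's MultipleOutputs, and a new FactDistinctColumnsBase in engine-mr that holds the map/reduce logic and is driven by the Flink operators through a small Visitor callback. The Flink step is only scheduled when kylin.engine.flink-fact-distinct is set to true; it defaults to false, so existing builds keep using the MapReduce step. A minimal sketch of the gating logic, taken from the FlinkBatchCubingJobBuilder2 change below:

    KylinConfig config = KylinConfig.getInstanceFromEnv();
    if (config.isFlinkFactDistinctEnable()) {
        // kylin.engine.flink-fact-distinct=true: run the new Flink job
        result.addTask(createFactDistinctColumnsFlinkStep(jobId));
    } else {
        // default: keep the existing MapReduce fact-distinct step
        result.addTask(createFactDistinctColumnsStep(jobId));
    }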

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 773254d..8eefc02 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1560,15 +1560,15 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public boolean isSparkFactDistinctEnable() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "true"));
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", FALSE));
     }
 
     public boolean isSparkUHCDictionaryEnable() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", "false"));
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", FALSE));
     }
 
     public boolean isSparkCardinalityEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", FALSE));
     }
 
     public int getSparkOutputMaxSize() {
@@ -1576,15 +1576,19 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public boolean isSparkDimensionDictionaryEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", "false"));
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", FALSE));
+    }
+
+    public boolean isSparCreateHiveTableViaSparkEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", FALSE));
     }
 
     public boolean isFlinkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
     }
 
-    public boolean isSparCreateHiveTableViaSparkEnable() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", "false"));
+    public boolean isFlinkFactDistinctEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.flink-fact-distinct", FALSE));
     }
 
     // ============================================================================
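
A side effect of this hunk worth calling out: the Spark-related flags now use the existing FALSE constant instead of string literals, which also flips the default of kylin.engine.spark-fact-distinct from "true" to "false"; the new kylin.engine.flink-fact-distinct likewise defaults to false. Assuming nothing is overridden in kylin.properties, the effective defaults after this change are:

    KylinConfig config = KylinConfig.getInstanceFromEnv();
    boolean sparkFactDistinct = config.isSparkFactDistinctEnable(); // now false by default (was true before this commit)
    boolean flinkFactDistinct = config.isFlinkFactDistinctEnable(); // new flag, false by default
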
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
index 5dde9cf..80545dc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
@@ -220,4 +220,7 @@ public class StringUtil {
         return a == null ? b == null : a.equals(b);
     }
 
+    public static boolean isEmpty(String str) {
+        return str == null || str.length() == 0;
+    }
 }
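
The new StringUtil.isEmpty() helper is a plain null-or-empty check; the Flink job below uses it to guard the optional enableObjectReuse argument. Usage sketch (variable and option names are the ones declared in FlinkFactDistinctColumns further down):

    String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE); // may be null
    if (!StringUtil.isEmpty(enableObjectReuseOptValue) && "true".equalsIgnoreCase(enableObjectReuseOptValue)) {
        env.getConfig().enableObjectReuse();
    }
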
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
index 925334c..b2db499 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
@@ -61,7 +61,12 @@ public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
         inputSide.addStepPhase1_CreateFlatTable(result);
 
         // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStep(jobId));
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        if (config.isFlinkFactDistinctEnable()) {
+            result.addTask(createFactDistinctColumnsFlinkStep(jobId));
+        } else {
+            result.addTask(createFactDistinctColumnsStep(jobId));
+        }
 
         if (isEnableUHCDictStep()) {
             result.addTask(createBuildUHCDictStep(jobId));
@@ -87,6 +92,33 @@ public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
+    public FlinkExecutable createFactDistinctColumnsFlinkStep(String jobId) {
+        final FlinkExecutable flinkExecutable = new FlinkExecutable();
+        final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
+
+        flinkExecutable.setClassName(FlinkFactDistinctColumns.class.getName());
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_INPUT_PATH.getOpt(), tablePath);
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_OUTPUT_PATH.getOpt(), getFactDistinctColumnsPath(jobId));
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_STATS_SAMPLING_PERCENT.getOpt(), String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+
+        flinkExecutable.setJobId(jobId);
+        flinkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        flinkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES, getCounterOutputPath(jobId));
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getFlinkAdditionalJars());
+
+        flinkExecutable.setJars(jars.toString());
+
+        return flinkExecutable;
+    }
+
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final FlinkExecutable flinkExecutable = new FlinkExecutable();
         flinkExecutable.setClassName(FlinkCubingByLayer.class.getName());
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkFactDistinctColumns.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkFactDistinctColumns.java
new file mode 100644
index 0000000..b1787d8
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkFactDistinctColumns.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsBase;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class FlinkFactDistinctColumns extends AbstractApplication {
+    protected static final Logger logger = LoggerFactory.getLogger(FlinkFactDistinctColumns.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUTPUT)
+            .hasArg().isRequired(true).withDescription("Counter output path").create(BatchConstants.ARG_COUNTER_OUTPUT);
+    public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
+            .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
+            .withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+    public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
+            .isRequired(false).withDescription("Enable object reuse").create("enableObjectReuse");
+
+    private Options options;
+
+    public FlinkFactDistinctColumns() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_INPUT_TABLE);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+        options.addOption(OPTION_COUNTER_PATH);
+        options.addOption(OPTION_ENABLE_OBJECT_REUSE);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
+        int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
+        String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+
+        Job job = Job.getInstance();
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(job.getConfiguration());
+        HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+
+        final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+        final int totalReducer = reducerMapping.getTotalReducerNum();
+
+        logger.info("getTotalReducerNum: {}", totalReducer);
+        logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+        logger.info("counter path {}", counterPath);
+
+        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+        // calculate source record bytes size
+        final String bytesWrittenName = "byte-writer-counter";
+        final String recordCounterName = "record-counter";
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        if (!StringUtil.isEmpty(enableObjectReuseOptValue) &&
+                enableObjectReuseOptValue.equalsIgnoreCase("true")) {
+            env.getConfig().enableObjectReuse();
+        }
+
+        DataSet<String[]> recordDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
+
+        // read record from flat table
+        // output:
+        //   1, statistic
+        //   2, field value of dict col
+        //   3, min/max field value of not dict col
+        DataSet<Tuple2<SelfDefineSortableKey, Text>> flatOutputDataSet = recordDataSet.mapPartition(
+                new FlatOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent,
+                        bytesWrittenName, recordCounterName));
+
+        // repartition data, make each reducer handle only one col data or the statistic data
+        DataSet<Tuple2<SelfDefineSortableKey, Text>> partitionDataSet = flatOutputDataSet
+                .partitionCustom(new FactDistinctColumnPartitioner(cubeName, metaUrl, sConf), 0)
+                .setParallelism(totalReducer);
+
+        // multiple output result
+        // 1, CFG_OUTPUT_COLUMN: field values of dict col, which will not be built in reducer, like globalDictCol
+        // 2, CFG_OUTPUT_DICT: dictionary object built in reducer
+        // 3, CFG_OUTPUT_STATISTICS: cube statistic: hll of cuboids ...
+        // 4, CFG_OUTPUT_PARTITION: dimension value range(min,max)
+        DataSet<Tuple2<String, Tuple3<Writable, Writable, String>>> outputDataSet = partitionDataSet
+                .mapPartition(new MultiOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent))
+                .setParallelism(totalReducer);
+
+        // make each reducer output to respective dir
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
+                NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
+                NullWritable.class, ArrayPrimitiveWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
+                LongWritable.class, BytesWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
+                NullWritable.class, LongWritable.class);
+
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+        FileOutputFormat.setCompressOutput(job, false);
+
+        // prevent creating a zero-sized default output
+        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
+        outputDataSet.output(new HadoopMultipleOutputFormat(new LazyOutputFormat(), job));
+
+        JobExecutionResult jobExecutionResult =
+                env.execute("Fact distinct columns for:" + cubeName + " segment " + segmentId);
+        Map<String, Object> accumulatorResults = jobExecutionResult.getAllAccumulatorResults();
+        Long recordCount = (Long) accumulatorResults.get(recordCounterName);
+        Long bytesWritten = (Long) accumulatorResults.get(bytesWrittenName);
+        logger.info("Map input records={}", recordCount);
+        logger.info("HDFS Read: {} bytes", bytesWritten);
+        logger.info("HDFS: Number of bytes written=" + FlinkBatchCubingJobBuilder2.getFileSize(outputPath, fs));
+
+        Map<String, String> counterMap = Maps.newHashMap();
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten));
+
+        // save counter to hdfs
+        HadoopUtil.writeToSequenceFile(job.getConfiguration(), counterPath, counterMap);
+    }
+
+
+    static class FlatOutputMapPartitionFunction extends
+            RichMapPartitionFunction<String[], Tuple2<SelfDefineSortableKey, Text>> {
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercentage;
+        private String bytesWrittenName;
+        private String recordCounterName;
+        private LongCounter bytesWrittenCounter;
+        private LongCounter recordCounter;
+
+        private FactDistinctColumnsBase base;
+
+        public FlatOutputMapPartitionFunction(SerializableConfiguration conf, String cubeName, String segmentId,
+                                              String metaUrl, int samplingPercentage, String bytesWrittenName,
+                                              String recordCounterName) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.samplingPercentage = samplingPercentage;
+            this.bytesWrittenName = bytesWrittenName;
+            this.recordCounterName = recordCounterName;
+            this.bytesWrittenCounter = new LongCounter();
+            this.recordCounter = new LongCounter();
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            getRuntimeContext().addAccumulator(bytesWrittenName, bytesWrittenCounter);
+            getRuntimeContext().addAccumulator(recordCounterName, recordCounter);
+            base = new FactDistinctColumnsBase(cubeName, segmentId, metaUrl, conf, samplingPercentage);
+            base.setupMap();
+        }
+
+        @Override
+        public void mapPartition(Iterable<String[]> values, Collector<Tuple2<SelfDefineSortableKey, Text>> out) throws Exception {
+            FactDistinctColumnsBase.Visitor visitor = new FactDistinctColumnsBase.Visitor<SelfDefineSortableKey, Text>() {
+                @Override
+                public void collect(String namedOutput, SelfDefineSortableKey key, Text value, String outputPath) {
+                    out.collect(new Tuple2<>(key, value));
+                }
+            };
+
+            for (String[] row : values) {
+                bytesWrittenCounter.add(base.countSizeInBytes(row));
+                recordCounter.add(1L);
+                base.map(row, visitor);
+            }
+
+            base.postMap(visitor);
+        }
+    }
+
+    static class FactDistinctColumnPartitioner implements Partitioner<SelfDefineSortableKey> {
+        private transient volatile boolean initialized = false;
+        private String cubeName;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private transient FactDistinctColumnsReducerMapping reducerMapping;
+
+        public FactDistinctColumnPartitioner(String cubeName, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        private void init() {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+                initialized = true;
+            }
+        }
+
+        @Override
+        public int partition(SelfDefineSortableKey key, int numPartitions) {
+            if (initialized == false) {
+                synchronized (FlinkFactDistinctColumns.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            Text keyText = key.getText();
+            if (keyText.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+                Long cuboidId = Bytes.toLong(keyText.getBytes(), 1, Bytes.SIZEOF_LONG);
+                return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
+            } else {
+                return BytesUtil.readUnsigned(keyText.getBytes(), 0, 1);
+            }
+        }
+    }
+
+    static class MultiOutputMapPartitionFunction extends
+            RichMapPartitionFunction<Tuple2<SelfDefineSortableKey, Text>, Tuple2<String, Tuple3<Writable, Writable, String>>> {
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercentage;
+
+        private FactDistinctColumnsBase base;
+
+        public MultiOutputMapPartitionFunction(SerializableConfiguration conf, String cubeName, String segmentId,
+                                               String metaUrl, int samplingPercentage) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.samplingPercentage = samplingPercentage;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            int taskId = getRuntimeContext().getIndexOfThisSubtask();
+            base = new FactDistinctColumnsBase(cubeName, segmentId, metaUrl, conf, samplingPercentage);
+            base.setupReduce(taskId);
+        }
+
+        @Override
+        public void mapPartition(Iterable<Tuple2<SelfDefineSortableKey, Text>> values,
+                                 Collector<Tuple2<String, Tuple3<Writable, Writable, String>>> out) throws Exception {
+            FactDistinctColumnsBase.Visitor visitor = new FactDistinctColumnsBase.Visitor<Writable, Writable>() {
+                @Override
+                public void collect(String namedOutput, Writable key, Writable value, String outputPath) {
+                    out.collect(new Tuple2<>(namedOutput, new Tuple3<>(key, value, outputPath)));
+                }
+            };
+
+            for (Tuple2<SelfDefineSortableKey, Text> value : values) {
+                base.reduce(new Pair<>(value.f0, value.f1), visitor);
+            }
+
+            base.postReduce(visitor);
+        }
+    }
+}
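
Data flow of the job above, in short: FlatOutputMapPartitionFunction replays FactDistinctColumnsBase.map() over the flat-table rows, FactDistinctColumnPartitioner routes each record to the reducer that owns its column (or to the dedicated row-counter reducer), and MultiOutputMapPartitionFunction replays the reduce side and tags every output record with a named output and sub-path. The routing relies on the key layout produced by FactDistinctColumnsBase (see writeFieldValue() and postMap() in the last file of this diff); the partitioner's decision, annotated:

    // key layout (first byte decides the route):
    //   column value record : [1-byte reducer id][UTF-8 encoded field value]
    //   cuboid HLL record   : [MARK_FOR_HLL_COUNTER][8-byte cuboid id]
    byte[] key = keyText.getBytes();
    if (key[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
        long cuboidId = Bytes.toLong(key, 1, Bytes.SIZEOF_LONG);
        return reducerMapping.getReducerIdForCuboidRowCount(cuboidId); // dedicated statistics reducer
    } else {
        return BytesUtil.readUnsigned(key, 0, 1); // the reducer id written by writeFieldValue()
    }
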
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/HadoopMultipleOutputFormat.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/HadoopMultipleOutputFormat.java
new file mode 100644
index 0000000..0d25028
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/HadoopMultipleOutputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.flink;
+
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter;
+import org.apache.hadoop.util.Progress;
+
+import java.io.IOException;
+
+public class HadoopMultipleOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<String, Tuple3<K, V, String>>> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected static final Object OPEN_MULTIPLE_MUTEX = new Object();
+    protected static final Object CLOSE_MULTIPLE_MUTEX = new Object();
+
+    protected MultipleOutputs writer;
+
+    public HadoopMultipleOutputFormat(OutputFormat<K, V> mapreduceOutputFormat, Job job) {
+        super(mapreduceOutputFormat, job);
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        super.open(taskNumber, numTasks);
+
+        synchronized (OPEN_MULTIPLE_MUTEX) {
+            try {
+                TaskInputOutputContext taskInputOutputContext = new ReduceContextImpl(configuration,
+                        context.getTaskAttemptID(), new InputIterator(), new GenericCounter(), new GenericCounter(),
+                        recordWriter, outputCommitter, new DummyReporter(), null,
+                        BytesWritable.class, BytesWritable.class);
+                this.writer = new MultipleOutputs(taskInputOutputContext);
+            } catch (InterruptedException e) {
+                throw new IOException("Could not create MultipleOutputs.", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+        synchronized (CLOSE_MULTIPLE_MUTEX) {
+            try {
+                this.writer.close();
+            } catch (InterruptedException e) {
+                throw new IOException("Could not close MultipleOutputs.", e);
+            }
+        }
+    }
+
+    @Override
+    public void writeRecord(Tuple2<String, Tuple3<K, V, String>> record) throws IOException {
+        try {
+            this.writer.write(record.f0, record.f1.f0, record.f1.f1, record.f1.f2);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static class InputIterator implements RawKeyValueIterator {
+        @Override
+        public DataInputBuffer getKey() throws IOException {
+            return null;
+        }
+
+        @Override
+        public DataInputBuffer getValue() throws IOException {
+            return null;
+        }
+
+        @Override
+        public Progress getProgress() {
+            return null;
+        }
+
+        @Override
+        public boolean next() throws IOException {
+            return false;
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+    }
+}
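
HadoopMultipleOutputFormat adapts Hadoop's MultipleOutputs to Flink's output-format API: every record is a (namedOutput, (key, value, baseOutputPath)) tuple, and writeRecord() forwards it to MultipleOutputs so that column values, dictionaries, statistics and partition ranges each land in their own sub-directory of the job output path. For illustration, this is the shape of the record the reduce-side function above emits for one dictionary-column value (keyBytes, col and out come from MultiOutputMapPartitionFunction / FactDistinctColumnsBase.reduce()):

    // routed to the CFG_OUTPUT_COLUMN named output, written under <outputPath>/<columnIdentity>/
    Tuple2<String, Tuple3<Writable, Writable, String>> record = new Tuple2<>(
            BatchConstants.CFG_OUTPUT_COLUMN,
            new Tuple3<Writable, Writable, String>(NullWritable.get(), new Text(keyBytes), col.getIdentity() + "/"));
    out.collect(record);
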
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsBase.java
new file mode 100644
index 0000000..26f0da3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsBase.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.CuboidStatCalculator;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FactDistinctColumnsBase {
+    private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsBase.class);
+
+    private String cubeName;
+    private String segmentId;
+    private int samplingPercentage;
+    private KylinConfig envConfig;
+
+    // map
+    protected CubeInstance cube;
+    protected CubeSegment cubeSeg;
+    protected CubeDesc cubeDesc;
+    protected long baseCuboidId;
+    protected List<TblColRef> allCols;
+    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
+    protected int[] columnIndex;
+    protected FactDistinctColumnsReducerMapping reducerMapping;
+    protected int nRowKey;
+    private Integer[][] allCuboidsBitSet = null;
+    private HLLCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    private int rowCount = 0;
+    private DictColDeduper dictColDeduper;
+    private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
+    private CuboidStatCalculator[] cuboidStatCalculators;
+    private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+    private ByteBuffer tmpbuf;
+    private Text outputKey;
+    private Text outputValue;
+    private Text emptyText;
+
+    // reduce
+    public static final String DICT_FILE_POSTFIX = ".rldict";
+    public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
+    private int taskId;
+    private boolean isStatistics = false;
+    private List<Long> baseCuboidRowCountInMappers;
+    protected Map<Long, HLLCounter> cuboidHLLMap = null;
+    private TblColRef col = null;
+    private long totalRowsBeforeMerge = 0;
+    // local build dict
+    private boolean buildDictInReducer;
+    private IDictionaryBuilder builder;
+    private String maxValue = null;
+    private String minValue = null;
+
+
+    public FactDistinctColumnsBase(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf,
+                                   int samplingPercentage) {
+        this.cubeName = cubeName;
+        this.segmentId = segmentId;
+        this.samplingPercentage = samplingPercentage;
+        this.envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+    }
+
+    public void setupMap() {
+        outputKey = new Text();
+        outputValue = new Text();
+        emptyText = new Text();
+        try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                .setAndUnsetThreadLocalConfig(envConfig)) {
+            cube = CubeManager.getInstance(envConfig).getCube(cubeName);
+            cubeSeg = cube.getSegmentById(segmentId);
+            cubeDesc = cube.getDescriptor();
+            baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+            reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+            allCols = reducerMapping.getAllDimDictCols();
+
+            intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
+            columnIndex = new int[allCols.size()];
+            for (int i = 0; i < allCols.size(); i++) {
+                TblColRef colRef = allCols.get(i);
+                int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+                columnIndex[i] = columnIndexOnFlatTbl;
+            }
+
+            tmpbuf = ByteBuffer.allocate(4096);
+            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+            Set<Long> cuboidIdSet = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+            if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSeg)) {
+                // For cube planner, for every prebuilt cuboid, its related row count stats should be calculated
+                // If the precondition for trigger cube planner phase one is satisfied, we need to calculate row count stats for mandatory cuboids.
+                cuboidIdSet.addAll(cubeSeg.getCubeDesc().getMandatoryCuboids());
+            }
+            cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+            allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
+
+            allCuboidsHLL = new HLLCounter[cuboidIds.length];
+            for (int i = 0; i < cuboidIds.length; i++) {
+                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
+            }
+
+            //for KYLIN-2518 backward compatibility
+            boolean isUsePutRowKeyToHllNewAlgorithm;
+            if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+                isUsePutRowKeyToHllNewAlgorithm = false;
+                logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+            } else {
+                isUsePutRowKeyToHllNewAlgorithm = true;
+                logger.info(
+                        "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+                        cubeDesc.getVersion());
+            }
+
+            int calculatorNum = getStatsThreadNum(cuboidIds.length);
+            cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
+            int splitSize = cuboidIds.length / calculatorNum;
+            if (splitSize <= 0) {
+                splitSize = 1;
+            }
+            for (int i = 0; i < calculatorNum; i++) {
+                HLLCounter[] cuboidsHLLSplit;
+                Integer[][] cuboidsBitSetSplit;
+                Long[] cuboidIdSplit;
+                int start = i * splitSize;
+                if (start >= cuboidIds.length) {
+                    break;
+                }
+                int end = (i + 1) * splitSize;
+                if (i == calculatorNum - 1) {// last split
+                    end = cuboidIds.length;
+                }
+
+                cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end);
+                cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end);
+                cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
+                CuboidStatCalculator calculator = new CuboidStatCalculator(i,
+                        intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit,
+                        isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
+                cuboidStatCalculators[i] = calculator;
+                calculator.start();
+            }
+
+            // setup dict col deduper
+            dictColDeduper = new DictColDeduper();
+            Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
+            for (int i = 0; i < allCols.size(); i++) {
+                if (dictCols.contains(allCols.get(i)))
+                    dictColDeduper.setIsDictCol(i);
+            }
+        }
+    }
+
+    private int getStatsThreadNum(int cuboidNum) {
+        int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatsCalculator();
+        if (unitNum <= 0) {
+            logger.warn("config from getCuboidNumberPerStatsCalculator() " + unitNum + " should be larger than 0");
+            logger.info("Will use single thread for cuboid statistics calculation");
+            return 1;
+        }
+
+        int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatsCalculatorMaxNumber();
+        int calculatorNum = (cuboidNum - 1) / unitNum + 1;
+        if (calculatorNum > maxCalculatorNum) {
+            calculatorNum = maxCalculatorNum;
+        }
+        return calculatorNum;
+    }
+
+    private int countNewSize(int oldSize, int dataSize) {
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
+
+    private void writeFieldValue(DataType type, Integer colIndex, String value, Visitor visitor) {
+        int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+        tmpbuf.clear();
+        byte[] valueBytes = Bytes.toBytes(value);
+        int size = valueBytes.length + 1;
+        if (size >= tmpbuf.capacity()) {
+            tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+        }
+        tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+        tmpbuf.put(valueBytes);
+        outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+        sortableKey.init(outputKey, type);
+        visitor.collect(null, sortableKey, emptyText, null);
+        // log a few rows for troubleshooting
+        if (rowCount < 10) {
+            logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex);
+        }
+    }
+
+    private void putRowKeyToHLL(String[] row) {
+        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+            cuboidStatCalculator.putRow(row);
+        }
+    }
+
+    public long countSizeInBytes(String[] row) {
+        int size = 0;
+        for (String s : row) {
+            size += s == null ? 1 : StringUtil.utf8Length(s);
+            size++; // delimiter
+        }
+        return size;
+    }
+
+    public void map(String[] row, Visitor visitor) {
+        for (int i = 0; i < allCols.size(); i++) {
+            int colIndex = columnIndex[i];
+            int rowSize = row.length;
+            String fieldValue = " ";
+            if (colIndex <= rowSize - 1) {
+                fieldValue = row[colIndex];
+            } else {
+                logger.debug("colIndex:" + colIndex + " is more than rowSize: " + rowSize + " -1, so set empty value.");
+            }
+            if (fieldValue == null)
+                continue;
+
+            final DataType type = allCols.get(i).getType();
+
+            //for dict col, de-dup before writing the value; for non-dict dim col, hold min/max until cleanup
+            if (dictColDeduper.isDictCol(i)) {
+                if (dictColDeduper.add(i, fieldValue)) {
+                    writeFieldValue(type, i, fieldValue, visitor);
+                }
+            } else {
+                DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+                if (old == null) {
+                    old = new DimensionRangeInfo(fieldValue, fieldValue);
+                    dimensionRangeInfoMap.put(i, old);
+                } else {
+                    old.setMax(type.getOrder().max(old.getMax(), fieldValue));
+                    old.setMin(type.getOrder().min(old.getMin(), fieldValue));
+                }
+            }
+
+            if (rowCount % 100 < samplingPercentage) {
+                putRowKeyToHLL(row);
+            }
+
+            if (rowCount % 100 == 0) {
+                dictColDeduper.resetIfShortOfMem();
+            }
+
+            rowCount++;
+        }
+    }
+
+    public void postMap(Visitor visitor) throws IOException {
+        ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+        // output each cuboid's hll to reducer, key is 0 - cuboidId
+        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+            cuboidStatCalculator.waitForCompletion();
+        }
+        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+            Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+            HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+            HLLCounter hll;
+
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = cuboidsHLL[i];
+                tmpbuf.clear();
+                tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+                tmpbuf.putLong(cuboidIds[i]);
+                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                sortableKey.init(outputKey, (byte) 0);
+                visitor.collect(null, sortableKey, outputValue, null);
+            }
+        }
+        for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+            DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
+            DataType dataType = allCols.get(colIndex).getType();
+            writeFieldValue(dataType, colIndex, rangeInfo.getMin(), visitor);
+            writeFieldValue(dataType, colIndex, rangeInfo.getMax(), visitor);
+        }
+    }
+
+
+    public void setupReduce(int taskId) throws IOException {
+        this.taskId = taskId;
+        try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                .setAndUnsetThreadLocalConfig(envConfig)) {
+            cube = CubeManager.getInstance(envConfig).getCube(cubeName);
+            cubeDesc = cube.getDescriptor();
+            reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+            logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId));
+
+            if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+                // hll
+                isStatistics = true;
+                baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+                baseCuboidRowCountInMappers = Lists.newArrayList();
+                cuboidHLLMap = Maps.newHashMap();
+                logger.info("Reducer " + taskId + " handling stats");
+            } else {
+                // normal col
+                col = reducerMapping.getColForReducer(taskId);
+                Preconditions.checkNotNull(col);
+
+                // local build dict
+                buildDictInReducer = envConfig.isBuildDictInReducerEnabled();
+                if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
+                    buildDictInReducer = false;
+                }
+                if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+                    buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
+                }
+                if (buildDictInReducer) {
+                    builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+                    builder.init(null, 0, null);
+                }
+                logger.info("Reducer " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
+            }
+        }
+    }
+
+    public void reduce(Pair<SelfDefineSortableKey, Text> kv, Visitor visitor) throws IOException {
+        if (isStatistics) {
+            // for hll
+            long cuboidId = Bytes.toLong(kv.getFirst().getText().getBytes(), 1, Bytes.SIZEOF_LONG);
+            HLLCounter hll = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+            ByteBuffer bf = ByteBuffer.wrap(kv.getSecond().getBytes(), 0, kv.getSecond().getLength());
+            hll.readRegisters(bf);
+
+            totalRowsBeforeMerge += hll.getCountEstimate();
+
+            if (cuboidId == baseCuboidId) {
+                baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+            }
+
+            if (cuboidHLLMap.get(cuboidId) != null) {
+                cuboidHLLMap.get(cuboidId).merge(hll);
+            } else {
+                cuboidHLLMap.put(cuboidId, hll);
+            }
+        } else {
+            String value = Bytes.toString(kv.getFirst().getText().getBytes(), 1, kv.getFirst().getText().getLength() - 1);
+            logAFewRows(value);
+            // if dimension col, compute max/min value
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col) &&
+                    col.getType().needCompare()) {
+                if (minValue == null || col.getType().compare(minValue, value) > 0) {
+                    minValue = value;
+                }
+                if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+                    maxValue = value;
+                }
+            }
+
+            // if dict column
+            if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+                if (buildDictInReducer) {
+                    builder.addValue(value);
+                } else {
+                    byte[] keyBytes = Bytes.copy(kv.getFirst().getText().getBytes(), 1, kv.getFirst().getText().getLength() - 1);
+                    // output written to baseDir/colName/-r-00000 (etc)
+                    String fileName = col.getIdentity() + "/";
+                    visitor.collect(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+                }
+            }
+            rowCount++;
+        }
+    }
+
+    public void postReduce(Visitor visitor) throws IOException {
+        if (isStatistics) {
+            //output the hll info;
+            List<Long> allCuboids = Lists.newArrayList();
+            allCuboids.addAll(cuboidHLLMap.keySet());
+            Collections.sort(allCuboids);
+
+            logMapperAndCuboidStatistics(allCuboids); // for human check
+            outputStatistics(allCuboids, visitor);
+        } else {
+            // dimension col
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                outputDimRangeInfo(visitor);
+            }
+            // dic col
+            if (buildDictInReducer) {
+                try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                        .setAndUnsetThreadLocalConfig(envConfig)) {
+                    Dictionary<String> dict = builder.build();
+                    outputDict(col, dict, visitor);
+                }
+            }
+        }
+    }
+
+    private void logAFewRows(String value) {
+        if (rowCount < 10) {
+            logger.info("Received value: " + value);
+        }
+    }
+
+    private void outputDimRangeInfo(Visitor visitor) throws IOException {
+        if (col != null && minValue != null) {
+            // output written to baseDir/colName/colName.dci-r-00000 (etc)
+            String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+            visitor.collect(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(),
+                    new Text(minValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
+            visitor.collect(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(),
+                    new Text(maxValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
+            logger.info("write dimension range info for col : " + col.getName() + "  minValue:" + minValue
+                    + " maxValue:" + maxValue);
+        }
+    }
+
+    private void outputDict(TblColRef col, Dictionary<String> dict, Visitor visitor) throws IOException {
+        // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+        String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream outputStream = new DataOutputStream(baos)) {
+            outputStream.writeUTF(dict.getClass().getName());
+            dict.write(outputStream);
+
+            visitor.collect(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(),
+                    new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
+        }
+    }
+
+    private void outputStatistics(List<Long> allCuboids, Visitor visitor) throws IOException {
+        // output written to baseDir/statistics/statistics-r-00000 (etc)
+        String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+
+        ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+        // mapper overlap ratio at key -1
+        long grandTotal = 0;
+        for (HLLCounter hll : cuboidHLLMap.values()) {
+            grandTotal += hll.getCountEstimate();
+        }
+        double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+        visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1),
+                new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
+
+        // mapper number at key -2
+        visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2),
+                new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
+
+        // sampling percentage at key 0
+        visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L),
+                new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
+
+        for (long i : allCuboids) {
+            valueBuf.clear();
+            cuboidHLLMap.get(i).writeRegisters(valueBuf);
+            valueBuf.flip();
+            visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i),
+                    new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
+        }
+    }
+
+    private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
+        logger.info("Cuboid number for task: " + taskId + "\t" + allCuboids.size());
+        logger.info("Sampling percentage: \t" + samplingPercentage);
+        logger.info("The following statistics are collected based on sampling data.");
+        logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
+
+        for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+            if (baseCuboidRowCountInMappers.get(i) > 0) {
+                logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
+            }
+        }
+
+        long grantTotal = 0;
+        for (long i : allCuboids) {
+            grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+            logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+        }
+
+        logger.info("Sum of row counts (before merge) is: \t " + totalRowsBeforeMerge);
+        logger.info("After merge, the row count: \t " + grantTotal);
+    }
+
+    public abstract static class Visitor<K, V> {
+        public abstract void collect(String namedOutput, K key, V value, String outputPath);
+    }
+}
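
FactDistinctColumnsBase is what keeps the Flink port small: it owns the cube metadata handling, HLL sampling, dictionary building and output encoding, and only talks to the calling engine through the Visitor callback. The calling convention, as the Flink functions above use it (flatTableRows, partitionedRecords, mapVisitor and reduceVisitor are placeholders for whatever the engine provides):

    // map side (one instance per parallel task)
    FactDistinctColumnsBase base = new FactDistinctColumnsBase(cubeName, segmentId, metaUrl, sConf, samplingPercent);
    base.setupMap();
    for (String[] row : flatTableRows) {     // rows read from the intermediate flat table
        base.map(row, mapVisitor);           // emits (SelfDefineSortableKey, Text) through the visitor
    }
    base.postMap(mapVisitor);                // emits per-cuboid HLL sketches and dimension ranges

    // reduce side (taskId = index of this parallel task)
    base.setupReduce(taskId);
    for (Pair<SelfDefineSortableKey, Text> kv : partitionedRecords) {
        base.reduce(kv, reduceVisitor);      // merges HLL counters or accumulates column/dictionary values
    }
    base.postReduce(reduceVisitor);          // writes dictionary, dimension-range and cube statistics outputs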