Posted to commits@kylin.apache.org by sh...@apache.org on 2020/04/28 07:11:31 UTC

[kylin] branch master created (now 3b4db4a)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


      at 3b4db4a  KYLIN-3847 Flink cubing step : fact distinct

This branch includes the following new commits:

     new 3b4db4a  KYLIN-3847 Flink cubing step : fact distinct

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kylin] 01/01: KYLIN-3847 Flink cubing step : fact distinct

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 3b4db4aacf425fe6e706efbd240116823435348a
Author: harveyyue <yw...@126.com>
AuthorDate: Tue Mar 10 13:47:56 2020 +0800

    KYLIN-3847 Flink cubing step : fact distinct
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  16 +-
 .../org/apache/kylin/common/util/StringUtil.java   |   3 +
 .../engine/flink/FlinkBatchCubingJobBuilder2.java  |  34 +-
 .../engine/flink/FlinkFactDistinctColumns.java     | 357 +++++++++++++
 .../engine/flink/HadoopMultipleOutputFormat.java   | 114 +++++
 .../engine/mr/steps/FactDistinctColumnsBase.java   | 552 +++++++++++++++++++++
 6 files changed, 1069 insertions(+), 7 deletions(-)
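
For context: the Flink fact-distinct step added here is opt-in. FlinkBatchCubingJobBuilder2 schedules it only when the new kylin.engine.flink-fact-distinct flag is true, and the default of kylin.engine.spark-fact-distinct flips from "true" to FALSE. A minimal kylin.properties sketch (values are illustrative; both flags default to false after this change):

    # enable the new Flink fact-distinct step for the Flink engine
    kylin.engine.flink-fact-distinct=true
    # the Spark counterpart now defaults to false and must be enabled explicitly
    kylin.engine.spark-fact-distinct=false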

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 773254d..8eefc02 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1560,15 +1560,15 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public boolean isSparkFactDistinctEnable() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "true"));
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", FALSE));
     }
 
     public boolean isSparkUHCDictionaryEnable() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", "false"));
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", FALSE));
     }
 
     public boolean isSparkCardinalityEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", FALSE));
     }
 
     public int getSparkOutputMaxSize() {
@@ -1576,15 +1576,19 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public boolean isSparkDimensionDictionaryEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", "false"));
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", FALSE));
+    }
+
+    public boolean isSparCreateHiveTableViaSparkEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", FALSE));
     }
 
     public boolean isFlinkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
     }
 
-    public boolean isSparCreateHiveTableViaSparkEnable() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", "false"));
+    public boolean isFlinkFactDistinctEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.flink-fact-distinct", FALSE));
     }
 
     // ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
index 5dde9cf..80545dc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
@@ -220,4 +220,7 @@ public class StringUtil {
         return a == null ? b == null : a.equals(b);
     }
 
+    public static boolean isEmpty(String str) {
+        return str == null || str.length() == 0;
+    }
 }
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
index 925334c..b2db499 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
@@ -61,7 +61,12 @@ public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
         inputSide.addStepPhase1_CreateFlatTable(result);
 
         // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStep(jobId));
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        if (config.isFlinkFactDistinctEnable()) {
+            result.addTask(createFactDistinctColumnsFlinkStep(jobId));
+        } else {
+            result.addTask(createFactDistinctColumnsStep(jobId));
+        }
 
         if (isEnableUHCDictStep()) {
             result.addTask(createBuildUHCDictStep(jobId));
@@ -87,6 +92,33 @@ public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
+    public FlinkExecutable createFactDistinctColumnsFlinkStep(String jobId) {
+        final FlinkExecutable flinkExecutable = new FlinkExecutable();
+        final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
+
+        flinkExecutable.setClassName(FlinkFactDistinctColumns.class.getName());
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_INPUT_PATH.getOpt(), tablePath);
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_OUTPUT_PATH.getOpt(), getFactDistinctColumnsPath(jobId));
+        flinkExecutable.setParam(FlinkFactDistinctColumns.OPTION_STATS_SAMPLING_PERCENT.getOpt(), String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+
+        flinkExecutable.setJobId(jobId);
+        flinkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        flinkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES, getCounterOutputPath(jobId));
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getFlinkAdditionalJars());
+
+        flinkExecutable.setJars(jars.toString());
+
+        return flinkExecutable;
+    }
+
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final FlinkExecutable flinkExecutable = new FlinkExecutable();
         flinkExecutable.setClassName(FlinkCubingByLayer.class.getName());
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkFactDistinctColumns.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkFactDistinctColumns.java
new file mode 100644
index 0000000..b1787d8
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkFactDistinctColumns.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsBase;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class FlinkFactDistinctColumns extends AbstractApplication {
+    protected static final Logger logger = LoggerFactory.getLogger(FlinkFactDistinctColumns.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUTPUT)
+            .hasArg().isRequired(true).withDescription("Counter output path").create(BatchConstants.ARG_COUNTER_OUTPUT);
+    public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
+            .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
+            .withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+    public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
+            .isRequired(false).withDescription("Enable object reuse").create("enableObjectReuse");
+
+    private Options options;
+
+    public FlinkFactDistinctColumns() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_INPUT_TABLE);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+        options.addOption(OPTION_COUNTER_PATH);
+        options.addOption(OPTION_ENABLE_OBJECT_REUSE);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
+        int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
+        String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+
+        Job job = Job.getInstance();
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(job.getConfiguration());
+        HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+
+        final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+        final int totalReducer = reducerMapping.getTotalReducerNum();
+
+        logger.info("getTotalReducerNum: {}", totalReducer);
+        logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+        logger.info("counter path {}", counterPath);
+
+        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+        // calculate source record bytes size
+        final String bytesWrittenName = "byte-writer-counter";
+        final String recordCounterName = "record-counter";
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        if (!StringUtil.isEmpty(enableObjectReuseOptValue) &&
+                enableObjectReuseOptValue.equalsIgnoreCase("true")) {
+            env.getConfig().enableObjectReuse();
+        }
+
+        DataSet<String[]> recordDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
+
+        // read records from the flat table
+        // output:
+        //   1, statistics
+        //   2, field values of dict cols
+        //   3, min/max field values of non-dict cols
+        DataSet<Tuple2<SelfDefineSortableKey, Text>> flatOutputDataSet = recordDataSet.mapPartition(
+                new FlatOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent,
+                        bytesWrittenName, recordCounterName));
+
+        // repartition data so that each reducer handles only one column's data, or the statistics data
+        DataSet<Tuple2<SelfDefineSortableKey, Text>> partitionDataSet = flatOutputDataSet
+                .partitionCustom(new FactDistinctColumnPartitioner(cubeName, metaUrl, sConf), 0)
+                .setParallelism(totalReducer);
+
+        // multiple output result
+        // 1, CFG_OUTPUT_COLUMN: field values of dict cols whose dictionaries are not built in the reducer, e.g. global dict cols
+        // 2, CFG_OUTPUT_DICT: dictionary objects built in the reducer
+        // 3, CFG_OUTPUT_STATISTICS: cube statistics, i.e. HLL counters of cuboids
+        // 4, CFG_OUTPUT_PARTITION: dimension value range (min, max)
+        DataSet<Tuple2<String, Tuple3<Writable, Writable, String>>> outputDataSet = partitionDataSet
+                .mapPartition(new MultiOutputMapPartitionFunction(sConf, cubeName, segmentId, metaUrl, samplingPercent))
+                .setParallelism(totalReducer);
+
+        // make each reducer output to respective dir
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
+                NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
+                NullWritable.class, ArrayPrimitiveWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
+                LongWritable.class, BytesWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
+                NullWritable.class, LongWritable.class);
+
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+        FileOutputFormat.setCompressOutput(job, false);
+
+        // prevent creating a zero-sized default output
+        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
+        outputDataSet.output(new HadoopMultipleOutputFormat(new LazyOutputFormat(), job));
+
+        JobExecutionResult jobExecutionResult =
+                env.execute("Fact distinct columns for:" + cubeName + " segment " + segmentId);
+        Map<String, Object> accumulatorResults = jobExecutionResult.getAllAccumulatorResults();
+        Long recordCount = (Long) accumulatorResults.get(recordCounterName);
+        Long bytesWritten = (Long) accumulatorResults.get(bytesWrittenName);
+        logger.info("Map input records={}", recordCount);
+        logger.info("HDFS Read: {} bytes", bytesWritten);
+        logger.info("HDFS: Number of bytes written=" + FlinkBatchCubingJobBuilder2.getFileSize(outputPath, fs));
+
+        Map<String, String> counterMap = Maps.newHashMap();
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten));
+
+        // save counter to hdfs
+        HadoopUtil.writeToSequenceFile(job.getConfiguration(), counterPath, counterMap);
+    }
+
+
+    static class FlatOutputMapPartitionFunction extends
+            RichMapPartitionFunction<String[], Tuple2<SelfDefineSortableKey, Text>> {
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercentage;
+        private String bytesWrittenName;
+        private String recordCounterName;
+        private LongCounter bytesWrittenCounter;
+        private LongCounter recordCounter;
+
+        private FactDistinctColumnsBase base;
+
+        public FlatOutputMapPartitionFunction(SerializableConfiguration conf, String cubeName, String segmentId,
+                                              String metaUrl, int samplingPercentage, String bytesWrittenName,
+                                              String recordCounterName) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.samplingPercentage = samplingPercentage;
+            this.bytesWrittenName = bytesWrittenName;
+            this.recordCounterName = recordCounterName;
+            this.bytesWrittenCounter = new LongCounter();
+            this.recordCounter = new LongCounter();
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            getRuntimeContext().addAccumulator(bytesWrittenName, bytesWrittenCounter);
+            getRuntimeContext().addAccumulator(recordCounterName, recordCounter);
+            base = new FactDistinctColumnsBase(cubeName, segmentId, metaUrl, conf, samplingPercentage);
+            base.setupMap();
+        }
+
+        @Override
+        public void mapPartition(Iterable<String[]> values, Collector<Tuple2<SelfDefineSortableKey, Text>> out) throws Exception {
+            FactDistinctColumnsBase.Visitor visitor = new FactDistinctColumnsBase.Visitor<SelfDefineSortableKey, Text>() {
+                @Override
+                public void collect(String namedOutput, SelfDefineSortableKey key, Text value, String outputPath) {
+                    out.collect(new Tuple2<>(key, value));
+                }
+            };
+
+            for (String[] row : values) {
+                bytesWrittenCounter.add(base.countSizeInBytes(row));
+                recordCounter.add(1L);
+                base.map(row, visitor);
+            }
+
+            base.postMap(visitor);
+        }
+    }
+
+    static class FactDistinctColumnPartitioner implements Partitioner<SelfDefineSortableKey> {
+        private transient volatile boolean initialized = false;
+        private String cubeName;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private transient FactDistinctColumnsReducerMapping reducerMapping;
+
+        public FactDistinctColumnPartitioner(String cubeName, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        private void init() {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+                initialized = true;
+            }
+        }
+
+        @Override
+        public int partition(SelfDefineSortableKey key, int numPartitions) {
+            if (!initialized) {
+                synchronized (FlinkFactDistinctColumns.class) {
+                    if (!initialized) {
+                        init();
+                    }
+                }
+            }
+
+            Text keyText = key.getText();
+            if (keyText.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+                Long cuboidId = Bytes.toLong(keyText.getBytes(), 1, Bytes.SIZEOF_LONG);
+                return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
+            } else {
+                return BytesUtil.readUnsigned(keyText.getBytes(), 0, 1);
+            }
+        }
+    }
+
+    static class MultiOutputMapPartitionFunction extends
+            RichMapPartitionFunction<Tuple2<SelfDefineSortableKey, Text>, Tuple2<String, Tuple3<Writable, Writable, String>>> {
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercentage;
+
+        private FactDistinctColumnsBase base;
+
+        public MultiOutputMapPartitionFunction(SerializableConfiguration conf, String cubeName, String segmentId,
+                                               String metaUrl, int samplingPercentage) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.samplingPercentage = samplingPercentage;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            int taskId = getRuntimeContext().getIndexOfThisSubtask();
+            base = new FactDistinctColumnsBase(cubeName, segmentId, metaUrl, conf, samplingPercentage);
+            base.setupReduce(taskId);
+        }
+
+        @Override
+        public void mapPartition(Iterable<Tuple2<SelfDefineSortableKey, Text>> values,
+                                 Collector<Tuple2<String, Tuple3<Writable, Writable, String>>> out) throws Exception {
+            FactDistinctColumnsBase.Visitor visitor = new FactDistinctColumnsBase.Visitor<Writable, Writable>() {
+                @Override
+                public void collect(String namedOutput, Writable key, Writable value, String outputPath) {
+                    out.collect(new Tuple2<>(namedOutput, new Tuple3<>(key, value, outputPath)));
+                }
+            };
+
+            for (Tuple2<SelfDefineSortableKey, Text> value : values) {
+                base.reduce(new Pair<>(value.f0, value.f1), visitor);
+            }
+
+            base.postReduce(visitor);
+        }
+    }
+}
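
The two map-partition functions above adapt the engine-neutral FactDistinctColumnsBase (added below in engine-mr) through its Visitor callback, forwarding each collect() into a Flink Collector. A minimal sketch of that seam, using a hypothetical visitor that merely buffers the emitted pairs (class name invented for illustration):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.Text;
    import org.apache.kylin.common.util.Pair;
    import org.apache.kylin.engine.mr.steps.FactDistinctColumnsBase;
    import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;

    // Hypothetical helper, not part of this patch: collects what the map phase emits.
    class BufferingVisitor extends FactDistinctColumnsBase.Visitor<SelfDefineSortableKey, Text> {
        final List<Pair<SelfDefineSortableKey, Text>> emitted = new ArrayList<>();

        @Override
        public void collect(String namedOutput, SelfDefineSortableKey key, Text value, String outputPath) {
            // The map phase passes namedOutput/outputPath as null; they only matter in the
            // reduce phase, where they select the named output and the relative file path.
            emitted.add(new Pair<>(key, value));
        }
    }

FlatOutputMapPartitionFunction does the same thing, except that its visitor forwards each pair into out.collect(new Tuple2<>(key, value)).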
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/HadoopMultipleOutputFormat.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/HadoopMultipleOutputFormat.java
new file mode 100644
index 0000000..0d25028
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/HadoopMultipleOutputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.flink;
+
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter;
+import org.apache.hadoop.util.Progress;
+
+import java.io.IOException;
+
+public class HadoopMultipleOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<String, Tuple3<K, V, String>>> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected static final Object OPEN_MULTIPLE_MUTEX = new Object();
+    protected static final Object CLOSE_MULTIPLE_MUTEX = new Object();
+
+    protected MultipleOutputs writer;
+
+    public HadoopMultipleOutputFormat(OutputFormat<K, V> mapreduceOutputFormat, Job job) {
+        super(mapreduceOutputFormat, job);
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        super.open(taskNumber, numTasks);
+
+        synchronized (OPEN_MULTIPLE_MUTEX) {
+            try {
+                TaskInputOutputContext taskInputOutputContext = new ReduceContextImpl(configuration,
+                        context.getTaskAttemptID(), new InputIterator(), new GenericCounter(), new GenericCounter(),
+                        recordWriter, outputCommitter, new DummyReporter(), null,
+                        BytesWritable.class, BytesWritable.class);
+                this.writer = new MultipleOutputs(taskInputOutputContext);
+            } catch (InterruptedException e) {
+                throw new IOException("Could not create MultipleOutputs.", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+        synchronized (CLOSE_MULTIPLE_MUTEX) {
+            try {
+                this.writer.close();
+            } catch (InterruptedException e) {
+                throw new IOException("Could not close MultipleOutputs.", e);
+            }
+        }
+    }
+
+    @Override
+    public void writeRecord(Tuple2<String, Tuple3<K, V, String>> record) throws IOException {
+        try {
+            this.writer.write(record.f0, record.f1.f0, record.f1.f1, record.f1.f2);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static class InputIterator implements RawKeyValueIterator {
+        @Override
+        public DataInputBuffer getKey() throws IOException {
+            return null;
+        }
+
+        @Override
+        public DataInputBuffer getValue() throws IOException {
+            return null;
+        }
+
+        @Override
+        public Progress getProgress() {
+            return null;
+        }
+
+        @Override
+        public boolean next() throws IOException {
+            return false;
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+    }
+}
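
Each record handed to writeRecord() above carries the target named output in f0 and a (key, value, relative path) triple in f1, which is passed straight to MultipleOutputs.write(namedOutput, key, value, baseOutputPath). A small sketch of how such a record is shaped, mirroring what MultiOutputMapPartitionFunction emits for distinct column values (helper class and arguments are illustrative):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.kylin.engine.mr.common.BatchConstants;

    // Hypothetical helper, not part of this patch.
    class NamedOutputRecordSketch {
        static Tuple2<String, Tuple3<Writable, Writable, String>> distinctValueRecord(String colIdentity, byte[] valueBytes) {
            // f0 selects the named output registered via MultipleOutputs.addNamedOutput();
            // f1.f2 is the relative path under the job output directory, e.g. "<colIdentity>/".
            return new Tuple2<>(BatchConstants.CFG_OUTPUT_COLUMN,
                    new Tuple3<>(NullWritable.get(), new Text(valueBytes), colIdentity + "/"));
        }
    }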
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsBase.java
new file mode 100644
index 0000000..26f0da3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsBase.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.CuboidStatCalculator;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FactDistinctColumnsBase {
+    private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsBase.class);
+
+    private String cubeName;
+    private String segmentId;
+    private int samplingPercentage;
+    private KylinConfig envConfig;
+
+    // map
+    protected CubeInstance cube;
+    protected CubeSegment cubeSeg;
+    protected CubeDesc cubeDesc;
+    protected long baseCuboidId;
+    protected List<TblColRef> allCols;
+    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
+    protected int[] columnIndex;
+    protected FactDistinctColumnsReducerMapping reducerMapping;
+    protected int nRowKey;
+    private Integer[][] allCuboidsBitSet = null;
+    private HLLCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    private int rowCount = 0;
+    private DictColDeduper dictColDeduper;
+    private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
+    private CuboidStatCalculator[] cuboidStatCalculators;
+    private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+    private ByteBuffer tmpbuf;
+    private Text outputKey;
+    private Text outputValue;
+    private Text emptyText;
+
+    // reduce
+    public static final String DICT_FILE_POSTFIX = ".rldict";
+    public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
+    private int taskId;
+    private boolean isStatistics = false;
+    private List<Long> baseCuboidRowCountInMappers;
+    protected Map<Long, HLLCounter> cuboidHLLMap = null;
+    private TblColRef col = null;
+    private long totalRowsBeforeMerge = 0;
+    // local build dict
+    private boolean buildDictInReducer;
+    private IDictionaryBuilder builder;
+    private String maxValue = null;
+    private String minValue = null;
+
+
+    public FactDistinctColumnsBase(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf,
+                                   int samplingPercentage) {
+        this.cubeName = cubeName;
+        this.segmentId = segmentId;
+        this.samplingPercentage = samplingPercentage;
+        this.envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+    }
+
+    public void setupMap() {
+        outputKey = new Text();
+        outputValue = new Text();
+        emptyText = new Text();
+        try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                .setAndUnsetThreadLocalConfig(envConfig)) {
+            cube = CubeManager.getInstance(envConfig).getCube(cubeName);
+            cubeSeg = cube.getSegmentById(segmentId);
+            cubeDesc = cube.getDescriptor();
+            baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+            reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+            allCols = reducerMapping.getAllDimDictCols();
+
+            intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
+            columnIndex = new int[allCols.size()];
+            for (int i = 0; i < allCols.size(); i++) {
+                TblColRef colRef = allCols.get(i);
+                int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+                columnIndex[i] = columnIndexOnFlatTbl;
+            }
+
+            tmpbuf = ByteBuffer.allocate(4096);
+            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+            Set<Long> cuboidIdSet = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+            if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSeg)) {
+                // For the cube planner, row count stats must be calculated for every prebuilt cuboid.
+                // If the precondition for triggering cube planner phase one is satisfied, also calculate row count stats for the mandatory cuboids.
+                cuboidIdSet.addAll(cubeSeg.getCubeDesc().getMandatoryCuboids());
+            }
+            cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+            allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
+
+            allCuboidsHLL = new HLLCounter[cuboidIds.length];
+            for (int i = 0; i < cuboidIds.length; i++) {
+                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
+            }
+
+            //for KYLIN-2518 backward compatibility
+            boolean isUsePutRowKeyToHllNewAlgorithm;
+            if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+                isUsePutRowKeyToHllNewAlgorithm = false;
+                logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+            } else {
+                isUsePutRowKeyToHllNewAlgorithm = true;
+                logger.info(
+                        "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+                        cubeDesc.getVersion());
+            }
+
+            int calculatorNum = getStatsThreadNum(cuboidIds.length);
+            cuboidStatCalculators = new CuboidStatCalculator[calculatorNum];
+            int splitSize = cuboidIds.length / calculatorNum;
+            if (splitSize <= 0) {
+                splitSize = 1;
+            }
+            for (int i = 0; i < calculatorNum; i++) {
+                HLLCounter[] cuboidsHLLSplit;
+                Integer[][] cuboidsBitSetSplit;
+                Long[] cuboidIdSplit;
+                int start = i * splitSize;
+                if (start >= cuboidIds.length) {
+                    break;
+                }
+                int end = (i + 1) * splitSize;
+                if (i == calculatorNum - 1) {// last split
+                    end = cuboidIds.length;
+                }
+
+                cuboidsHLLSplit = Arrays.copyOfRange(allCuboidsHLL, start, end);
+                cuboidsBitSetSplit = Arrays.copyOfRange(allCuboidsBitSet, start, end);
+                cuboidIdSplit = Arrays.copyOfRange(cuboidIds, start, end);
+                CuboidStatCalculator calculator = new CuboidStatCalculator(i,
+                        intermediateTableDesc.getRowKeyColumnIndexes(), cuboidIdSplit, cuboidsBitSetSplit,
+                        isUsePutRowKeyToHllNewAlgorithm, cuboidsHLLSplit);
+                cuboidStatCalculators[i] = calculator;
+                calculator.start();
+            }
+
+            // setup dict col deduper
+            dictColDeduper = new DictColDeduper();
+            Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
+            for (int i = 0; i < allCols.size(); i++) {
+                if (dictCols.contains(allCols.get(i)))
+                    dictColDeduper.setIsDictCol(i);
+            }
+        }
+    }
+
+    private int getStatsThreadNum(int cuboidNum) {
+        int unitNum = cubeDesc.getConfig().getCuboidNumberPerStatsCalculator();
+        if (unitNum <= 0) {
+            logger.warn("Config getCuboidNumberPerStatsCalculator() = " + unitNum + ", should be larger than 0");
+            logger.info("Will use single thread for cuboid statistics calculation");
+            return 1;
+        }
+
+        int maxCalculatorNum = cubeDesc.getConfig().getCuboidStatsCalculatorMaxNumber();
+        int calculatorNum = (cuboidNum - 1) / unitNum + 1;
+        if (calculatorNum > maxCalculatorNum) {
+            calculatorNum = maxCalculatorNum;
+        }
+        return calculatorNum;
+    }
+
+    private int countNewSize(int oldSize, int dataSize) {
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
+
+    private void writeFieldValue(DataType type, Integer colIndex, String value, Visitor visitor) {
+        int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+        tmpbuf.clear();
+        byte[] valueBytes = Bytes.toBytes(value);
+        int size = valueBytes.length + 1;
+        if (size >= tmpbuf.capacity()) {
+            tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+        }
+        tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+        tmpbuf.put(valueBytes);
+        outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+        sortableKey.init(outputKey, type);
+        visitor.collect(null, sortableKey, emptyText, null);
+        // log a few rows for troubleshooting
+        if (rowCount < 10) {
+            logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex);
+        }
+    }
+
+    private void putRowKeyToHLL(String[] row) {
+        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+            cuboidStatCalculator.putRow(row);
+        }
+    }
+
+    public long countSizeInBytes(String[] row) {
+        int size = 0;
+        for (String s : row) {
+            size += s == null ? 1 : StringUtil.utf8Length(s);
+            size++; // delimiter
+        }
+        return size;
+    }
+
+    public void map(String[] row, Visitor visitor) {
+        for (int i = 0; i < allCols.size(); i++) {
+            int colIndex = columnIndex[i];
+            int rowSize = row.length;
+            String fieldValue = " ";
+            if (colIndex <= rowSize - 1) {
+                fieldValue = row[colIndex];
+            } else {
+                logger.debug("colIndex " + colIndex + " is larger than rowSize " + rowSize + " - 1, so a blank value is used.");
+            }
+            if (fieldValue == null)
+                continue;
+
+            final DataType type = allCols.get(i).getType();
+
+            // for dict columns, de-dup before writing the value; for non-dict dimension columns, hold min/max until postMap()
+            if (dictColDeduper.isDictCol(i)) {
+                if (dictColDeduper.add(i, fieldValue)) {
+                    writeFieldValue(type, i, fieldValue, visitor);
+                }
+            } else {
+                DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+                if (old == null) {
+                    old = new DimensionRangeInfo(fieldValue, fieldValue);
+                    dimensionRangeInfoMap.put(i, old);
+                } else {
+                    old.setMax(type.getOrder().max(old.getMax(), fieldValue));
+                    old.setMin(type.getOrder().min(old.getMin(), fieldValue));
+                }
+            }
+
+            if (rowCount % 100 < samplingPercentage) {
+                putRowKeyToHLL(row);
+            }
+
+            if (rowCount % 100 == 0) {
+                dictColDeduper.resetIfShortOfMem();
+            }
+
+            rowCount++;
+        }
+    }
+
+    public void postMap(Visitor visitor) throws IOException {
+        ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+        // output each cuboid's HLL to the reducer; key is the HLL marker byte followed by the cuboid id
+        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+            cuboidStatCalculator.waitForCompletion();
+        }
+        for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
+            Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+            HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+            HLLCounter hll;
+
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = cuboidsHLL[i];
+                tmpbuf.clear();
+                tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+                tmpbuf.putLong(cuboidIds[i]);
+                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                sortableKey.init(outputKey, (byte) 0);
+                visitor.collect(null, sortableKey, outputValue, null);
+            }
+        }
+        for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+            DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
+            DataType dataType = allCols.get(colIndex).getType();
+            writeFieldValue(dataType, colIndex, rangeInfo.getMin(), visitor);
+            writeFieldValue(dataType, colIndex, rangeInfo.getMax(), visitor);
+        }
+    }
+
+
+    public void setupReduce(int taskId) throws IOException {
+        this.taskId = taskId;
+        try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                .setAndUnsetThreadLocalConfig(envConfig)) {
+            cube = CubeManager.getInstance(envConfig).getCube(cubeName);
+            cubeDesc = cube.getDescriptor();
+            reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+            logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId));
+
+            if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+                // hll
+                isStatistics = true;
+                baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+                baseCuboidRowCountInMappers = Lists.newArrayList();
+                cuboidHLLMap = Maps.newHashMap();
+                logger.info("Reducer " + taskId + " handling stats");
+            } else {
+                // normal col
+                col = reducerMapping.getColForReducer(taskId);
+                Preconditions.checkNotNull(col);
+
+                // local build dict
+                buildDictInReducer = envConfig.isBuildDictInReducerEnabled();
+                if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
+                    buildDictInReducer = false;
+                }
+                if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+                    buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
+                }
+                if (buildDictInReducer) {
+                    builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+                    builder.init(null, 0, null);
+                }
+                logger.info("Reducer " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
+            }
+        }
+    }
+
+    public void reduce(Pair<SelfDefineSortableKey, Text> kv, Visitor visitor) throws IOException {
+        if (isStatistics) {
+            // for hll
+            long cuboidId = Bytes.toLong(kv.getFirst().getText().getBytes(), 1, Bytes.SIZEOF_LONG);
+            HLLCounter hll = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+            ByteBuffer bf = ByteBuffer.wrap(kv.getSecond().getBytes(), 0, kv.getSecond().getLength());
+            hll.readRegisters(bf);
+
+            totalRowsBeforeMerge += hll.getCountEstimate();
+
+            if (cuboidId == baseCuboidId) {
+                baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+            }
+
+            if (cuboidHLLMap.get(cuboidId) != null) {
+                cuboidHLLMap.get(cuboidId).merge(hll);
+            } else {
+                cuboidHLLMap.put(cuboidId, hll);
+            }
+        } else {
+            String value = Bytes.toString(kv.getFirst().getText().getBytes(), 1, kv.getFirst().getText().getLength() - 1);
+            logAFewRows(value);
+            // if dimension col, compute max/min value
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col) &&
+                    col.getType().needCompare()) {
+                if (minValue == null || col.getType().compare(minValue, value) > 0) {
+                    minValue = value;
+                }
+                if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+                    maxValue = value;
+                }
+            }
+
+            // if dict column
+            if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+                if (buildDictInReducer) {
+                    builder.addValue(value);
+                } else {
+                    byte[] keyBytes = Bytes.copy(kv.getFirst().getText().getBytes(), 1, kv.getFirst().getText().getLength() - 1);
+                    // output written to baseDir/colName/-r-00000 (etc)
+                    String fileName = col.getIdentity() + "/";
+                    visitor.collect(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+                }
+            }
+            rowCount++;
+        }
+    }
+
+    public void postReduce(Visitor visitor) throws IOException {
+        if (isStatistics) {
+            //output the hll info;
+            List<Long> allCuboids = Lists.newArrayList();
+            allCuboids.addAll(cuboidHLLMap.keySet());
+            Collections.sort(allCuboids);
+
+            logMapperAndCuboidStatistics(allCuboids); // for human check
+            outputStatistics(allCuboids, visitor);
+        } else {
+            // dimension col
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                outputDimRangeInfo(visitor);
+            }
+            // dic col
+            if (buildDictInReducer) {
+                try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                        .setAndUnsetThreadLocalConfig(envConfig)) {
+                    Dictionary<String> dict = builder.build();
+                    outputDict(col, dict, visitor);
+                }
+            }
+        }
+    }
+
+    private void logAFewRows(String value) {
+        if (rowCount < 10) {
+            logger.info("Received value: " + value);
+        }
+    }
+
+    private void outputDimRangeInfo(Visitor visitor) throws IOException {
+        if (col != null && minValue != null) {
+            // output written to baseDir/colName/colName.dci-r-00000 (etc)
+            String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+            visitor.collect(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(),
+                    new Text(minValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
+            visitor.collect(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(),
+                    new Text(maxValue.getBytes(StandardCharsets.UTF_8)), dimRangeFileName);
+            logger.info("write dimension range info for col : " + col.getName() + "  minValue:" + minValue
+                    + " maxValue:" + maxValue);
+        }
+    }
+
+    private void outputDict(TblColRef col, Dictionary<String> dict, Visitor visitor) throws IOException {
+        // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+        String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream outputStream = new DataOutputStream(baos)) {
+            outputStream.writeUTF(dict.getClass().getName());
+            dict.write(outputStream);
+
+            visitor.collect(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(),
+                    new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName);
+        }
+    }
+
+    private void outputStatistics(List<Long> allCuboids, Visitor visitor) throws IOException {
+        // output written to baseDir/statistics/statistics-r-00000 (etc)
+        String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+
+        ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+        // mapper overlap ratio at key -1
+        long grandTotal = 0;
+        for (HLLCounter hll : cuboidHLLMap.values()) {
+            grandTotal += hll.getCountEstimate();
+        }
+        double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+        visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1),
+                new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
+
+        // mapper number at key -2
+        visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2),
+                new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
+
+        // sampling percentage at key 0
+        visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L),
+                new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
+
+        for (long i : allCuboids) {
+            valueBuf.clear();
+            cuboidHLLMap.get(i).writeRegisters(valueBuf);
+            valueBuf.flip();
+            visitor.collect(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i),
+                    new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
+        }
+    }
+
+    private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
+        logger.info("Cuboid number for task: " + taskId + "\t" + allCuboids.size());
+        logger.info("Sampling percentage: \t" + samplingPercentage);
+        logger.info("The following statistics are collected based on sampling data.");
+        logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
+
+        for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+            if (baseCuboidRowCountInMappers.get(i) > 0) {
+                logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
+            }
+        }
+
+        long grandTotal = 0;
+        for (long i : allCuboids) {
+            grandTotal += cuboidHLLMap.get(i).getCountEstimate();
+            logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+        }
+
+        logger.info("Sum of row counts (before merge) is: \t " + totalRowsBeforeMerge);
+        logger.info("After merge, the row count is: \t " + grandTotal);
+    }
+
+    public abstract static class Visitor<K, V> {
+        public abstract void collect(String namedOutput, K key, V value, String outputPath);
+    }
+}
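
To summarize the shuffle key layout used throughout this patch: writeFieldValue() prefixes each column value with one byte holding the target reducer id, and postMap() prefixes each cuboid HLL with the MARK_FOR_HLL_COUNTER byte followed by the 8-byte cuboid id; FactDistinctColumnPartitioner in the Flink job reads that prefix back. A minimal decoding sketch (helper class and method name are illustrative; the logic mirrors the partitioner above):

    import org.apache.hadoop.io.Text;
    import org.apache.kylin.common.util.Bytes;
    import org.apache.kylin.common.util.BytesUtil;
    import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;

    // Hypothetical helper, not part of this patch.
    class ShuffleKeySketch {
        static int targetReducer(Text keyText, FactDistinctColumnsReducerMapping reducerMapping) {
            if (keyText.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
                // statistics key: marker byte + 8-byte cuboid id, routed to a cuboid row-counter reducer
                long cuboidId = Bytes.toLong(keyText.getBytes(), 1, Bytes.SIZEOF_LONG);
                return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
            }
            // column key: the first byte is already the reducer id chosen on the map side
            return BytesUtil.readUnsigned(keyText.getBytes(), 0, 1);
        }
    }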