You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/24 09:48:18 UTC

[kylin] branch master updated: KYLIN-3442 Fact distinct columns in Spark

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new a334ee4  KYLIN-3442 Fact distinct columns in Spark
a334ee4 is described below

commit a334ee4ff1f6609ab15bd7d4744206750e77ec21
Author: chao long <wa...@qq.com>
AuthorDate: Fri Aug 24 15:07:17 2018 +0800

    KYLIN-3442 Fact distinct columns in Spark
---
 engine-spark/pom.xml                               |  41 +
 .../kylin/engine/spark/MultipleOutputsRDD.scala    | 114 +++
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |  34 +-
 .../kylin/engine/spark/SparkCubingByLayer.java     |  39 +-
 .../apache/kylin/engine/spark/SparkExecutable.java |   6 +-
 .../kylin/engine/spark/SparkFactDistinct.java      | 866 +++++++++++++++++++++
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  45 +-
 kylin-it/pom.xml                                   |   3 -
 pom.xml                                            |  21 +
 9 files changed, 1123 insertions(+), 46 deletions(-)

diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 700aeb5..ceb9337 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -100,6 +100,47 @@
             <version>0.9.10</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+        </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
new file mode 100644
index 0000000..cb5458d
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/MultipleOutputsRDD.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.{DataInputBuffer, Writable}
+import org.apache.hadoop.mapred.RawKeyValueIterator
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.counters.GenericCounter
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter
+import org.apache.hadoop.mapreduce.task.{ReduceContextImpl, TaskAttemptContextImpl}
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, TaskAttemptID, TaskType}
+import org.apache.hadoop.util.Progress
+import org.apache.spark._
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+class MultipleOutputsRDD[K, V](self: RDD[(String, (K, V, String))])
+                              (implicit kt: ClassTag[K], vt: ClassTag[V]) extends Serializable {
+
+  def saveAsNewAPIHadoopDatasetWithMultipleOutputs(conf: Configuration) {
+    val hadoopConf = conf
+    val job = NewAPIHadoopJob.getInstance(hadoopConf)
+    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    val jobConfiguration = job.getConfiguration
+    val wrappedConf = new SerializableWritable(jobConfiguration)
+    val outfmt = job.getOutputFormatClass
+    val jobFormat = outfmt.newInstance
+
+    if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
+      jobFormat.checkOutputSpecs(job)
+    }
+
+    val writeShard = (context: TaskContext, itr: Iterator[(String, (K, V, String))]) => {
+      val config = wrappedConf.value
+
+      val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
+        context.attemptNumber)
+      val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
+      val format = outfmt.newInstance
+
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
+
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+
+      val recordWriter = format.getRecordWriter(hadoopContext).asInstanceOf[RecordWriter[K, V]]
+
+      val taskInputOutputContext = new ReduceContextImpl(wrappedConf.value, attemptId, new InputIterator(itr), new GenericCounter, new GenericCounter,
+        recordWriter, committer, new DummyReporter, null, kt.runtimeClass, vt.runtimeClass)
+
+      // use hadoop MultipleOutputs
+      val writer = new MultipleOutputs(taskInputOutputContext)
+
+      try {
+        while (itr.hasNext) {
+          val pair = itr.next()
+          writer.write(pair._1, pair._2._1, pair._2._2, pair._2._3)
+        }
+      } finally {
+        writer.close()
+      }
+      committer.commitTask(hadoopContext)
+      1
+    }: Int
+
+    val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
+    val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+    self.context.runJob(self, writeShard)
+    jobCommitter.commitJob(jobTaskContext)
+  }
+
+  class InputIterator(itr: Iterator[_]) extends RawKeyValueIterator {
+    def getKey: DataInputBuffer = null
+    def getValue: DataInputBuffer = null
+    def getProgress: Progress = null
+    def next = itr.hasNext
+    def close() { }
+  }
+}
+
+object MultipleOutputsRDD {
+  def rddToMultipleOutputsRDD[K, V](rdd: JavaPairRDD[String, (Writable, Writable, String)]) = {
+    new MultipleOutputsRDD(rdd)
+  }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index e545166..5fd7213 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.engine.spark;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.StringUtil;
@@ -32,9 +35,6 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  */
 public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
@@ -61,7 +61,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         inputSide.addStepPhase1_CreateFlatTable(result);
 
         // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStep(jobId));
+        result.addTask(createFactDistinctColumnsSparkStep(jobId));
 
         if (isEnableUHCDictStep()) {
             result.addTask(createBuildUHCDictStep(jobId));
@@ -87,6 +87,32 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
+    public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
+
+        sparkExecutable.setClassName(SparkFactDistinct.class.getName());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_INPUT_PATH.getOpt(), tablePath);
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_OUTPUT_PATH.getOpt(), getFactDistinctColumnsPath(jobId));
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkFactDistinct.OPTION_STATS_SAMPLING_PERCENT.getOpt(), String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+
+        sparkExecutable.setJobId(jobId);
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+
+        sparkExecutable.setJars(jars.toString());
+
+        return sparkExecutable;
+    }
 
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final SparkExecutable sparkExecutable = new SparkExecutable();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cb3af31..9f4ae34 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -27,13 +27,10 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeDescManager;
@@ -68,9 +65,6 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,37 +163,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
-
-        if (isSequenceFile) {
-            encodedBaseRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
-                    .map(new Function<Text, String[]>() {
-                        @Override
-                        public String[] call(Text text) throws Exception {
-                            String s = Bytes.toString(text.getBytes(), 0, text.getLength());
-                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
-                        }
-                    }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
-        } else {
-            SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
-            final Dataset intermediateTable = sparkSession.table(hiveTable);
-            encodedBaseRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
-                @Override
-                public String[] call(Row row) throws Exception {
-                    String[] result = new String[row.size()];
-                    for (int i = 0; i < row.size(); i++) {
-                        final Object o = row.get(i);
-                        if (o != null) {
-                            result[i] = o.toString();
-                        } else {
-                            result[i] = null;
-                        }
-                    }
-                    return result;
-                }
-            }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
-
-        }
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
+                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 637382c..6122397 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -375,7 +375,11 @@ public class SparkExecutable extends AbstractExecutable {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
         dumpList.addAll(segment.getDictionaryPaths());
-        dumpList.add(segment.getStatisticsResourcePath());
+        ResourceStore rs = ResourceStore.getStore(segment.getConfig());
+        if (rs.exists(segment.getStatisticsResourcePath())) {
+            // cube statistics is not available for new segment
+            dumpList.add(segment.getStatisticsResourcePath());
+        }
         JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(), this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
     }
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
new file mode 100644
index 0000000..61e2e53
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -0,0 +1,866 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
+import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+import scala.Tuple2;
+import scala.Tuple3;
+
+public class SparkFactDistinct extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkFactDistinct.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
+            .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
+            .withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+
+    private Options options;
+
+    public SparkFactDistinct() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_INPUT_TABLE);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
+
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
+
+        SparkConf conf = new SparkConf().setAppName("Fact distinct columns for:" + cubeName + " segment " + segmentId);
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        sc.sc().addSparkListener(jobListener);
+        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+
+        final Job job = Job.getInstance(sConf.get());
+
+        final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+        logger.info("RDD Output path: {}", outputPath);
+        logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
+        logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+
+        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+
+        // calculate source record bytes size
+        final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+
+        final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+
+        JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
+
+        JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD.groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
+
+        JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD.mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
+
+        // make each reducer output to respective dir
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
+
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+        // prevent to create zero-sized default output
+        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
+        MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+
+        multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+
+        // only work for client mode, not work when spark.submit.deployMode=cluster
+        logger.info("Map input records={}", recordRDD.count());
+        logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+
+        HadoopUtil.deleteHDFSMeta(metaUrl);
+    }
+
+    static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercent;
+        private CuboidStatCalculator cuboidStatCalculator;
+        private FactDistinctColumnsReducerMapping reducerMapping;
+        private List<TblColRef> allCols;
+        private int[] columnIndex;
+        private DictColDeduper dictColDeduper;
+        private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap;
+        private ByteBuffer tmpbuf;
+        private LongAccumulator bytesWritten;
+
+        public FlatOutputFucntion(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, int samplingPercent, LongAccumulator bytesWritten) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaurl;
+            this.conf = conf;
+            this.samplingPercent = samplingPercent;
+            this.dimensionRangeInfoMap = Maps.newHashMap();
+            this.bytesWritten = bytesWritten;
+        }
+
+        private void init() {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+            CubeDesc cubeDesc = cubeInstance.getDescriptor();
+            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+
+            reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+            tmpbuf = ByteBuffer.allocate(4096);
+
+            int[] rokeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+
+            Long[] cuboidIds = getCuboidIds(cubeSegment);
+
+            Integer[][] cuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, rokeyColumnIndexes.length);
+
+            boolean isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm(cubeDesc);
+
+            HLLCounter[] cuboidsHLL = getInitCuboidsHLL(cuboidIds.length, cubeDesc.getConfig().getCubeStatsHLLPrecision());
+
+            cuboidStatCalculator = new CuboidStatCalculator(rokeyColumnIndexes, cuboidIds, cuboidsBitSet, isNewAlgorithm, cuboidsHLL);
+            allCols = reducerMapping.getAllDimDictCols();
+
+            initDictColDeduper(cubeDesc);
+            initColumnIndex(intermediateTableDesc);
+
+            initialized = true;
+        }
+
+        @Override
+        public Iterator<Tuple2<SelfDefineSortableKey, Text>> call(Iterator<String[]> rowIterator) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkFactDistinct.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            List<String[]> rows = Lists.newArrayList(rowIterator);
+            List<Tuple2<SelfDefineSortableKey, Text>> result = Lists.newArrayList();
+
+            int rowCount = 0;
+
+            for (String[] row : rows) {
+                bytesWritten.add(countSizeInBytes(row));
+
+                for (int i = 0; i < allCols.size(); i++) {
+                    String fieldValue = row[columnIndex[i]];
+                    if (fieldValue == null)
+                        continue;
+
+                    final DataType type = allCols.get(i).getType();
+
+                    //for dic column, de dup before write value; for dim not dic column, hold util doCleanup()
+                    if (dictColDeduper.isDictCol(i)) {
+                        if (dictColDeduper.add(i, fieldValue)) {
+                            addFieldValue(type, i, fieldValue, result);
+                        }
+                    } else {
+                        DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+                        if (old == null) {
+                            old = new DimensionRangeInfo(fieldValue, fieldValue);
+                            dimensionRangeInfoMap.put(i, old);
+                        } else {
+                            old.setMax(type.getOrder().max(old.getMax(), fieldValue));
+                            old.setMin(type.getOrder().min(old.getMin(), fieldValue));
+                        }
+                    }
+                }
+
+                if (rowCount % 100 < samplingPercent) {
+                    cuboidStatCalculator.putRow(row);
+                }
+
+                if (rowCount % 100 == 0) {
+                    dictColDeduper.resetIfShortOfMem();
+                }
+
+                rowCount++;
+            }
+
+            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
+            HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
+            HLLCounter hll;
+
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = cuboidsHLL[i];
+                tmpbuf.clear();
+                tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+                tmpbuf.putLong(cuboidIds[i]);
+                Text outputKey = new Text();
+                Text outputValue = new Text();
+                SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+
+                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+
+                sortableKey.init(outputKey, (byte) 0);
+
+                result.add(new Tuple2<SelfDefineSortableKey, Text>(sortableKey, outputValue));
+            }
+
+            for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+                DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
+                DataType dataType = allCols.get(colIndex).getType();
+                addFieldValue(dataType, colIndex, rangeInfo.getMin(), result);
+                addFieldValue(dataType, colIndex, rangeInfo.getMax(), result);
+            }
+
+            return result.iterator();
+        }
+
+        private boolean isUsePutRowKeyToHllNewAlgorithm(CubeDesc cubeDesc) {
+            boolean isUsePutRowKeyToHllNewAlgorithm;
+            if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+                isUsePutRowKeyToHllNewAlgorithm = false;
+                logger.info("Found KylinVersion: {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+            } else {
+                isUsePutRowKeyToHllNewAlgorithm = true;
+                logger.info(
+                        "Found KylinVersion: {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+                        cubeDesc.getVersion());
+            }
+            return isUsePutRowKeyToHllNewAlgorithm;
+        }
+
+        private Long[] getCuboidIds(CubeSegment cubeSegment) {
+            Set<Long> cuboidIdSet = Sets.newHashSet(cubeSegment.getCuboidScheduler().getAllCuboidIds());
+            if (StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(cubeSegment)) {
+                // For cube planner, for every prebuilt cuboid, its related row count stats should be calculated
+                // If the precondition for trigger cube planner phase one is satisfied, we need to calculate row count stats for mandatory cuboids.
+                cuboidIdSet.addAll(cubeSegment.getCubeDesc().getMandatoryCuboids());
+            }
+
+            return cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+        }
+
+        private HLLCounter[] getInitCuboidsHLL(int cuboidSize, int hllPrecision) {
+            HLLCounter[] cuboidsHLL = new HLLCounter[cuboidSize];
+            for (int i = 0; i < cuboidSize; i++) {
+                cuboidsHLL[i] = new HLLCounter(hllPrecision, RegisterType.DENSE);
+            }
+            return cuboidsHLL;
+        }
+
+        private void initDictColDeduper(CubeDesc cubeDesc) {
+            // setup dict col deduper
+            dictColDeduper = new DictColDeduper();
+            Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
+            for (int i = 0; i < allCols.size(); i++) {
+                if (dictCols.contains(allCols.get(i)))
+                    dictColDeduper.setIsDictCol(i);
+            }
+        }
+
+        private void initColumnIndex(CubeJoinedFlatTableEnrich intermediateTableDesc) {
+            columnIndex = new int[allCols.size()];
+            for (int i = 0; i < allCols.size(); i++) {
+                TblColRef colRef = allCols.get(i);
+                int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+                columnIndex[i] = columnIndexOnFlatTbl;
+            }
+        }
+
+        private void addFieldValue(DataType type, Integer colIndex, String value,
+                List<Tuple2<SelfDefineSortableKey, Text>> result) {
+            int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+            tmpbuf.clear();
+            byte[] valueBytes = Bytes.toBytes(value);
+            int size = valueBytes.length + 1;
+            if (size >= tmpbuf.capacity()) {
+                tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+            }
+            tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+            tmpbuf.put(valueBytes);
+
+            Text outputKey = new Text();
+            SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+
+            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+            sortableKey.init(outputKey, type);
+
+            result.add(new Tuple2<SelfDefineSortableKey, Text>(sortableKey, new Text()));
+
+            // log a few rows for troubleshooting
+            if (result.size() < 10) {
+                logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex);
+            }
+        }
+
+        private int countNewSize(int oldSize, int dataSize) {
+            int newSize = oldSize * 2;
+            while (newSize < dataSize) {
+                newSize = newSize * 2;
+            }
+            return newSize;
+        }
+
+        private int countSizeInBytes(String[] row) {
+            int size = 0;
+            for (String s : row) {
+                size += s == null ? 1 : StringUtil.utf8Length(s);
+                size++; // delimiter
+            }
+            return size;
+        }
+    }
+
+    static class CuboidStatCalculator {
+        private final int nRowKey;
+        private final int[] rowkeyColIndex;
+        private final Long[] cuboidIds;
+        private final Integer[][] cuboidsBitSet;
+        private volatile HLLCounter[] cuboidsHLL;
+
+        //about details of the new algorithm, please see KYLIN-2518
+        private final boolean isNewAlgorithm;
+        private final HashFunction hf;
+        private long[] rowHashCodesLong;
+
+        public CuboidStatCalculator(int[] rowkeyColIndex, Long[] cuboidIds, Integer[][] cuboidsBitSet,
+                boolean isUsePutRowKeyToHllNewAlgorithm, HLLCounter[] cuboidsHLL) {
+            this.nRowKey = rowkeyColIndex.length;
+            this.rowkeyColIndex = rowkeyColIndex;
+            this.cuboidIds = cuboidIds;
+            this.cuboidsBitSet = cuboidsBitSet;
+            this.isNewAlgorithm = isUsePutRowKeyToHllNewAlgorithm;
+            if (!isNewAlgorithm) {
+                this.hf = Hashing.murmur3_32();
+            } else {
+                rowHashCodesLong = new long[nRowKey];
+                this.hf = Hashing.murmur3_128();
+            }
+            this.cuboidsHLL = cuboidsHLL;
+        }
+
+        public void putRow(final String[] row) {
+            String[] copyRow = Arrays.copyOf(row, row.length);
+
+            if (isNewAlgorithm) {
+                putRowKeyToHLLNew(copyRow);
+            } else {
+                putRowKeyToHLLOld(copyRow);
+            }
+        }
+
+        private void putRowKeyToHLLOld(String[] row) {
+            //generate hash for each row key column
+            byte[][] rowHashCodes = new byte[nRowKey][];
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue != null) {
+                    rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+                } else {
+                    rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+                }
+            }
+
+            // user the row key column hash to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                Hasher hc = hf.newHasher();
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    hc.putBytes(rowHashCodes[cuboidsBitSet[i][position]]);
+                }
+
+                cuboidsHLL[i].add(hc.hash().asBytes());
+            }
+        }
+
+        private void putRowKeyToHLLNew(String[] row) {
+            //generate hash for each row key column
+            for (int i = 0; i < nRowKey; i++) {
+                Hasher hc = hf.newHasher();
+                String colValue = row[rowkeyColIndex[i]];
+                if (colValue == null)
+                    colValue = "0";
+                byte[] bytes = hc.putString(colValue).hash().asBytes();
+                rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+            }
+
+            // user the row key column hash to get a consolidated hash for each cuboid
+            for (int i = 0, n = cuboidsBitSet.length; i < n; i++) {
+                long value = 0;
+                for (int position = 0; position < cuboidsBitSet[i].length; position++) {
+                    value += rowHashCodesLong[cuboidsBitSet[i][position]];
+                }
+                cuboidsHLL[i].addHashDirectly(value);
+            }
+        }
+
+        public HLLCounter[] getHLLCounters() {
+            return cuboidsHLL;
+        }
+
+        public Long[] getCuboidIds() {
+            return cuboidIds;
+        }
+    }
+
+    static class FactDistinctPartitioner extends Partitioner {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int totalReducerNum;
+        private FactDistinctColumnsReducerMapping reducerMapping;
+
+        public FactDistinctPartitioner(String cubeName, String metaUrl, SerializableConfiguration conf, int totalReducerNum) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.totalReducerNum = totalReducerNum;
+        }
+
+        private void init() {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+            reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+            initialized = true;
+        }
+
+        @Override
+        public int numPartitions() {
+            return totalReducerNum;
+        }
+
+        @Override
+        public int getPartition(Object o) {
+            if (initialized == false) {
+                synchronized (SparkFactDistinct.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            SelfDefineSortableKey skey = (SelfDefineSortableKey) o;
+            Text key = skey.getText();
+            if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
+                Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+                return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
+            } else {
+                return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
+            }
+        }
+    }
+
+    static class MultiOutputFunction implements
+            PairFlatMapFunction<Iterator<Tuple2<SelfDefineSortableKey, Iterable<Text>>>, String, Tuple3<Writable, Writable, String>> {
+        private volatile transient boolean initialized = false;
+        private String DICT_FILE_POSTFIX = ".rldict";
+        private String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
+        private String cubeName;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int samplingPercent;
+        private FactDistinctColumnsReducerMapping reducerMapping;
+        private int taskId;
+        private boolean isStatistics = false;
+        private long baseCuboidId;
+        private List<Long> baseCuboidRowCountInMappers;
+        private Map<Long, HLLCounter> cuboidHLLMap;
+        private TblColRef col;
+        private boolean buildDictInReducer;
+        private IDictionaryBuilder builder;
+        private int rowCount = 0;
+        private long totalRowsBeforeMerge = 0;
+        private KylinConfig cubeConfig;
+        private CubeDesc cubeDesc;
+        private String maxValue = null;
+        private String minValue = null;
+        private List<Tuple2<String, Tuple3<Writable, Writable, String>>> result;
+
+        public MultiOutputFunction(String cubeName, String metaurl, SerializableConfiguration conf, int samplingPercent) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaurl;
+            this.conf = conf;
+            this.samplingPercent = samplingPercent;
+        }
+
+        private void init() throws IOException {
+            taskId = TaskContext.getPartitionId();
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+            cubeDesc = cubeInstance.getDescriptor();
+            cubeConfig = cubeInstance.getConfig();
+            reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+
+            result = Lists.newArrayList();
+
+            if (reducerMapping.isCuboidRowCounterReducer(taskId)) {
+                // hll
+                isStatistics = true;
+                baseCuboidId = cubeInstance.getCuboidScheduler().getBaseCuboidId();
+                baseCuboidRowCountInMappers = Lists.newArrayList();
+                cuboidHLLMap = Maps.newHashMap();
+
+                logger.info("Partition " + taskId + " handling stats");
+            } else {
+                // normal col
+                col = reducerMapping.getColForReducer(taskId);
+                Preconditions.checkNotNull(col);
+
+                // local build dict
+                buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
+                if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
+                    buildDictInReducer = false;
+                }
+
+                if (reducerMapping.getReducerNumForDimCol(col) > 1) {
+                    buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
+                }
+
+                if (buildDictInReducer) {
+                    builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+                    builder.init(null, 0, null);
+                }
+                logger.info("Partition " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
+            }
+
+            initialized = true;
+        }
+
+        private void logAFewRows(String value) {
+            if (rowCount < 10) {
+                logger.info("Received value: " + value);
+            }
+        }
+
+        @Override
+        public Iterator<Tuple2<String, Tuple3<Writable, Writable, String>>> call(
+                Iterator<Tuple2<SelfDefineSortableKey, Iterable<Text>>> tuple2Iterator) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkFactDistinct.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            List<Tuple2<SelfDefineSortableKey, Iterable<Text>>> tuples = Lists.newArrayList(tuple2Iterator);
+
+            for (Tuple2<SelfDefineSortableKey, Iterable<Text>> tuple : tuples) {
+                Text key = tuple._1.getText();
+
+                if (isStatistics) {
+                    // for hll
+                    long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+
+                    for (Text value : tuple._2) {
+                        HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
+                        ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
+                        hll.readRegisters(bf);
+
+                        totalRowsBeforeMerge += hll.getCountEstimate();
+
+                        if (cuboidId == baseCuboidId) {
+                            baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+                        }
+
+                        if (cuboidHLLMap.get(cuboidId) != null) {
+                            cuboidHLLMap.get(cuboidId).merge(hll);
+                        } else {
+                            cuboidHLLMap.put(cuboidId, hll);
+                        }
+                    }
+
+                } else {
+                    String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+                    logAFewRows(value);
+                    // if dimension col, compute max/min value
+                    if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                        if (minValue == null || col.getType().compare(minValue, value) > 0) {
+                            minValue = value;
+                        }
+                        if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+                            maxValue = value;
+                        }
+                    }
+
+                    //if dict column
+                    if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+                        if (buildDictInReducer) {
+                            builder.addValue(value);
+                        } else {
+                            byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+                            // output written to baseDir/colName/-r-00000 (etc)
+                            String fileName = col.getIdentity() + "/";
+                            result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(
+                                    BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
+                                    NullWritable.get(), new Text(keyBytes), fileName)));
+                        }
+                    }
+                }
+
+                rowCount++;
+            }
+
+            if (isStatistics) {
+                //output the hll info;
+                List<Long> allCuboids = Lists.newArrayList();
+                allCuboids.addAll(cuboidHLLMap.keySet());
+                Collections.sort(allCuboids);
+
+                logMapperAndCuboidStatistics(allCuboids); // for human check
+                outputStatistics(allCuboids, result);
+            } else {
+                //dimension col
+                if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                    outputDimRangeInfo(result);
+                }
+                // dic col
+                if (buildDictInReducer) {
+                    Dictionary<String> dict = builder.build();
+                    outputDict(col, dict, result);
+                }
+            }
+
+            return result.iterator();
+        }
+
+        private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
+            logger.info("Cuboid number for task: " + taskId + "\t" + allCuboids.size());
+            logger.info("Samping percentage: \t" + samplingPercent);
+            logger.info("The following statistics are collected based on sampling data. ");
+            logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
+
+            for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+                if (baseCuboidRowCountInMappers.get(i) > 0) {
+                    logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
+                }
+            }
+
+            long grantTotal = 0;
+            for (long i : allCuboids) {
+                grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+                logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+            }
+
+            logger.info("Sum of row counts (before merge) is: \t " + totalRowsBeforeMerge);
+            logger.info("After merge, the row count: \t " + grantTotal);
+        }
+
+        private void outputDimRangeInfo(List<Tuple2<String, Tuple3<Writable, Writable, String>>> result) {
+            if (col != null && minValue != null) {
+                // output written to baseDir/colName/colName.dci-r-00000 (etc)
+                String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+                        new Tuple3<Writable, Writable, String>(NullWritable.get(), new Text(minValue.getBytes()),
+                                dimRangeFileName)));
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_PARTITION,
+                        new Tuple3<Writable, Writable, String>(NullWritable.get(), new Text(maxValue.getBytes()),
+                                dimRangeFileName)));
+                logger.info("write dimension range info for col : " + col.getName() + "  minValue:" + minValue
+                        + " maxValue:" + maxValue);
+            }
+        }
+
+        private void outputDict(TblColRef col, Dictionary<String> dict, List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+                throws IOException, InterruptedException {
+            // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+            String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
+
+            try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos)) {
+                outputStream.writeUTF(dict.getClass().getName());
+                dict.write(outputStream);
+
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_DICT,
+                        new Tuple3<Writable, Writable, String>(NullWritable.get(),
+                                new ArrayPrimitiveWritable(baos.toByteArray()), dictFileName)));
+            }
+        }
+
+        private void outputStatistics(List<Long> allCuboids, List<Tuple2<String, Tuple3<Writable, Writable, String>>> result)
+                throws IOException, InterruptedException {
+            // output written to baseDir/statistics/statistics-r-00000 (etc)
+            String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+
+            // mapper overlap ratio at key -1
+            long grandTotal = 0;
+
+            for (HLLCounter hll : cuboidHLLMap.values()) {
+                grandTotal += hll.getCountEstimate();
+            }
+
+            double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+            result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                    new Tuple3<Writable, Writable, String>(new LongWritable(-1),
+                            new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName)));
+
+            // mapper number at key -2
+            result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                    new Tuple3<Writable, Writable, String>(new LongWritable(-2),
+                            new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName)));
+
+            // sampling percentage at key 0
+            result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                    new Tuple3<Writable, Writable, String>(new LongWritable(0L),
+                            new BytesWritable(Bytes.toBytes(samplingPercent)), statisticsFileName)));
+
+            ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+
+            for (long i : allCuboids) {
+                valueBuf.clear();
+                cuboidHLLMap.get(i).writeRegisters(valueBuf);
+                valueBuf.flip();
+
+                byte[] valueCopy = new byte[valueBuf.limit()];
+                System.arraycopy(valueBuf.array(), 0, valueCopy, 0, valueBuf.limit());
+
+                result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(BatchConstants.CFG_OUTPUT_STATISTICS,
+                        new Tuple3<Writable, Writable, String>(new LongWritable(i),
+                                new BytesWritable(valueCopy, valueCopy.length), statisticsFileName)));
+            }
+        }
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 31eebc8..82a1a9b 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -24,23 +24,31 @@ import java.util.List;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 
 public class SparkUtil {
 
@@ -130,6 +138,41 @@ public class SparkUtil {
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
-  }
+    }
+
+    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) {
+        JavaRDD<String[]> recordRDD;
+
+        if (isSequenceFile) {
+            recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                    .map(new Function<Text, String[]>() {
+                        @Override
+                        public String[] call(Text text) throws Exception {
+                            String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+                        }
+                    });
+        } else {
+            SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+            final Dataset intermediateTable = sparkSession.table(hiveTable);
+            recordRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+                @Override
+                public String[] call(Row row) throws Exception {
+                    String[] result = new String[row.size()];
+                    for (int i = 0; i < row.size(); i++) {
+                        final Object o = row.get(i);
+                        if (o != null) {
+                            result[i] = o.toString();
+                        } else {
+                            result[i] = null;
+                        }
+                    }
+                    return result;
+                }
+            });
+        }
+
+        return recordRDD;
+    }
 
 }
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 16bedb5..a3e7e68 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -307,17 +307,14 @@
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <version>2.11.0</version>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-compiler</artifactId>
-            <version>2.11.0</version>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-reflect</artifactId>
-            <version>2.11.0</version>
         </dependency>
     </dependencies>
 
diff --git a/pom.xml b/pom.xml
index d9b9efe..cd18659 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,9 @@
         <spark.version>2.1.2</spark.version>
         <kryo.version>4.0.0</kryo.version>
 
+        <!-- Scala versions -->
+        <scala.version>2.11.0</scala.version>
+
         <!-- <reflections.version>0.9.10</reflections.version> -->
 
         <!-- Calcite Version -->
@@ -895,6 +898,24 @@
                 <version>${tomcat.version}</version>
                 <scope>provided</scope>
             </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-library</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-compiler</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-reflect</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>