You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/06/18 06:13:22 UTC

[kylin] branch master updated: [KYLIN-4947] Implement spark engine for cube optimization jobs.

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new efedf07  [KYLIN-4947] Implement spark engine for cube optimization jobs.
efedf07 is described below

commit efedf07043c704cbdfa04ac3af3d73cec866cb76
Author: yangjiang <ya...@ebay.com>
AuthorDate: Wed Apr 7 15:39:04 2021 +0800

    [KYLIN-4947] Implement spark engine for cube optimization jobs.
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   8 +
 .../apache/kylin/cube/cuboid/CuboidScheduler.java  |   3 +-
 .../org/apache/kylin/cube/cuboid/CuboidUtil.java   |   1 +
 .../kylin/cube/cuboid/TreeCuboidScheduler.java     |   7 +-
 .../org/apache/kylin/cube/kv/RowKeyDecoder.java    |   3 +-
 .../kylin/job/constant/ExecutableConstants.java    |   6 +
 .../kylin/engine/mr/BatchOptimizeJobBuilder2.java  |   1 +
 .../kylin/engine/mr/common/BatchConstants.java     |   5 +-
 .../kylin/engine/mr/common/CubeStatsWriter.java    |   6 +
 .../kylin/engine/spark/KylinKryoRegistrator.java   |   5 +
 .../engine/spark/SparkBatchCubingEngine2.java      |   7 +-
 .../spark/SparkBatchOptimizeJobBuilder2.java       | 211 ++++++++++++
 .../kylin/engine/spark/SparkBuildDictionary.java   |   4 +-
 .../SparkCalculateStatsFromBaseCuboidJob.java      | 354 +++++++++++++++++++++
 .../kylin/engine/spark/SparkCubingByLayer.java     |  10 +-
 .../engine/spark/SparkCubingByLayerForOpt.java     | 269 ++++++++++++++++
 .../kylin/engine/spark/SparkCubingMerge.java       |   4 +-
 .../apache/kylin/engine/spark/SparkExecutable.java |  10 +-
 .../kylin/engine/spark/SparkFactDistinct.java      |   4 +-
 .../spark/SparkFilterRecommendCuboidDataJob.java   | 168 ++++++++++
 .../apache/kylin/engine/spark/SparkFunction.java   | 104 ++++++
 .../kylin/engine/spark/SparkMergingDictionary.java |   4 +-
 .../kylin/engine/spark/SparkUHCDictionary.java     |   4 +-
 .../SparkUpdateShardForOldCuboidDataStep.java      | 223 +++++++++++++
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  99 +++++-
 .../kylin/storage/hbase/steps/BulkLoadJob.java     |   2 +-
 .../kylin/storage/hbase/steps/CreateHTableJob.java |   2 +
 .../hbase/steps/HBaseSparkOutputTransition.java    |  26 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  |   4 +-
 29 files changed, 1518 insertions(+), 36 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8d22fa3..767ae8b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1731,6 +1731,14 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", FALSE));
     }
 
+    public boolean isSparkOptimizeCubeViaSparkEnable() { // read by SparkBatchCubingEngine2.createBatchOptimizeJob
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-optimize-cube-enabled", TRUE));
+    }
+
+    public boolean isUseSparkCalculateStatsEnable() { // read by SparkBatchOptimizeJobBuilder2.build to pick the stats step
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-calculate-stats-enabled", TRUE));
+    }
+
     public boolean isFlinkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 5c57fad..8ac19fd 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.cube.cuboid;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -32,7 +33,7 @@ import org.apache.kylin.shaded.com.google.common.collect.Lists;
 /**
  * Defines a cuboid tree, rooted by the base cuboid. A parent cuboid generates its child cuboids.
  */
-abstract public class CuboidScheduler {
+abstract public class CuboidScheduler implements Serializable {
     
     public static CuboidScheduler getInstance(CubeDesc cubeDesc) {
         String clzName = cubeDesc.getConfig().getCuboidScheduler();
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
index aae3129..e40ea5f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
@@ -32,6 +32,7 @@ import org.apache.kylin.shaded.com.google.common.collect.Maps;
 
 public class CuboidUtil {
 
+    // get the index of the j-th '1' bit of the i-th cuboid
     public static Integer[][] getCuboidBitSet(Long[] cuboidIds, int nRowKey) {
         Preconditions.checkArgument(nRowKey < Long.SIZE,
                 "the size of row key could not be large than " + (Long.SIZE - 1));
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java
index 40242c4..b18e38b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.cube.cuboid;
 
 import java.io.PrintWriter;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -71,7 +72,7 @@ public class TreeCuboidScheduler extends CuboidScheduler {
         return cuboidTree.isValid(requestCuboid);
     }
 
-    public static class CuboidTree {
+    public static class CuboidTree implements Serializable {
         private int treeLevels;
 
         private TreeNode root;
@@ -232,7 +233,7 @@ public class TreeCuboidScheduler extends CuboidScheduler {
         }
     }
 
-    public static class TreeNode {
+    public static class TreeNode implements Serializable {
         @JsonProperty("cuboid_id")
         long cuboidId;
         @JsonIgnore
@@ -290,7 +291,7 @@ public class TreeCuboidScheduler extends CuboidScheduler {
     /**
      * Compare cuboid according to the cuboid data row count
      */
-    public static class CuboidCostComparator implements Comparator<Long> {
+    public static class CuboidCostComparator implements Comparator<Long>, Serializable {
         private Map<Long, Long> cuboidStatistics;
 
         public CuboidCostComparator(Map<Long, Long> cuboidStatistics) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 71ad4bf..516ce7f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.cube.kv;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -34,7 +35,7 @@ import org.apache.kylin.metadata.model.TblColRef;
  * @author xjiang
  * 
  */
-public class RowKeyDecoder {
+public class RowKeyDecoder implements Serializable {
 
     private final CubeDesc cubeDesc;
     private final RowKeyColumnIO colIO;
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 4deab99..aa9e875 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -46,12 +46,17 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
+    public static final String STEP_NAME_FACT_DISTINCT_COLUMNS_SPARK = "Extract Fact Table Distinct Columns With Spark";
     public static final String STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID = "Calculate Statistics from Base Cuboid";
     public static final String STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION = "Filter Recommend Cuboid Data for Optimization";
+    public static final String STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION_SPARK = "Filter Recommend Cuboid Data for Optimization with Spark";
+    public static final String STEP_NAME_CALCULATE_STATS_FROM_BASECUBOID_SPARK = "Calculate Stats From BaseCuboid Step with Spark";
     public static final String STEP_NAME_UPDATE_OLD_CUBOID_SHARD = "Update Old Cuboid Shard for Optimization";
+    public static final String STEP_NAME_UPDATE_OLD_CUBOID_SHARD_SPARK = "Update Old Cuboid Shard for Optimization With Spark";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem";
     public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark";
+    public static final String STEP_NAME_OPTIMIZE_SPARK_CUBE = "Optimize Cube with Spark";
     public static final String STEP_NAME_BUILD_FLINK_CUBE = "Build Cube with Flink";
     public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid";
     public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
@@ -64,6 +69,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
     public static final String STEP_NAME_MERGE_UPDATE_DICTIONARY = "Update Dictionary Data";
     public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization";
+    public static final String STEP_NAME_MERGE_STATISTICS_WITH_SPARK = "Merge Cuboid Statistics with Spark for Optimization";
     public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
index 9e8b9e8..18a9ded 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
@@ -64,6 +64,7 @@ public class BatchOptimizeJobBuilder2 extends JobBuilderSupport {
 
         // Phase 1: Prepare base cuboid data from old segment
         String oldcuboidRootPath = getCuboidRootPath(oldSegment) + "*";
+        // writes to optimizeCuboidRootPath/base_cuboid or optimizeCuboidRootPath/old
         result.addTask(createFilterRecommendCuboidDataStep(oldcuboidRootPath, optimizeCuboidRootPath));
 
         // Phase 2: Prepare dictionary and statistics for new segment
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d0e2936..dd2dc6e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -94,7 +94,7 @@ public interface BatchConstants {
     String ARG_CUBE_NAME = "cubename";
     String ARG_II_NAME = "iiname";
     String ARG_SEGMENT_NAME = "segmentname";
-    String ARG_SEGMENT_ID = "segmentid";
+    String ARG_SEGMENT_ID = "segmentId";
     String ARG_PARTITION = "partitions";
     String ARG_STATS_ENABLED = "statisticsenabled";
     String ARG_STATS_OUTPUT = "statisticsoutput";
@@ -107,7 +107,7 @@ public interface BatchConstants {
     String ARG_TABLE_NAME = "tableName";
     String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
     String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
-    String ARG_META_URL = "metadataUrl";
+    String ARG_META_URL = "metaUrl";
     String ARG_HBASE_CONF_PATH = "hbaseConfPath";
     String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
     String ARG_COUNTER_OUTPUT = "counterOutput";
@@ -116,6 +116,7 @@ public interface BatchConstants {
     String ARG_BASE64_ENCODED_SQL = "base64EncodedSql";
     String ARG_GLOBAL_DIC_PART_REDUCE_STATS = "global_dict_part_reduce_stats";
     String ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT = "global_dict_max_distinct_count";
+    String ARG_HIVE_TABLE = "hiveTable";
 
     /**
      * logger and counter
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index 3c41e1f..4348bdf 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -35,9 +35,13 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CubeStatsWriter {
 
+    protected static final Logger logger = LoggerFactory.getLogger(CubeStatsWriter.class);
+
     public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
             Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
         writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0);
@@ -62,6 +66,8 @@ public class CubeStatsWriter {
             Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
             int shard) throws IOException {
         Path seqFilePath = new Path(outputPath, BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard);
+        logger.info("writePartialCuboidStatistics for cuboid: " + cuboidHLLMap.keySet().toString());
+        logger.info("writePartialCuboidStatistics Path: " + seqFilePath);
         writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
                 mapperOverlapRatio, 0);
     }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index a50c11d..5cf1e2f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -86,6 +86,8 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class);
         kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class);
         kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class);
+        kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.TreeNode.class);
+        kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.CuboidCostComparator.class);
         kylinClassByReflection1(kyroClasses);
         kylinClassByReflection2(kyroClasses);
 
@@ -146,6 +148,8 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(org.apache.kylin.cube.common.RowKeySplitter.class);
         kyroClasses.add(org.apache.kylin.cube.cuboid.Cuboid.class);
         kyroClasses.add(org.apache.kylin.cube.cuboid.DefaultCuboidScheduler.class);
+        kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.class);
+        kyroClasses.add(org.apache.kylin.cube.cuboid.TreeCuboidScheduler.CuboidTree.class);
         kyroClasses.add(org.apache.kylin.cube.gridtable.TrimmedDimensionSerializer.class);
         kyroClasses.add(org.apache.kylin.cube.kv.AbstractRowKeyEncoder.class);
         kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class);
@@ -155,6 +159,7 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class);
         kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class);
         kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyDecoder.class);
         kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.class);
         kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class);
         kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.class);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
index d3afb03..24f3f91 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
@@ -51,8 +51,11 @@ public class SparkBatchCubingEngine2 implements IBatchCubingEngine {
 
     @Override
     public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
-        //TODO use Spark to optimize
-        return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+        if (optimizeSegment.getConfig().isSparkOptimizeCubeViaSparkEnable()) {
+            return new SparkBatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+        } else {
+            return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+        }
     }
 
     @Override
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchOptimizeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchOptimizeJobBuilder2.java
new file mode 100644
index 0000000..2971857
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchOptimizeJobBuilder2.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.CopyDictionaryStep;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.mr.steps.MergeStatisticsWithOldStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterOptimizeStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkBatchOptimizeJobBuilder2 extends JobBuilderSupport {
+    private static final Logger logger = LoggerFactory.getLogger(SparkBatchOptimizeJobBuilder2.class);
+
+    private final ISparkOutput.ISparkBatchOptimizeOutputSide outputSide;
+
+    public SparkBatchOptimizeJobBuilder2(CubeSegment optimizeSegment, String submitter) {
+        super(optimizeSegment, submitter);
+        this.outputSide = SparkUtil.getBatchOptimizeOutputSide2(seg); // seg: presumably the inherited field set by super(...) — confirm
+    }
+
+    public CubingJob build() { // assembles the four-phase optimize job for seg and returns it
+        logger.info("Spark new job to Optimize segment " + seg);
+        final CubingJob result = CubingJob.createOptimizeJob(seg, submitter, config);
+        final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
+        // receives the Phase 1 filter output; later the input for the stats step and the shard-update step
+        final String optimizeCuboidRootPath = getOptimizationCuboidPath(jobId);
+
+        CubeSegment oldSegment = seg.getCubeInstance().getOriginalSegmentToOptimize(seg);
+        Preconditions.checkNotNull(oldSegment, "cannot find the original segment to be optimized by " + seg);
+
+        // Phase 1: Prepare base cuboid data from old segment
+        String oldcuboidRootPath = getCuboidRootPath(oldSegment) + "*";
+        // Filter cuboid data into optimizeCuboidRootPath/base_cuboid and optimizeCuboidRootPath/old
+        result.addTask(createFilterRecommendCuboidDataStep(oldcuboidRootPath, optimizeCuboidRootPath, jobId));
+
+        // Phase 2: Prepare dictionary and statistics for new segment
+        result.addTask(createCopyDictionaryStep());
+        String optStatsSourcePath = getBaseCuboidPath(optimizeCuboidRootPath);
+        String optStatsDstPath = getOptimizationStatisticsPath(jobId);
+        // Calculate statistics: Spark step when enabled in config, otherwise createCalculateStatsFromBaseCuboid
+        if (seg.getConfig().isUseSparkCalculateStatsEnable()) {
+            result.addTask(createCalculateStatsFromBaseCuboidStepWithSpark(optStatsSourcePath, optStatsDstPath,
+                    CuboidModeEnum.RECOMMEND_MISSING, jobId));
+        } else {
+            result.addTask(createCalculateStatsFromBaseCuboid(optStatsSourcePath, optStatsDstPath,
+                    CuboidModeEnum.RECOMMEND_MISSING));
+        }
+
+        result.addTask(createMergeStatisticsWithOldStep(jobId, optStatsDstPath, getStatisticsPath(jobId)));
+        outputSide.addStepPhase2_CreateHTable(result);
+
+        result.addTask(createUpdateShardForOldCuboidDataStep(optimizeCuboidRootPath, cuboidRootPath, jobId));
+
+        // Phase 3: Build Cube for Missing Cuboid Data
+        addLayerCubingSteps(result, jobId, CuboidModeEnum.RECOMMEND_MISSING_WITH_BASE,
+                SparkUtil.generateFilePath(PathNameCuboidBase, cuboidRootPath), cuboidRootPath); // layer cubing
+
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterOptimizeStep(jobId));
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private SparkExecutable createCalculateStatsFromBaseCuboidStepWithSpark(String inputPath, String outputPath,
+            CuboidModeEnum recommendMissing, String jobId) {
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASECUBOID_SPARK);
+        // Configure the step for SparkCalculateStatsFromBaseCuboidJob; params are keyed by that job's option names
+        sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_CUBE_NAME.getOpt(),
+                seg.getRealization().getName());
+        sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_INPUT_PATH.getOpt(), inputPath);
+        sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+        sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_META_URL.getOpt(),
+                getSegmentMetadataUrl(seg.getConfig(), jobId));
+        sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_JOB_MODE.getOpt(),
+                recommendMissing.toString());
+        sparkExecutable.setParam(SparkCalculateStatsFromBaseCuboidJob.OPTION_SAMPLING_PERCENT.getOpt(),
+                String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+        // the job id is set on the executable itself rather than passed through setParam
+        sparkExecutable.setJobId(jobId);
+        // NOTE(review): sampling percent is read via config.getConfig(), all other settings via seg.getConfig() — confirm
+        sparkExecutable.setClassName(SparkCalculateStatsFromBaseCuboidJob.class.getName());
+
+        return sparkExecutable;
+    }
+
+    private SparkExecutable createFilterRecommendCuboidDataStep(String inputPath, String outputPath , String jobId) {
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION_SPARK);
+        // Configure the step for SparkFilterRecommendCuboidDataJob; params are keyed by that job's option names
+        sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_INPUT_PATH.getOpt(), inputPath);
+        sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+        sparkExecutable.setParam(SparkFilterRecommendCuboidDataJob.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobId));
+        sparkExecutable.setClassName(SparkFilterRecommendCuboidDataJob.class.getName());
+        // the job id is set on the executable itself rather than passed through setParam
+        sparkExecutable.setJobId(jobId);
+
+        return sparkExecutable;
+    }
+
+    private UpdateCubeInfoAfterOptimizeStep createUpdateCubeInfoAfterOptimizeStep(String jobId) {
+        final UpdateCubeInfoAfterOptimizeStep result = new UpdateCubeInfoAfterOptimizeStep();
+        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        // Store cube name, segment id and cubing job id in the step's params
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+
+        return result;
+    }
+
+    private void addLayerCubingSteps(CubingJob result, String jobId, CuboidModeEnum mode, String input, String output) {
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
+        sparkExecutable.setClassName(SparkCubingByLayerForOpt.class.getName());
+        configureSparkJob(seg, sparkExecutable, jobId, input, output, mode); // sets params, job id, jars and step name
+        result.addTask(sparkExecutable);
+    }
+
+    private SparkExecutable createUpdateShardForOldCuboidDataStep(String inputPath, String outputPath, String jobId) {
+        final SparkExecutable sparkExecutable = SparkExecutableFactory.instance(seg.getConfig());
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_UPDATE_OLD_CUBOID_SHARD_SPARK);
+        // Configure the step for SparkUpdateShardForOldCuboidDataStep; params are keyed by that class's option names
+        sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_CUBE_NAME.getOpt(),
+                seg.getRealization().getName());
+        sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_INPUT_PATH.getOpt(), inputPath);
+        sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+        sparkExecutable.setParam(SparkUpdateShardForOldCuboidDataStep.OPTION_META_URL.getOpt(),
+                getSegmentMetadataUrl(seg.getConfig(), jobId));
+        // the job id is set on the executable itself rather than passed through setParam
+        sparkExecutable.setJobId(jobId);
+        sparkExecutable.setClassName(SparkUpdateShardForOldCuboidDataStep.class.getName());
+
+        return sparkExecutable;
+    }
+
+    private MergeStatisticsWithOldStep createMergeStatisticsWithOldStep(String jobId, String optStatsPath,
+             String mergedStatisticsFolder) {
+        MergeStatisticsWithOldStep result = new MergeStatisticsWithOldStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS_WITH_OLD);
+        // All inputs are stored in the step's params through CubingExecutableUtil
+        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setStatisticsPath(optStatsPath, result.getParams());
+        CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
+
+        return result;
+    }
+
+    private AbstractExecutable createCopyDictionaryStep() {
+        CopyDictionaryStep result = new CopyDictionaryStep();
+        result.setName(ExecutableConstants.STEP_NAME_COPY_DICTIONARY);
+        // The step is given only the cube name and the segment id
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        return result;
+    }
+
+    private void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable, final String jobId,
+            final String input, String output, CuboidModeEnum mode) {
+        // Params are keyed by SparkCubingByLayerForOpt's option names
+        sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_META_URL.getOpt(),
+                getSegmentMetadataUrl(seg.getConfig(), jobId));
+        sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_OUTPUT_PATH.getOpt(), output);
+        sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_INPUT_PATH.getOpt(), input);
+        sparkExecutable.setParam(SparkCubingByLayerForOpt.OPTION_CUBOID_MODE.getOpt(), mode.toString());
+        sparkExecutable.setJobId(jobId);
+        // Attach the additional Spark jars taken from the segment's config
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_OPTIMIZE_SPARK_CUBE + ":" + seg.toString());
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java
index 25020eb..2d29e1a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBuildDictionary.java
@@ -93,14 +93,14 @@ public class SparkBuildDictionary extends AbstractApplication implements Seriali
     public static final Option OPTION_DICT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
             .isRequired(true).withDescription("Cube dictionary output path").create(BatchConstants.ARG_DICT_PATH);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
+            .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
     public static final Option OPTION_CUBING_JOB_ID = OptionBuilder
             .withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(true)
             .withDescription("Cubing job id").create(BatchConstants.ARG_CUBING_JOB_ID);
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
     public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
-            .withDescription("HDFS metadata url").create("metaUrl");
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
     public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUTPUT).hasArg()
             .isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUTPUT);
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCalculateStatsFromBaseCuboidJob.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCalculateStatsFromBaseCuboidJob.java
new file mode 100644
index 0000000..838efc0
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCalculateStatsFromBaseCuboidJob.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
+import org.apache.kylin.shaded.com.google.common.hash.Hasher;
+import org.apache.kylin.shaded.com.google.common.hash.Hashing;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import scala.Tuple2;
+
+public class SparkCalculateStatsFromBaseCuboidJob extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkCalculateStatsFromBaseCuboidJob.class);
+
+    protected Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true)
+            .create(BatchConstants.ARG_SEGMENT_ID);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+    public static final Option OPTION_JOB_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE).hasArg().isRequired(true)
+            .create(BatchConstants.ARG_CUBOID_MODE);
+    public static final Option OPTION_SAMPLING_PERCENT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+
+    private Options options;
+
+    private int samplingPercent;
+    private int rowCount = 0;
+    private long[] rowHashCodesLong = null;
+    //about details of the new algorithm, please see KYLIN-2518
+    private boolean isUsePutRowKeyToHllNewAlgorithm;
+
+    private HLLCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    private Integer[][] allCuboidsBitSet = null;
+    private HashFunction hf = null;
+
+    protected int nRowKey;
+    protected long baseCuboidId;
+
+    RowKeyDecoder rowKeyDecoder;
+
+    public SparkCalculateStatsFromBaseCuboidJob() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_JOB_MODE);
+        options.addOption(OPTION_SAMPLING_PERCENT);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options; // the option set filled in by the constructor
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String input = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String output = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String jobMode = optionsHelper.getOptionValue(OPTION_JOB_MODE);
+        this.samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_SAMPLING_PERCENT));
+
+        SparkConf sparkConf = SparkUtil.setKryoSerializerInConf();
+        sparkConf.setAppName("Kylin_Calculate_Statics_From_BaseCuboid_Data_" + cubeName + "_With_Spark");
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
+            sc.sc().addSparkListener(jobListener);
+
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(output));
+
+            SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            CubeSegment optSegment;
+            int cubeStatsHLLPrecision;
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(config)) {
+
+                CubeManager cubeManager = CubeManager.getInstance(config);
+                CubeInstance cube = cubeManager.getCube(cubeName);
+                CubeDesc cubeDesc = cube.getDescriptor();
+                optSegment = cube.getSegmentById(segmentId);
+
+                baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+                nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+                Set<Long> cuboids = cube.getCuboidsByMode(jobMode);
+                if (cuboids.size() == 0) {
+                    Set<Long> current = cube.getCuboidsByMode(CuboidModeEnum.CURRENT);
+                    current.removeAll(cube.getCuboidsRecommend());
+                    cuboids = current;
+                }
+                cuboidIds = cuboids.toArray(new Long[cuboids.size()]);
+                allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
+                cubeStatsHLLPrecision = config.getCubeStatsHLLPrecision();
+
+
+                allCuboidsHLL = new HLLCounter[cuboidIds.length];
+                for (int i = 0; i < cuboidIds.length; i++) {
+                    allCuboidsHLL[i] = new HLLCounter(cubeStatsHLLPrecision);
+                }
+
+                //for KYLIN-2518 backward compatibility
+                if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+                    isUsePutRowKeyToHllNewAlgorithm = false;
+                    hf = Hashing.murmur3_32();
+                    logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+                } else {
+                    isUsePutRowKeyToHllNewAlgorithm = true;
+                    rowHashCodesLong = new long[nRowKey];
+                    hf = Hashing.murmur3_128();
+                    logger.info(
+                            "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+                            cubeDesc.getVersion());
+                }
+            }
+
+            JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(input, Text.class, Text.class);
+
+            JavaPairRDD<Text, Text> afterMapRDD = inputRDD.mapPartitionsToPair(
+                    new SparkFunction.PairFlatMapFunctionBase<Iterator<Tuple2<Text, Text>>, Text, Text>() {
+                        @Override
+                        protected void doInit() {
+                            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                            CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                            CubeSegment cubeSeg = cubeInstance.getSegmentById(segmentId);
+                            rowKeyDecoder = new RowKeyDecoder(cubeSeg);
+                        }
+
+                        /**
+                         * Samples the base cuboid rows of one partition into the per-cuboid HLL counters and
+                         * emits one (cuboid id, serialized HLL registers) pair per entry of cuboidIds.
+                         *
+                         * @param iterator the records of this partition; only their keys are read
+                         * @return the pairs described above, in cuboidIds order
+                         */
+                        @Override
+                        protected Iterator<Tuple2<Text, Text>> doCall(Iterator<Tuple2<Text, Text>> iterator)
+                                throws Exception {
+                            while (iterator.hasNext()) {
+                                Text key = iterator.next()._1();
+                                long cuboidID = rowKeyDecoder.decode(key.getBytes());
+                                if (cuboidID != baseCuboidId) {
+                                    continue; // Skip data from cuboids which are not the base cuboid
+                                }
+
+                                List<String> keyValues = rowKeyDecoder.getValues();
+
+                                // rowCount cycles through 0..99 (reset below), so the first samplingPercent
+                                // rows of every 100 base cuboid rows are fed into the counters
+                                if (rowCount < samplingPercent) {
+                                    Preconditions.checkArgument(nRowKey == keyValues.size());
+                                    String[] row = keyValues.toArray(new String[keyValues.size()]);
+                                    if (isUsePutRowKeyToHllNewAlgorithm) {
+                                        putRowKeyToHLLNew(row);
+                                    } else {
+                                        putRowKeyToHLLOld(row);
+                                    }
+                                }
+
+                                if (++rowCount == 100)
+                                    rowCount = 0;
+                            }
+
+                            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+                            List<Tuple2<Text, Text>> result = new ArrayList<>(cuboidIds.length);
+
+                            for (int i = 0; i < cuboidIds.length; i++) {
+                                Text outputKey = new Text();
+                                Text outputValue = new Text();
+
+                                outputKey.set(Bytes.toBytes(cuboidIds[i]));
+                                hllBuf.clear();
+                                allCuboidsHLL[i].writeRegisters(hllBuf);
+                                // take only the bytes written for this cuboid; hllBuf is cleared and reused
+                                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                                result.add(new Tuple2<Text, Text>(outputKey, outputValue));
+                            }
+                            logger.debug("Emitting partial HLL counters for {} cuboids", result.size());
+                            return result.iterator();
+                        }
+
+
+                    });
+
+            afterMapRDD.groupByKey().foreach(new SparkFunction.VoidFunctionBase<Tuple2<Text, Iterable<Text>>>() {
+                @Override
+                protected void doInit() {
+                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                    KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
+                }
+
+                @Override
+                protected void doCall(Tuple2<Text, Iterable<Text>> v1) throws Exception {
+                    Text key = v1._1();
+                    Iterable<Text> values = v1._2();
+
+                    long cuboidId = Bytes.toLong(key.getBytes());
+                    logger.info("Cuboid id to be processed: " + cuboidId);
+
+                    List<Long> baseCuboidRowCountInMappers = Lists.newArrayList();
+                    long totalRowsBeforeMerge = 0;
+
+                    for (Text value : values) {
+                        HLLCounter hll = new HLLCounter(cubeStatsHLLPrecision);
+                        ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
+                        hll.readRegisters(bf);
+
+                        if (cuboidId == baseCuboidId) {
+                            baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+                        }
+
+                        totalRowsBeforeMerge += hll.getCountEstimate();
+
+                        if (cuboidHLLMap.get(cuboidId) != null) {
+                            cuboidHLLMap.get(cuboidId).merge(hll);
+                        } else {
+                            cuboidHLLMap.put(cuboidId, hll);
+                        }
+                    }
+
+                    long grandTotal = 0;
+                    for (HLLCounter hll : cuboidHLLMap.values()) {
+                        grandTotal += hll.getCountEstimate();
+                    }
+                    double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+
+                    logger.info("writer cuboIdstatic to " + output);
+                    CubeStatsWriter.writePartialCuboidStatistics(sConf.get(), new Path(output), cuboidHLLMap,
+                            samplingPercent, baseCuboidRowCountInMappers.size(), mapperOverlapRatio,
+                            TaskContext.getPartitionId());
+                }
+            });
+        }
+
+    }
+
+    private void putRowKeyToHLLOld(String[] row) {
+        //generate hash for each row key column
+        byte[][] rowHashCodes = new byte[nRowKey][];
+        for (int i = 0; i < nRowKey; i++) {
+            Hasher hc = hf.newHasher();
+            String colValue = row[i];
+            if (colValue != null) {
+                rowHashCodes[i] = hc.putUnencodedChars(colValue).hash().asBytes();
+            } else {
+                rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+            }
+        }
+
+        // use the row key column hash to get a consolidated hash for each cuboid
+        for (int i = 0; i < cuboidIds.length; i++) {
+            Hasher hc = hf.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]);
+            }
+
+            allCuboidsHLL[i].add(hc.hash().asBytes());
+        }
+    }
+
+    private void putRowKeyToHLLNew(String[] row) {
+        //generate hash for each row key column
+        for (int i = 0; i < nRowKey; i++) {
+            Hasher hc = hf.newHasher();
+            String colValue = row[i];
+            if (colValue == null)
+                colValue = "0";
+            byte[] bytes = hc.putUnencodedChars(colValue).hash().asBytes();
+            rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+        }
+
+        // user the row key column hash to get a consolidated hash for each cuboid
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            long value = 0;
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                value += rowHashCodesLong[allCuboidsBitSet[i][position]];
+            }
+            allCuboidsHLL[i].addHashDirectly(value);
+        }
+    }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 4c9c391..ab0a565 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -96,14 +96,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
             .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
-    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
-    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
-            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
             .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
-            .withDescription("Hive Intermediate Table").create("hiveTable");
+            .withDescription("Hive Intermediate Table").create(BatchConstants.ARG_HIVE_TABLE);
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
     public static final Option OPTION_SHRUNK_INPUT_PATH = OptionBuilder
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.java
new file mode 100644
index 0000000..8857cc8
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+public class SparkCubingByLayerForOpt extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayerForOpt.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg()
+            .isRequired(true).withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg()
+            .isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_CUBOID_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE).hasArg()
+            .isRequired(true).withDescription("Cuboid Mode").create(BatchConstants.ARG_CUBOID_MODE);
+
+    private Options options;
+
+    public SparkCubingByLayerForOpt() {
+        options = new Options();
+        options.addOption(OPTION_CUBOID_MODE);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_OUTPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options; // the option set filled in by the constructor
+    }
+
+    /**
+     * Builds the cuboids selected by the given cuboid mode layer by layer, starting from the base
+     * cuboid data found under the input path, and saves every computed layer below the output path.
+     */
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String cuboidMode = optionsHelper.getOptionValue(OPTION_CUBOID_MODE);
+
+        SparkConf sparkConf = SparkUtil.setKryoSerializerInConf();
+        sparkConf.setAppName("Kylin_Cubing_For_Optimize_" + cubeName + "_With_Spark");
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
+            SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob
+                    .loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
+            sc.sc().addSparkListener(jobListener);
+
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+            final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+            final Job job = Job.getInstance(sConf.get());
+            SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+            StorageLevel storageLevel = StorageLevel.fromString(envConfig.getSparkStorageLevel());
+            JavaPairRDD<ByteArray, Object[]> baseCuboIdRDD = SparkUtil.getCuboIdRDDFromHdfs(sc, metaUrl, cubeName,
+                    cubeSegment, inputPath, cubeDesc.getMeasures().size(), sConf);
+
+            // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
+            final Set<Long> cuboidsByMode = cubeSegment.getCubeInstance().getCuboidsByMode(cuboidMode);
+            final int maxLevel = CuboidUtil.getLongestDepth(cuboidsByMode);
+            logger.info("cuboidMode: {}", cuboidMode);
+            logger.info("maxLevel: {}", maxLevel);
+
+            CuboidScheduler scheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSegment, cuboidMode);
+            JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[maxLevel + 1];
+            allRDDs[0] = baseCuboIdRDD;
+
+            SparkCubingByLayer.BaseCuboidReducerFunction2 reducerFunction2 = new SparkCubingByLayer.BaseCuboidReducerFunction2(
+                    cubeName, metaUrl, sConf);
+            // switch to the reducer that takes the needAggr mask when some measure is only aggregated in the base cuboid
+            boolean allNormalMeasure = true;
+            boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()];
+            for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+                needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
+                allNormalMeasure = allNormalMeasure && needAggr[i];
+            }
+
+            if (!allNormalMeasure) {
+                reducerFunction2 = new SparkCubingByLayer.CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr);
+            }
+
+            for (int i = 1; i <= maxLevel; i++) {
+                int partition = SparkUtil.estimateLayerPartitionNum(i, cubeStatsReader, envConfig);
+                allRDDs[i] = allRDDs[i - 1]
+                        .flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf, scheduler))
+                        .reduceByKey(reducerFunction2, partition);
+                allRDDs[i].persist(storageLevel);
+                saveToHDFS(allRDDs[i], metaUrl, cubeName, cubeSegment, outputPath, i, job);
+                // must do unpersist after allRDDs[level] is created, otherwise this RDD will be recomputed
+                allRDDs[i - 1].unpersist(false);
+            }
+            allRDDs[maxLevel].unpersist(false);
+            logger.info("Finished on calculating needed cuboids For Optimize.");
+            logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
+        }
+    }
+
+    /**
+     * Expands one cuboid row into one row per child cuboid that the supplied scheduler spans from it.
+     */
+    public static class CuboidFlatMap
+            extends SparkFunction.PairFlatMapFunctionBase<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private CubeSegment cubeSegment;
+        private CubeDesc cubeDesc;
+        private NDCuboidBuilder ndCuboidBuilder;
+        private RowKeySplitter rowKeySplitter;
+        private SerializableConfiguration conf;
+        private CuboidScheduler cuboidScheduler;
+
+        public CuboidFlatMap(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf,
+                CuboidScheduler scheduler) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.cuboidScheduler = scheduler;
+        }
+
+        @Override
+        protected void doInit() {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoClose = KylinConfig.setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                this.cubeSegment = cubeInstance.getSegmentById(segmentId);
+                this.cubeDesc = cubeInstance.getDescriptor();
+                this.rowKeySplitter = new RowKeySplitter(cubeSegment);
+                this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
+            }
+        }
+
+        @Override
+        public Iterator<Tuple2<ByteArray, Object[]>> doCall(final Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+            byte[] key = tuple2._1().array();
+            long cuboidId = rowKeySplitter.parseCuboid(key);
+            final List<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
+
+            // if still empty or null
+            if (myChildren == null || myChildren.isEmpty()) {
+                return EMTPY_ITERATOR.iterator();
+            }
+            rowKeySplitter.split(key);
+            final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
+            List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList<>(myChildren.size());
+            for (Long child : myChildren) {
+                Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
+                ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid,
+                        rowKeySplitter.getSplitBuffers());
+                // every child row carries the parent's measure array unchanged
+                tuples.add(new Tuple2<>(result, tuple2._2()));
+            }
+            return tuples.iterator();
+        }
+    }
+
+    private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR = new ArrayList<>(0); // only iterated
+
+    protected void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String metaUrl, final String cubeName,
+            final CubeSegment cubeSeg, final String hdfsBaseLocation, final int level, final Job job) throws Exception {
+        final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+
+        IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOutputFormat();
+        outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, cubeSeg.getCuboidScheduler(), level);
+
+        rdd.mapToPair(new SparkFunction.PairFunctionBase<Tuple2<ByteArray, Object[]>, Text, Text>() {
+            private BufferedMeasureCodec codec;
+
+            @Override
+            protected void doInit() {
+                KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                try (KylinConfig.SetAndUnsetThreadLocalConfig autoClose = KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig)) {
+                    CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                    codec = new BufferedMeasureCodec(desc.getMeasures());
+                }
+            }
+
+            @Override
+            public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> doCall(
+                    Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+                ByteBuffer valueBuf = codec.encode(tuple2._2());
+                org.apache.hadoop.io.Text textResult = new org.apache.hadoop.io.Text();
+                textResult.set(valueBuf.array(), 0, valueBuf.position());
+                return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), textResult);
+            }
+        }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+        logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 94f3a4e..71cd5aa 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -68,9 +68,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
             .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
+            .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
     public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
-            .withDescription("HDFS metadata url").create("metaUrl");
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
             .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 66ae57e..fca1839 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -210,7 +210,7 @@ public class SparkExecutable extends AbstractExecutable {
         if (!StringUtils.isEmpty(sparkJobId)) {
             return onResumed(sparkJobId, mgr);
         } else {
-            String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
+            String cubeName = this.getParam(BatchConstants.ARG_CUBE_NAME);
             CubeInstance cube;
             if (cubeName != null) {
                 cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
@@ -222,7 +222,7 @@ public class SparkExecutable extends AbstractExecutable {
                 config = cube.getConfig();
             } else {
                 // when loading hive table, we can't get cube name/config, so we get config from project.
-                String projectName = this.getParam(SparkColumnCardinality.OPTION_PRJ.getOpt());
+                String projectName = this.getParam(BatchConstants.ARG_PROJECT);
                 ProjectInstance projectInst = ProjectManager.getInstance(context.getConfig()).getProject(projectName);
                 config = projectInst.getConfig();
             }
@@ -253,7 +253,7 @@ public class SparkExecutable extends AbstractExecutable {
 
             if (cube != null && !isCreateFlatTable()) {
                 setAlgorithmLayer();
-                String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+                String segmentID = this.getParam(BatchConstants.ARG_SEGMENT_ID);
                 CubeSegment segment = cube.getSegmentById(segmentID);
                 Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
                 dumpMetadata(segment, mergingSeg);
@@ -522,7 +522,7 @@ public class SparkExecutable extends AbstractExecutable {
         CubeDescTiretreeGlobalDomainDictUtil.cuboidJob(segment.getCubeDesc(), dumpList);
 
         JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig(),
-                this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
+                this.getParam(BatchConstants.ARG_META_URL));
     }
 
     private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException {
@@ -539,7 +539,7 @@ public class SparkExecutable extends AbstractExecutable {
             CubeDescTiretreeGlobalDomainDictUtil.cuboidJob(segment.getCubeDesc(), dumpList);
         }
         JobRelatedMetaUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(),
-                this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()));
+                this.getParam(BatchConstants.ARG_META_URL));
     }
 
     protected void readCounters(final Map<String, String> info) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index a116cc8..d2020f1 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -110,11 +110,11 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
             .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
-            .withDescription("HDFS metadata url").create("metaUrl");
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
             .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
+            .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
     public static final Option OPTION_STATS_SAMPLING_PERCENT = OptionBuilder
             .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(true)
             .withDescription("Statistics sampling percent").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFilterRecommendCuboidDataJob.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFilterRecommendCuboidDataJob.java
new file mode 100644
index 0000000..704dc8d
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFilterRecommendCuboidDataJob.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import java.util.Set;
+
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
+import static org.apache.kylin.engine.spark.SparkUtil.configConvergeCuboidDataReduceOut;
+import static org.apache.kylin.engine.spark.SparkUtil.generateFilePath;
+
+/**
+ * Spark application for cube optimization that filters existing cuboid data.
+ *
+ * <p>It reads the sequence files at the input path and writes two filtered copies to locations derived
+ * from the output path via {@code generateFilePath}: rows whose cuboid id equals the cube's base cuboid id
+ * (under {@code PathNameCuboidBase}) and rows whose cuboid id is contained in the cube's recommended
+ * cuboid set (under {@code PathNameCuboidOld}).
+ */
+public class SparkFilterRecommendCuboidDataJob extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkFilterRecommendCuboidDataJob.class);
+
+    /** Name of the cube being optimized. */
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_CUBE_NAME);
+    /** Id of the optimize segment; the original segment is looked up from it. */
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_SEGMENT_ID);
+    /** Location of the cuboid sequence files to filter. */
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_INPUT);
+    /** Root location for the filtered output; deleted before the job writes to it. */
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_OUTPUT);
+    /** Url of the Kylin metadata the job loads its configuration from. */
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg()
+            .isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+
+    private Options options;
+
+    /** Registers the command line options accepted by this job. */
+    public SparkFilterRecommendCuboidDataJob() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    /**
+     * Runs the filtering job.
+     *
+     * @param optionsHelper parsed command line holding cube name, segment id, input path, output path and
+     *                      metadata url
+     * @throws IOException if the input directory does not exist
+     * @throws Exception   on any other failure while loading metadata or running the Spark job
+     */
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+
+        SparkConf sparkConf = SparkUtil.setKryoSerializerInConf();
+        sparkConf.setAppName("Kylin_Filter_Recommend_Cuboid_Data_" + cubeName + "_With_Spark");
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
+            sc.sc().addSparkListener(jobListener);
+
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(outputPath));
+
+            CubeManager cubeManager = CubeManager.getInstance(config);
+            CubeInstance cube = cubeManager.getCube(cubeName);
+            CubeSegment optSegment = cube.getSegmentById(segmentId);
+            CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+
+            // Captured by the filter functions below, hence final.
+            final boolean enableSharding = originalSegment.isEnableSharding();
+            final long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+            final Set<Long> recommendCuboids = cube.getCuboidsRecommend();
+            Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
+
+            // The input may carry a trailing glob wildcard; strip it only when it is really there so that a
+            // plain directory path is not truncated by one character.
+            String inputDir = inputPath.endsWith("*") ? inputPath.substring(0, inputPath.length() - 1) : inputPath;
+            Path inputDirPath = new Path(inputDir);
+            // Resolve the file system from the path itself so a fully qualified path that does not live on
+            // the default file system is checked against the right one.
+            FileSystem hdfs = inputDirPath.getFileSystem(sc.hadoopConfiguration());
+            if (!hdfs.exists(inputDirPath)) {
+                throw new IOException("OldCuboIdFilePath " + inputPath + " does not exists");
+            }
+
+            // inputPath is oldcuboidRootPath
+            JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class);
+
+            logger.info("start to calculate nBaseReduceTasks");
+            Pair<Integer, Integer> taskNums = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(originalSegment);
+            int reduceTasks = taskNums.getFirst();
+            int nBaseReduceTasks = taskNums.getSecond();
+            logger.info("nBaseReduceTasks is {}", nBaseReduceTasks);
+
+            final Job job = Job.getInstance(sConf.get());
+            SparkUtil.setHadoopConfForCuboid(job, originalSegment, metaUrl);
+
+            // Pass 1: keep only the rows of the base cuboid.
+            JavaPairRDD<Text, Text> baseCuboIdRDD = inputRDD.filter(new Function<Tuple2<Text, Text>, Boolean>() {
+                @Override
+                public Boolean call(Tuple2<Text, Text> v1) throws Exception {
+                    long cuboidId = RowKeySplitter.getCuboidId(v1._1.getBytes(), enableSharding);
+                    return cuboidId == baseCuboid;
+                }
+            });
+
+            // The same Job object is reused for both outputs; it is re-pointed before each save.
+            configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidBase, outputPath));
+            baseCuboIdRDD.coalesce(nBaseReduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+            // Pass 2: keep only the rows of cuboids that are in the recommended set.
+            JavaPairRDD<Text, Text> reuseCuboIdRDD = inputRDD.filter(new Function<Tuple2<Text, Text>, Boolean>() {
+                @Override
+                public Boolean call(Tuple2<Text, Text> v1) throws Exception {
+                    long cuboidId = RowKeySplitter.getCuboidId(v1._1.getBytes(), enableSharding);
+                    return recommendCuboids.contains(cuboidId);
+                }
+            });
+
+            configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidOld, outputPath));
+            reuseCuboIdRDD.coalesce(reduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+        }
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFunction.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFunction.java
new file mode 100644
index 0000000..39917a3
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFunction.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * Base classes for Spark Java functions that need a one-time initialization step before processing records.
+ *
+ * <p>Each public base class implements the matching Spark function interface, runs the lazy initialization
+ * on every {@code call} and then delegates to the subclass' {@code doCall}.
+ */
+public class SparkFunction {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkFunction.class);
+
+    /**
+     * Shared lazy-initialization and progress-logging support for the function base classes.
+     */
+    private abstract static class FunctionBase implements Serializable {
+        // Both fields are transient, so a deserialized copy of the function starts out uninitialized
+        // with a zero counter.
+        private transient volatile boolean initialized = false;
+        private transient int recordCounter;
+
+        /** One-time setup hook; invoked at most once per function instance, before the first record. */
+        protected abstract void doInit();
+
+        /**
+         * Runs {@link #doInit()} on first use and logs progress whenever the number of records seen so far
+         * is a multiple of {@code SparkUtil.getNormalRecordLogThreshold()}.
+         */
+        protected void init() {
+            if (!initialized) {
+                // The lock is the SparkFunction class, so initialization of all function instances in a
+                // JVM is serialized.
+                synchronized (SparkFunction.class) {
+                    if (!initialized) {
+                        logger.info("Start to do init for {}", this);
+                        doInit();
+                        recordCounter = 0;
+                        // Publish the volatile flag last so that everything set up above is visible to
+                        // any thread that observes initialized == true.
+                        initialized = true;
+                    }
+                }
+            }
+            if (recordCounter++ % SparkUtil.getNormalRecordLogThreshold() == 0) {
+                logger.info("Accepting record with ordinal: {}", recordCounter);
+                logger.info("Do call, available memory: {}m", MemoryBudgetController.getSystemAvailMB());
+            }
+        }
+    }
+
+    /**
+     * {@link PairFunction} with lazy initialization.
+     *
+     * @param <T> input record type
+     * @param <K> output key type
+     * @param <V> output value type
+     */
+    public abstract static class PairFunctionBase<T, K, V> extends FunctionBase implements PairFunction<T, K, V> {
+
+        /** Maps one input record to a key/value pair; called after initialization. */
+        protected abstract Tuple2<K, V> doCall(T t) throws Exception;
+
+        @Override
+        public Tuple2<K, V> call(T t) throws Exception {
+            init();
+            return doCall(t);
+        }
+    }
+
+    /**
+     * {@link Function2} with lazy initialization.
+     *
+     * @param <T1> first argument type
+     * @param <T2> second argument type
+     * @param <R>  result type
+     */
+    public abstract static class Function2Base<T1, T2, R> extends FunctionBase implements Function2<T1, T2, R> {
+
+        /** Combines the two arguments into a result; called after initialization. */
+        protected abstract R doCall(T1 v1, T2 v2) throws Exception;
+
+        @Override
+        public R call(T1 v1, T2 v2) throws Exception {
+            init();
+            return doCall(v1, v2);
+        }
+    }
+
+    /**
+     * {@link PairFlatMapFunction} with lazy initialization.
+     *
+     * @param <T> input record type
+     * @param <K> output key type
+     * @param <V> output value type
+     */
+    public abstract static class PairFlatMapFunctionBase<T, K, V> extends FunctionBase
+            implements PairFlatMapFunction<T, K, V> {
+
+        /** Maps one input record to zero or more key/value pairs; called after initialization. */
+        protected abstract Iterator<Tuple2<K, V>> doCall(T t) throws Exception;
+
+        @Override
+        public Iterator<Tuple2<K, V>> call(T t) throws Exception {
+            init();
+            return doCall(t);
+        }
+    }
+
+    /**
+     * {@link VoidFunction} with lazy initialization.
+     *
+     * @param <T> input record type
+     */
+    public abstract static class VoidFunctionBase<T> extends FunctionBase implements VoidFunction<T> {
+
+        /** Consumes one input record; called after initialization. */
+        protected abstract void doCall(T t) throws Exception;
+
+        @Override
+        public void call(T t) throws Exception {
+            init();
+            doCall(t);
+        }
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index ddd0755..4a4bbb0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -82,9 +82,9 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
             .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
+            .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
     public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
-            .withDescription("HDFS metadata url").create("metaUrl");
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
     public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg()
             .isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds");
     public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg()
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java
index 0662abb..f1c4419 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUHCDictionary.java
@@ -76,11 +76,11 @@ public class SparkUHCDictionary extends AbstractApplication implements Serializa
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
             .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
-            .withDescription("HDFS metadata url").create("metaUrl");
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
             .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
+            .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
     public static final Option OPTION_CUBING_JOB_ID = OptionBuilder
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.java
new file mode 100644
index 0000000..f153459
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUpdateShardForOldCuboidDataStep.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.Serializable;
+
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
+import static org.apache.kylin.engine.spark.SparkUtil.configConvergeCuboidDataReduceOut;
+import static org.apache.kylin.engine.spark.SparkUtil.generateFilePath;
+
+/**
+ * Spark application for cube optimization that rewrites the row keys of previously filtered cuboid data.
+ *
+ * <p>It reads the base-cuboid and old-cuboid sequence files, splits every row key with a
+ * {@link RowKeySplitter} built from the original segment, re-encodes it with the {@link RowKeyEncoder} of
+ * the optimize segment and writes the result to locations derived from the output path via
+ * {@code generateFilePath}. Values are passed through unchanged.
+ */
+public class SparkUpdateShardForOldCuboidDataStep extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkUpdateShardForOldCuboidDataStep.class);
+
+    /** Name of the cube being optimized. */
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_CUBE_NAME);
+    /** Id of the optimize segment; the original segment is looked up from it. */
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_SEGMENT_ID);
+    /** Path prefix to which the base/old cuboid directory names are appended. */
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_INPUT);
+    /** Root location for the rewritten output; deleted before the job writes to it. */
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).create(BatchConstants.ARG_OUTPUT);
+    /** Url of the Kylin metadata the job loads its configuration from. */
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL).hasArg()
+            .isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+
+    private Options options;
+    private CubeDesc cubeDesc;
+    // Assigned in initMethod(), which the Spark functions invoke from doInit().
+    private RowKeySplitter rowKeySplitter;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
+
+    // Scratch buffers reused across buildKey() calls.
+    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
+    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+
+    /** Registers the command line options accepted by this job. */
+    public SparkUpdateShardForOldCuboidDataStep() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_META_URL);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    /**
+     * Runs the row key rewrite for the base cuboid data and then for the old cuboid data.
+     *
+     * @param optionsHelper parsed command line holding cube name, segment id, input path, output path and
+     *                      metadata url
+     * @throws Exception on any failure while loading metadata or running the Spark job
+     */
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        // input is optimizeCuboidRootPath + "*",  output is cuboidRootPath.
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+
+        String baseCuboIdInputPath = inputPath + PathNameCuboidBase;
+        String oldCuboIdInputPath = inputPath + PathNameCuboidOld;
+
+        SparkConf sparkConf = SparkUtil.setKryoSerializerInConf();
+        sparkConf.setAppName("Update_Old_Cuboid_Shard_for_Optimization_" + cubeName + "_With_Spark");
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
+            sc.sc().addSparkListener(jobListener);
+
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(outputPath));
+
+            CubeManager cubeManager = CubeManager.getInstance(config);
+            CubeInstance cube = cubeManager.getCube(cubeName);
+            final CubeSegment optSegment = cube.getSegmentById(segmentId);
+            final CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+            // NOTE(review): the optimize segment is re-pointed at the original segment's cube instance
+            // before it is captured by the functions below; the reason is not documented - confirm.
+            optSegment.setCubeInstance(originalSegment.getCubeInstance());
+
+            JavaPairRDD<Text, Text> baseCuboIdRDD = sc.sequenceFile(baseCuboIdInputPath, Text.class, Text.class);
+            JavaPairRDD<Text, Text> oldCuboIdRDD = sc.sequenceFile(oldCuboIdInputPath, Text.class, Text.class);
+
+            cubeDesc = cube.getDescriptor();
+
+            logger.info("start to calculate nBaseReduceTasks");
+            Pair<Integer, Integer> taskNums = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(originalSegment);
+            int reduceTasks = taskNums.getFirst();
+            int nBaseReduceTasks = taskNums.getSecond();
+            logger.info("nBaseReduceTasks is {}", nBaseReduceTasks);
+
+            final Job job = Job.getInstance(sConf.get());
+            SparkUtil.setHadoopConfForCuboid(job, originalSegment, metaUrl);
+
+            //UpdateCuboidShard for baseCuboId
+            JavaPairRDD<Text, Text> mapBaseCuboIdRDD = baseCuboIdRDD
+                    .mapToPair(newShardUpdateFunction(sConf, metaUrl, cubeName, optSegment, originalSegment));
+
+            // The same Job object is reused for both outputs; it is re-pointed before each save.
+            configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidBase, outputPath));
+            mapBaseCuboIdRDD.repartition(nBaseReduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+            //UpdateCuboidShard for oldCuboIds
+            JavaPairRDD<Text, Text> mapOldCuboIdRDD = oldCuboIdRDD
+                    .mapToPair(newShardUpdateFunction(sConf, metaUrl, cubeName, optSegment, originalSegment));
+
+            configConvergeCuboidDataReduceOut(job, generateFilePath(PathNameCuboidOld, outputPath));
+            mapOldCuboIdRDD.repartition(reduceTasks).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+        }
+    }
+
+    /**
+     * Creates the function that rewrites one row key: it splits the incoming key, rebuilds it through
+     * {@link #buildKey(Cuboid, ByteArray[])} and pairs the new key with the untouched value.
+     *
+     * <p>A fresh function instance is returned on every call so each RDD gets its own.
+     */
+    private SparkFunction.PairFunctionBase<Tuple2<Text, Text>, Text, Text> newShardUpdateFunction(
+            final SerializableConfiguration sConf, final String metaUrl, final String cubeName,
+            final CubeSegment optSegment, final CubeSegment originalSegment) {
+        return new SparkFunction.PairFunctionBase<Tuple2<Text, Text>, Text, Text>() {
+            @Override
+            protected void doInit() {
+                initMethod(sConf, metaUrl, cubeName, optSegment, originalSegment);
+            }
+
+            @Override
+            protected Tuple2<Text, Text> doCall(Tuple2<Text, Text> tuple2) throws Exception {
+                Text outputKey = new Text();
+                long cuboidID = rowKeySplitter.split(tuple2._1.getBytes());
+
+                Cuboid cuboid = new Cuboid(cubeDesc, cuboidID, cuboidID);
+                int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers());
+                outputKey.set(newKeyBuf.array(), 0, fullKeySize);
+                // NOTE(review): the value object comes straight from the sequence file reader and is
+                // forwarded as-is into a repartition; confirm this is safe with the serializer in use.
+                return new Tuple2<Text, Text>(outputKey, tuple2._2);
+            }
+        };
+    }
+
+    /**
+     * Re-encodes a split row key for the given cuboid into {@code newKeyBuf}.
+     *
+     * @param cuboid       cuboid the row belongs to
+     * @param splitBuffers parts of the original row key as produced by the row key splitter
+     * @return number of bytes of the encoded key in {@code newKeyBuf}
+     */
+    private int buildKey(Cuboid cuboid, ByteArray[] splitBuffers) {
+        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
+
+        int startIdx = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+        // One split buffer per bit set in the cuboid id.
+        int endIdx = startIdx + Long.bitCount(cuboid.getId());
+        int offset = 0;
+        for (int i = startIdx; i < endIdx; i++) {
+            System.arraycopy(splitBuffers[i].array(), splitBuffers[i].offset(), newKeyBodyBuf, offset,
+                    splitBuffers[i].length());
+            offset += splitBuffers[i].length();
+        }
+
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        if (newKeyBuf.array().length < fullKeySize) {
+            // Grow from the real capacity in one step; doubling the logical length could never terminate
+            // if that length were 0.
+            newKeyBuf = new ByteArray(Math.max(fullKeySize, newKeyBuf.array().length * 2));
+        }
+        newKeyBuf.setLength(fullKeySize);
+
+        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
+
+        return fullKeySize;
+    }
+
+    /**
+     * Loads the Kylin configuration from the metadata url, looks both segments up again by uuid and
+     * creates the row key splitter (original segment) and encoder provider (optimize segment).
+     */
+    private void initMethod(SerializableConfiguration sConf, String metaUrl, String cubeName, CubeSegment optSegment,
+            CubeSegment originalSegment) {
+        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+        KylinConfig.setKylinConfigInEnvIfMissing(kylinConfig.exportToProperties());
+        CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        CubeSegment originalSeg = cubeInstance.getSegmentById(originalSegment.getUuid());
+        CubeSegment optSeg = cubeInstance.getSegmentById(optSegment.getUuid());
+        rowKeySplitter = new RowKeySplitter(originalSeg);
+        rowKeyEncoderProvider = new RowKeyEncoderProvider(optSeg);
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index d146c85..66ff734 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.spark;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,22 +32,31 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.ShrunkenDictionary;
 import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -61,11 +72,16 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.hive.HiveUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 public class SparkUtil {
 
     private static final Logger logger = LoggerFactory.getLogger(SparkUtil.class);
 
+    public static int getNormalRecordLogThreshold() {
+        return 1000; // fixed value; presumably a record-count interval for progress logging - confirm at call sites
+    }
+
     public static ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
         return (ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
@@ -83,8 +99,8 @@ public class SparkUtil {
         return (ISparkBatchMergeInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
     }
 
-    public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
-        return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
+    public static ISparkOutput.ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide2(CubeSegment seg) {
+        return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchOptimizeOutputSide(seg);
     }
 
     /**
@@ -228,4 +244,81 @@ public class SparkUtil {
 
         return dictionaryMap;
     }
+
+    public static SparkConf setKryoSerializerInConf() throws ClassNotFoundException {
+        Class<?>[] kryoClassArray = new Class<?>[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey"),
+                Class.forName("scala.collection.mutable.WrappedArray$ofRef"),
+                Class.forName("org.apache.hadoop.io.Text") };
+        // Builds a new SparkConf that uses Kryo with Kylin's registrator plus the classes listed above.
+        SparkConf sparkConf = new SparkConf();
+
+        // serialization conf; per the Spark docs, registrationRequired=true makes Kryo fail on unregistered classes
+        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        sparkConf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        sparkConf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+        // The declared ClassNotFoundException comes from the Class.forName lookups at the top.
+        return sparkConf;
+    }
+
+    public static String generateFilePath(String subOutputPath, String jobOutPath) {
+        final String fullPath = String.format(Locale.ROOT, "%s%s", jobOutPath, subOutputPath); // job path first, no separator added
+        logger.info("ConvergeCuboidDataReduce Out Path" + fullPath);
+        return fullPath;
+    }
+
+    public static JavaPairRDD<ByteArray, Object[]> getCuboIdRDDFromHdfs(JavaSparkContext sc, final String metaUrl,
+            final String cubeName, final CubeSegment seg, final String inputPath, final int measureNum,
+            SerializableConfiguration sConf) {
+        // Read the (Text, Text) sequence file at inputPath, then map every record through the decoder below.
+        final JavaPairRDD<Text, Text> rawRecords = sc.sequenceFile(inputPath, Text.class, Text.class);
+        final ReEncodeCuboidFunction decoder = new ReEncodeCuboidFunction(cubeName, seg.getUuid(), metaUrl, sConf, measureNum);
+        return rawRecords.mapToPair(decoder);
+    }
+
+    // Pair function that turns each (Text, Text) record into (copied key bytes, decoded measure objects).
+    static class ReEncodeCuboidFunction extends SparkFunction.PairFunctionBase<Tuple2<Text, Text>, ByteArray, Object[]> {
+        private String cubeName;
+        private String sourceSegmentId; // stored from the constructor; not read by this class
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private int measureNum;
+        private transient KylinConfig kylinConfig; // transient: re-created in doInit()
+        private transient BufferedMeasureCodec segmentReEncoder = null; // transient: re-created in doInit()
+
+        ReEncodeCuboidFunction(String cubeName, String sourceSegmentId, String metaUrl, SerializableConfiguration conf,
+                int measureNum) {
+            this.cubeName = cubeName;
+            this.sourceSegmentId = sourceSegmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+            this.measureNum = measureNum;
+        }
+
+        @Override
+        protected void doInit() {
+            this.kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cube.getDescName());
+                this.segmentReEncoder = new BufferedMeasureCodec(cubeDesc.getMeasures());
+            }
+        }
+
+        @Override
+        public Tuple2<ByteArray, Object[]> doCall(Tuple2<Text, Text> textTextTuple2) throws Exception {
+            Object[] result = new Object[measureNum];
+            segmentReEncoder.decode(ByteBuffer.wrap(textTextTuple2._2().getBytes(), 0, textTextTuple2._2().getLength()),
+                    result);
+            // Copy the key: Text.getBytes() is the raw backing array, valid only up to getLength().
+            return new Tuple2<ByteArray, Object[]>(new ByteArray(textTextTuple2._1().copyBytes()), result);
+        }
+    }
+
+    public static void configConvergeCuboidDataReduceOut(Job job, String output) throws IOException {
+        final Path target = new Path(output);
+        FileOutputFormat.setOutputPath(job, target); // register target as the job's output path
+        HadoopUtil.deletePath(job.getConfiguration(), target); // then delete that path using the job's configuration
+    }
 }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
index 03ae3f2..1ea5f2c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java
@@ -98,7 +98,7 @@ public class BulkLoadJob extends AbstractHadoopJob {
 
         int ret = 0;
         if (count > 0) {
-            logger.debug("Start to run LoadIncrementalHFiles");
+            logger.debug("Start to run LoadIncrementalHFiles, File count is: " + count);
             ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
             logger.debug("End to run LoadIncrementalHFiles");
             return ret;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index cfeec57..e9af5e0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -102,6 +102,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         // for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled
         Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
         if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
+            logger.info("CreateHTableJob buildingCuboids size: " + buildingCuboids.size());
             Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size());
             for (Long cuboid : buildingCuboids) {
                 Double cuboidSize = cuboidSizeMap.get(cuboid);
@@ -109,6 +110,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
                     logger.warn("{} cuboid's size is null will replace by 0", cuboid);
                     cuboidSize = 0.0;
                 }
+                logger.info("Cuboid:" + cuboid + " size: " + cuboidSize);
                 optimizedCuboidSizeMap.put(cuboid, cuboidSize);
             }
             cuboidSizeMap = optimizedCuboidSizeMap;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
index 3702a92..b0b47dd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -21,6 +21,7 @@ package org.apache.kylin.storage.hbase.steps;
 import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.engine.spark.ISparkOutput;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.slf4j.Logger;
@@ -98,6 +99,29 @@ public class HBaseSparkOutputTransition implements ISparkOutput {
     }
 
     public ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide(final CubeSegment seg) {
-        return null;
+        return new ISparkBatchOptimizeOutputSide() {
+            HBaseSparkSteps steps = new HBaseSparkSteps(seg); // one helper for seg, shared by every phase below
+            // Each phase only appends tasks to the given jobFlow; the tasks themselves are created by `steps`.
+            @Override
+            public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId(), CuboidModeEnum.RECOMMEND));
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId())); // added first
+                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); // added second
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                steps.addOptimizeGarbageCollectionSteps(jobFlow); // the helper adds its steps to jobFlow itself
+            }
+
+            @Override
+            public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) {
+                steps.addCheckpointGarbageCollectionSteps(jobFlow); // the helper adds its steps to jobFlow itself
+            }
+        };
     }
 }
\ No newline at end of file
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index d458b8c..807d23f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -82,9 +82,9 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
             .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
-            .withDescription("Cube Segment Id").create("segmentId");
+            .withDescription("Cube Segment Id").create(BatchConstants.ARG_SEGMENT_ID);
     public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
-            .withDescription("HDFS metadata url").create("metaUrl");
+            .withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
             .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()