Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/16 15:03:01 UTC

[kylin] branch master updated (a565fda -> d5affd9)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from a565fda  KYLIN-3427 Bug fix for converting to HFile in Spark
     new 4bda69e  KYLIN-3441 Merge cube segments in Spark
     new d5affd9  KYLIN-3438 mapreduce.job.queuename does not work at 'Convert Cuboid Data to HFile' Step

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/kylin/common/KylinConfig.java  |   4 +
 .../org/apache/kylin/common/util/HadoopUtil.java   |   7 +
 .../java/org/apache/kylin/cube/CubeInstance.java   |  10 +
 .../org/apache/kylin/cube/DimensionRangeInfo.java  |   3 +-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  13 ++
 .../kylin/engine/mr/steps/MergeCuboidMapper.java   | 196 ++--------------
 ...ergeCuboidMapper.java => SegmentReEncoder.java} | 178 ++++++++-------
 .../kylin/engine/spark/KylinKryoRegistrator.java   |   8 +
 .../kylin/engine/spark/KylinSparkJobListener.java  |   2 +-
 .../engine/spark/SparkBatchMergeJobBuilder2.java   |  45 +++-
 .../kylin/engine/spark/SparkCubingByLayer.java     |  28 +--
 .../kylin/engine/spark/SparkCubingMerge.java       | 251 +++++++++++++++++++++
 .../apache/kylin/engine/spark/SparkExecutable.java |  23 +-
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  63 +++++-
 .../template/cube_desc/kylin_sales_cube.json       |   3 +-
 examples/test_case_data/sandbox/kylin.properties   |   8 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |   3 +
 .../hbase/steps/HBaseMROutput2Transition.java      |  36 +--
 .../hbase/steps/HBaseSparkOutputTransition.java    |   3 -
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |  18 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  |  42 +---
 21 files changed, 573 insertions(+), 371 deletions(-)
 copy engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/{MergeCuboidMapper.java => SegmentReEncoder.java} (68%)
 mode change 100755 => 100644
 create mode 100644 engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java


[kylin] 02/02: KYLIN-3438 mapreduce.job.queuename does not work at 'Convert Cuboid Data to HFile' Step

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d5affd97fd95b6e915b97502eb5639153a3d6365
Author: wyj <wu...@cmss.chinamobile.com>
AuthorDate: Sun Jul 15 00:40:11 2018 -0700

    KYLIN-3438 mapreduce.job.queuename does not work at 'Convert Cuboid Data to HFile' Step
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java   | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index ce4ceac..5ffdd48 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -46,6 +46,8 @@ import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hbase.HBaseConfiguration.merge;
+
 /**
  * @author George Song (ysong1)
  */
@@ -76,6 +78,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
             // use current hbase configuration
             Configuration configuration = HBaseConnection.getCurrentHBaseConfiguration();
+            merge(configuration, getConf());
             job = Job.getInstance(configuration, getOptionValue(OPTION_JOB_NAME));
 
             setJobClasspath(job, cube.getConfig());
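
The fix copies job-level Hadoop overrides (such as mapreduce.job.queuename passed to the
'Convert Cuboid Data to HFile' step) into the HBase-derived configuration before the MR job
is created. A condensed sketch of the resulting flow, with the static import written out and
an illustrative queue name:

    // HBase conf built from hbase-site.xml; it knows nothing about -Dmapreduce.job.queuename
    Configuration configuration = HBaseConnection.getCurrentHBaseConfiguration();
    // getConf() carries the step's command-line overrides, e.g. mapreduce.job.queuename=prod_queue
    HBaseConfiguration.merge(configuration, getConf());
    // the job now inherits the override and is submitted to the intended YARN queue
    Job job = Job.getInstance(configuration, getOptionValue(OPTION_JOB_NAME));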


[kylin] 01/02: KYLIN-3441 Merge cube segments in Spark

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4bda69ebbe5f25cfa9f94310efcdbb734df486b3
Author: shaofengshi <sh...@apache.org>
AuthorDate: Wed Jul 11 18:40:35 2018 +0800

    KYLIN-3441 Merge cube segments in Spark
---
 .../java/org/apache/kylin/common/KylinConfig.java  |   4 +
 .../org/apache/kylin/common/util/HadoopUtil.java   |   7 +
 .../java/org/apache/kylin/cube/CubeInstance.java   |  10 +
 .../org/apache/kylin/cube/DimensionRangeInfo.java  |   3 +-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  13 ++
 .../kylin/engine/mr/steps/MergeCuboidMapper.java   | 196 ++--------------
 ...ergeCuboidMapper.java => SegmentReEncoder.java} | 178 ++++++++-------
 .../kylin/engine/spark/KylinKryoRegistrator.java   |   8 +
 .../kylin/engine/spark/KylinSparkJobListener.java  |   2 +-
 .../engine/spark/SparkBatchMergeJobBuilder2.java   |  45 +++-
 .../kylin/engine/spark/SparkCubingByLayer.java     |  28 +--
 .../kylin/engine/spark/SparkCubingMerge.java       | 251 +++++++++++++++++++++
 .../apache/kylin/engine/spark/SparkExecutable.java |  23 +-
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  63 +++++-
 .../template/cube_desc/kylin_sales_cube.json       |   3 +-
 examples/test_case_data/sandbox/kylin.properties   |   8 +-
 .../hbase/steps/HBaseMROutput2Transition.java      |  36 +--
 .../hbase/steps/HBaseSparkOutputTransition.java    |   3 -
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |  18 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  |  42 +---
 20 files changed, 570 insertions(+), 371 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 4b3b7c3..e09ce26 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -414,6 +414,10 @@ public class KylinConfig extends KylinConfigBase {
         KylinConfig base = base();
         if (base != this)
             return base.getManager(clz);
+
+        if (managersCache == null) {
+            managersCache = new ConcurrentHashMap<>();
+        }
         
         Object mgr = managersCache.get(clz);
         if (mgr != null)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index ed6ef89..3cb43fc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,4 +187,10 @@ public class HadoopUtil {
         }
         return result;
     }
+
+    public static void deleteHDFSMeta(String metaUrl) throws IOException {
+        String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
+        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
+        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
+    }
 }
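
In short, deleteHDFSMeta reads the "path" parameter out of the job's metadata StorageURL and
deletes that directory recursively. A minimal usage sketch; the URL below is hypothetical:

    // e.g. metaUrl = "kylin_metadata@hdfs,path=/kylin/kylin_metadata/kylin-<jobId>/kylin_sales_cube/metadata"
    HadoopUtil.deleteHDFSMeta(metaUrl);   // removes the "path" directory and everything under it
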
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index c0f3536..ff86e7d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -707,4 +707,14 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return newCube;
     }
 
+    public static CubeSegment findSegmentWithJobId(String jobID, CubeInstance cubeInstance) {
+        for (CubeSegment segment : cubeInstance.getSegments()) {
+            String lastBuildJobID = segment.getLastBuildJobID();
+            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
+                return segment;
+            }
+        }
+        throw new IllegalStateException("No segment's last build job ID equals " + jobID);
+    }
+
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
index 0b0d1c4..cde8d3f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.cube;
 
+import java.io.Serializable;
 import java.util.Map;
 
 import org.apache.kylin.metadata.datatype.DataTypeOrder;
@@ -31,7 +32,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Maps;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class DimensionRangeInfo {
+public class DimensionRangeInfo implements Serializable {
 
     private static final Logger logger = LoggerFactory.getLogger(DimensionRangeInfo.class);
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index a1b2cfe..42e0f42 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -20,6 +20,8 @@ package org.apache.kylin.engine.mr;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
@@ -57,6 +59,7 @@ public class JobBuilderSupport {
     final public static String PathNameCuboidBase = "base_cuboid";
     final public static String PathNameCuboidOld = "old";
     final public static String PathNameCuboidInMem = "in_memory";
+    final public static Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
 
     public JobBuilderSupport(CubeSegment seg, String submitter) {
         Preconditions.checkNotNull(seg, "segment cannot be null");
@@ -324,4 +327,14 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/metadata";
     }
 
+
+    public static String extractJobIDFromPath(String path) {
+        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+        // check the first occurrence
+        if (matcher.find()) {
+            return matcher.group(1);
+        } else {
+            throw new IllegalStateException("Can not extract job ID from file path : " + path);
+        }
+    }
 }
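
The new JOB_NAME_PATTERN pulls the job UUID back out of a path under Kylin's HDFS working
directory, so a cuboid folder can be mapped back to the segment whose build job produced it
(see CubeInstance.findSegmentWithJobId above and findSourceSegment in SparkCubingMerge below).
A small sketch; the path is hypothetical:

    String path = "hdfs:///kylin/kylin_metadata/kylin-3c12f0a1-9b6e-4d2a-8f4e-1a2b3c4d5e6f/kylin_sales_cube/cuboid/";
    String jobId = JobBuilderSupport.extractJobIDFromPath(path);
    // jobId == "3c12f0a1-9b6e-4d2a-8f4e-1a2b3c4d5e6f"
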
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 0283c21..d63c8b8 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -19,41 +19,20 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * @author ysong1, honma
@@ -61,179 +40,34 @@ import com.google.common.collect.Maps;
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
-    private KylinConfig config;
-    private String cubeName;
-    private String segmentID;
-    private CubeManager cubeManager;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment mergedCubeSegment;
-    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
-
-    private Text outputKey = new Text();
-
-    private byte[] newKeyBodyBuf;
-    private ByteArray newKeyBuf;
-    private RowKeySplitter rowKeySplitter;
-    private RowKeyEncoderProvider rowKeyEncoderProvider;
-
-
-    // for re-encode measures that use dictionary
-    private List<Pair<Integer, MeasureIngester>> dictMeasures;
-    private Map<TblColRef, Dictionary<String>> oldDicts;
-    private Map<TblColRef, Dictionary<String>> newDicts;
-    private List<MeasureDesc> measureDescs;
-    private BufferedMeasureCodec codec;
-    private Object[] measureObjs;
-    private Text outputValue;
+    private SegmentReEncoder reEncoder;
+    private Pair<Text, Text> newPair;
 
     @Override
     protected void doSetup(Context context) throws IOException, InterruptedException {
         super.bindCurrentConfiguration(context.getConfiguration());
 
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
-        segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
-
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
 
-        cubeManager = CubeManager.getInstance(config);
-        cube = cubeManager.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegmentById(segmentID);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
-        // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will auto-grow
-        newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        CubeDesc cubeDesc = cube.getDescriptor();
+        CubeSegment mergedCubeSegment = cube.getSegmentById(segmentID);
 
         // decide which source segment
         FileSplit fileSplit = (FileSplit) context.getInputSplit();
-        IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat();
-        sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
-
-        rowKeySplitter = new RowKeySplitter(sourceCubeSegment);
-        rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
-
-        measureDescs = cubeDesc.getMeasures();
-        codec = new BufferedMeasureCodec(measureDescs);
-        measureObjs = new Object[measureDescs.size()];
-        outputValue = new Text();
-
-        dictMeasures = Lists.newArrayList();
-        oldDicts = Maps.newHashMap();
-        newDicts = Maps.newHashMap();
-        for (int i = 0; i < measureDescs.size(); i++) {
-            MeasureDesc measureDesc = measureDescs.get(i);
-            MeasureType measureType = measureDesc.getFunction().getMeasureType();
-            List<TblColRef> columns = measureType.getColumnsNeedDictionary(measureDesc.getFunction());
-            boolean needReEncode = false;
-            for (TblColRef col : columns) {
-                //handle the column that all records is null
-                if (sourceCubeSegment.getDictionary(col) == null) {
-                    continue;
-                }
-
-                oldDicts.put(col, sourceCubeSegment.getDictionary(col));
-                newDicts.put(col, mergedCubeSegment.getDictionary(col));
-                if (!sourceCubeSegment.getDictionary(col).equals(mergedCubeSegment.getDictionary(col))) {
-                    needReEncode = true;
-                }
-            }
-            if (needReEncode) {
-                dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
-            }
-        }
+        IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment)
+                .getOuputFormat();
+        CubeSegment sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
+        reEncoder = new SegmentReEncoder(cubeDesc, sourceCubeSegment, mergedCubeSegment, config);
     }
 
     @Override
     public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes());
-        Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
-        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
-
-        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
-        int bufOffset = 0;
-        int bodySplitOffset = rowKeySplitter.getBodySplitOffset();
-
-        for (int i = 0; i < cuboid.getColumns().size(); ++i) {
-            int useSplit = i + bodySplitOffset;
-            TblColRef col = cuboid.getColumns().get(i);
-
-            if (cubeDesc.getRowkey().isUseDictionary(col)) {
-                // if dictionary on fact table column, needs rewrite
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<String> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
-                // handle the dict of all merged segments is null
-                if (mergedDict == null) {
-                    continue;
-                }
-
-                Dictionary<String> sourceDict;
-                // handle the column that all records is null
-                if (sourceCubeSegment.getDictionary(col) == null) {
-                    BytesUtil.writeUnsigned(mergedDict.nullId(), newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    bufOffset += mergedDict.getSizeOfId();
-                    continue;
-                } else {
-                    sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
-                }
-
-                while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfId() > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-                }
-
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length);
-                int idInMergedDict;
-
-                //int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                String v = sourceDict.getValueFromId(idInSourceDict);
-                if (v == null) {
-                    idInMergedDict = mergedDict.nullId();
-                } else {
-                    idInMergedDict = mergedDict.getIdFromValue(v);
-                }
-
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                bufOffset += mergedDict.getSizeOfId();
-            } else {
-                // keep as it is
-                while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-                }
-
-                System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
-                bufOffset += splittedByteses[useSplit].length;
-            }
-        }
-
-        int fullKeySize = rowkeyEncoder.getBytesLength();
-        while (newKeyBuf.array().length < fullKeySize) {
-            newKeyBuf = new ByteArray(newKeyBuf.length() * 2);
-        }
-        newKeyBuf.setLength(fullKeySize);
-
-        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
-        outputKey.set(newKeyBuf.array(), 0, fullKeySize);
-
-        // re-encode measures if dictionary is used
-        if (dictMeasures.size() > 0) {
-            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
-            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
-                int i = pair.getFirst();
-                MeasureIngester ingester = pair.getSecond();
-                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
-            }
-            ByteBuffer valueBuf = codec.encode(measureObjs);
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-            value = outputValue;
-        }
-
-        context.write(outputKey, value);
+        newPair = reEncoder.reEncode(key, value);
+        context.write(newPair.getFirst(), newPair.getSecond());
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
old mode 100755
new mode 100644
similarity index 68%
copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
index 0283c21..856a59f
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
@@ -6,33 +6,31 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -41,11 +39,6 @@ import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
@@ -56,67 +49,44 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
- * @author ysong1, honma
+ * Re-encode the cuboid from old segment (before merge) to new segment (after merge).
  */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
-
-    private KylinConfig config;
-    private String cubeName;
-    private String segmentID;
-    private CubeManager cubeManager;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment mergedCubeSegment;
-    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
-
-    private Text outputKey = new Text();
+public class SegmentReEncoder implements Serializable {
+    private volatile transient boolean initialized = false;
+    private CubeSegment mergingSeg;
+    private CubeSegment mergedSeg;
 
     private byte[] newKeyBodyBuf;
     private ByteArray newKeyBuf;
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
 
-
     // for re-encode measures that use dictionary
     private List<Pair<Integer, MeasureIngester>> dictMeasures;
     private Map<TblColRef, Dictionary<String>> oldDicts;
     private Map<TblColRef, Dictionary<String>> newDicts;
     private List<MeasureDesc> measureDescs;
     private BufferedMeasureCodec codec;
-    private Object[] measureObjs;
-    private Text outputValue;
-
-    @Override
-    protected void doSetup(Context context) throws IOException, InterruptedException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
-        segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
-
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        cubeManager = CubeManager.getInstance(config);
-        cube = cubeManager.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegmentById(segmentID);
+    private CubeDesc cubeDesc;
+    private KylinConfig kylinConfig;
+
+    public SegmentReEncoder(CubeDesc cubeDesc, CubeSegment mergingSeg, CubeSegment mergedSeg, KylinConfig kylinConfig) {
+        this.cubeDesc = cubeDesc;
+        this.mergingSeg = mergingSeg;
+        this.mergedSeg = mergedSeg;
+        this.kylinConfig = kylinConfig;
+        init();
+    }
 
-        // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+    private void init() {
         newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will auto-grow
         newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
 
-        // decide which source segment
-        FileSplit fileSplit = (FileSplit) context.getInputSplit();
-        IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat();
-        sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
-
-        rowKeySplitter = new RowKeySplitter(sourceCubeSegment);
-        rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
+        rowKeySplitter = new RowKeySplitter(mergingSeg);
+        rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedSeg);
 
         measureDescs = cubeDesc.getMeasures();
         codec = new BufferedMeasureCodec(measureDescs);
-        measureObjs = new Object[measureDescs.size()];
-        outputValue = new Text();
 
         dictMeasures = Lists.newArrayList();
         oldDicts = Maps.newHashMap();
@@ -128,13 +98,13 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
             boolean needReEncode = false;
             for (TblColRef col : columns) {
                 //handle the column that all records is null
-                if (sourceCubeSegment.getDictionary(col) == null) {
+                if (mergingSeg.getDictionary(col) == null) {
                     continue;
                 }
 
-                oldDicts.put(col, sourceCubeSegment.getDictionary(col));
-                newDicts.put(col, mergedCubeSegment.getDictionary(col));
-                if (!sourceCubeSegment.getDictionary(col).equals(mergedCubeSegment.getDictionary(col))) {
+                oldDicts.put(col, mergingSeg.getDictionary(col));
+                newDicts.put(col, mergedSeg.getDictionary(col));
+                if (!mergingSeg.getDictionary(col).equals(mergedSeg.getDictionary(col))) {
                     needReEncode = true;
                 }
             }
@@ -142,10 +112,71 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
                 dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
             }
         }
+        initialized = true;
+    }
+
+    /**
+     * Re-encode with both dimension and measure in encoded (Text) format.
+     * @param key
+     * @param value
+     * @return
+     * @throws IOException
+     */
+    public Pair<Text, Text> reEncode(Text key, Text value) throws IOException {
+        if (initialized == false) {
+            throw new IllegalStateException("Not initialized");
+        }
+        Object[] measureObjs = new Object[measureDescs.size()];
+        // re-encode measures if dictionary is used
+        if (dictMeasures.size() > 0) {
+            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
+            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+                int i = pair.getFirst();
+                MeasureIngester ingester = pair.getSecond();
+                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
+            }
+
+            ByteBuffer valueBuf = codec.encode(measureObjs);
+            byte[] resultValue = new byte[valueBuf.position()];
+            System.arraycopy(valueBuf.array(), 0, resultValue, 0, valueBuf.position());
+
+            return Pair.newPair(processKey(key), new Text(resultValue));
+        } else {
+            return Pair.newPair(processKey(key), value);
+        }
     }
 
-    @Override
-    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
+    /**
+     * Re-encode with measures in Object[] format.
+     * @param key
+     * @param value
+     * @return
+     * @throws IOException
+     */
+    public Pair<Text, Object[]> reEncode2(Text key, Text value) throws IOException {
+        if (initialized == false) {
+            throw new IllegalStateException("Not initialized");
+        }
+
+        Object[] measureObjs = new Object[measureDescs.size()];
+        codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
+        // re-encode measures if dictionary is used
+        if (dictMeasures.size() > 0) {
+            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+                int i = pair.getFirst();
+                MeasureIngester ingester = pair.getSecond();
+                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
+            }
+
+            ByteBuffer valueBuf = codec.encode(measureObjs);
+            byte[] resultValue = new byte[valueBuf.position()];
+            System.arraycopy(valueBuf.array(), 0, resultValue, 0, valueBuf.position());
+
+        }
+        return Pair.newPair(processKey(key), measureObjs);
+    }
+
+    private Text processKey(Text key) throws IOException {
         long cuboidID = rowKeySplitter.split(key.getBytes());
         Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
         RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
@@ -160,8 +191,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
             if (cubeDesc.getRowkey().isUseDictionary(col)) {
                 // if dictionary on fact table column, needs rewrite
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<String> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+                DictionaryManager dictMgr = DictionaryManager.getInstance(kylinConfig);
+                Dictionary<String> mergedDict = dictMgr.getDictionary(mergedSeg.getDictResPath(col));
 
                 // handle the dict of all merged segments is null
                 if (mergedDict == null) {
@@ -170,12 +201,12 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
                 Dictionary<String> sourceDict;
                 // handle the column that all records is null
-                if (sourceCubeSegment.getDictionary(col) == null) {
+                if (mergingSeg.getDictionary(col) == null) {
                     BytesUtil.writeUnsigned(mergedDict.nullId(), newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
                     bufOffset += mergedDict.getSizeOfId();
                     continue;
                 } else {
-                    sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+                    sourceDict = dictMgr.getDictionary(mergingSeg.getDictResPath(col));
                 }
 
                 while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
@@ -186,7 +217,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
                     System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
                 }
 
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length);
+                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0,
+                        splittedByteses[useSplit].length);
                 int idInMergedDict;
 
                 //int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
@@ -207,7 +239,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
                     System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
                 }
 
-                System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
+                System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset,
+                        splittedByteses[useSplit].length);
                 bufOffset += splittedByteses[useSplit].length;
             }
         }
@@ -219,21 +252,10 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         newKeyBuf.setLength(fullKeySize);
 
         rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
-        outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        // re-encode measures if dictionary is used
-        if (dictMeasures.size() > 0) {
-            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
-            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
-                int i = pair.getFirst();
-                MeasureIngester ingester = pair.getSecond();
-                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
-            }
-            ByteBuffer valueBuf = codec.encode(measureObjs);
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-            value = outputValue;
-        }
+        byte[] resultKey = new byte[fullKeySize];
+        System.arraycopy(newKeyBuf.array(), 0, resultKey, 0, fullKeySize);
 
-        context.write(outputKey, value);
+        return new Text(resultKey);
     }
 }
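
The extracted SegmentReEncoder serves both engines: the MR mapper above calls
reEncode(key, value) and writes the resulting Text pair directly, while the Spark merge path
appears to use reEncode2(key, value) so measures stay as Object[] and can be aggregated with
reduceByKey before being serialized once at the end. A minimal sketch of the Spark-side call;
variable names are illustrative:

    SegmentReEncoder reEncoder = new SegmentReEncoder(cubeDesc, sourceSegment, mergedSegment, kylinConfig);
    Pair<Text, Object[]> reEncoded = reEncoder.reEncode2(rowKey, encodedValue);
    // the row key is rewritten against the merged segment's dictionaries;
    // the measures stay decoded for aggregation and are re-encoded only when written out
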
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index 90c52c2..a13f96a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -96,6 +96,14 @@ public class KylinKryoRegistrator implements KryoRegistrator {
 
         kyroClasses.add(Class.class);
 
+        //shaded classes
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MutableRoaringArray");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MutableRoaringBitmap");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MappeableArrayContainer");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MappeableBitmapContainer");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.ImmutableRoaringBitmap");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.ImmutableRoaringArray");
+
         addClassQuitely(kyroClasses, "com.google.common.collect.EmptyImmutableList");
         addClassQuitely(kyroClasses, "java.nio.HeapShortBuffer");
         addClassQuitely(kyroClasses, "java.nio.HeapLongBuffer");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
index 7976c3b..b8821b6 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
@@ -29,7 +29,7 @@ public class KylinSparkJobListener extends SparkListener implements Serializable
 
     @Override
     public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
-        if (taskEnd.taskMetrics().outputMetrics() != null) {
+        if (taskEnd != null && taskEnd.taskMetrics() != null && taskEnd.taskMetrics().outputMetrics() != null) {
             metrics.bytesWritten += taskEnd.taskMetrics().outputMetrics().bytesWritten();
             metrics.recordsWritten += taskEnd.taskMetrics().outputMetrics().recordsWritten();
         }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index b68f6e86..97861a3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -18,11 +18,17 @@
 
 package org.apache.kylin.engine.spark;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +67,9 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
         result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
         outputSide.addStepPhase1_MergeDictionary(result);
 
+        // merge cube
+        result.addTask(createMergeCuboidDataStep(cubeSegment, mergingSegments, jobId));
+
         // Phase 2: Merge Cube Files
         outputSide.addStepPhase2_BuildCube(seg, mergingSegments, result);
 
@@ -70,5 +79,39 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
 
         return result;
     }
-    
+
+    public SparkExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID) {
+
+        final List<String> mergingCuboidPaths = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingCuboidPaths.add(getCuboidRootPath(merging));
+        }
+        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+        String outputPath = getCuboidRootPath(jobID);
+
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        sparkExecutable.setClassName(SparkCubingMerge.class.getName());
+        sparkExecutable.setParam(SparkCubingMerge.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkCubingMerge.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkCubingMerge.OPTION_INPUT_PATH.getOpt(), formattedPath);
+        sparkExecutable.setParam(SparkCubingMerge.OPTION_META_URL.getOpt(),
+                getSegmentMetadataUrl(seg.getConfig(), jobID));
+        sparkExecutable.setParam(SparkCubingMerge.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+
+        sparkExecutable.setJobId(jobID);
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+
+        return sparkExecutable;
+    }
+
+    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+        Map<String, String> param = new HashMap<>();
+        param.put("path", getDumpMetadataPath(jobId));
+        return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
+    }
 }
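
For reference, getSegmentMetadataUrl builds a StorageURL pointing at the metadata dumped for
this job on HDFS, which SparkCubingMerge later loads via loadKylinConfigFromHdfs and cleans up
with HadoopUtil.deleteHDFSMeta. A rough sketch of the resulting string; identifier and path
are hypothetical:

    // kylinConfig.getMetadataUrl().getIdentifier() -> "kylin_metadata"
    // getDumpMetadataPath(jobId)                   -> "/kylin/kylin_metadata/kylin-<jobId>/kylin_sales_cube/metadata"
    // resulting metaUrl: "kylin_metadata@hdfs,path=/kylin/kylin_metadata/kylin-<jobId>/kylin_sales_cube/metadata"
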
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index f842fd5..0cc8a3b 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,7 +17,6 @@
 */
 package org.apache.kylin.engine.spark;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -34,7 +33,6 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
@@ -47,7 +45,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -137,7 +134,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
         JavaSparkContext sc = new JavaSparkContext(conf);
+        sc.sc().addSparkListener(jobListener);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
         final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
 
@@ -221,7 +220,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
-        int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
+        int partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
 
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
@@ -230,7 +229,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
-            partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
+            partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
             allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (envConfig.isSparkSanityCheckEnabled() == true) {
@@ -241,7 +240,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
         allRDDs[totalLevels].unpersist();
         logger.info("Finished on calculating all level cuboids.");
-        //        deleteHDFSMeta(metaUrl);
+        logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+        HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
     protected void setHadoopConf(Job job, CubeSegment segment, String metaUrl) throws Exception {
@@ -249,15 +249,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         job.setOutputValueClass(Text.class);
     }
 
-    protected int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
-        double baseCuboidSize = statsReader.estimateLayerSize(level);
-        float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
-        int partition = (int) (baseCuboidSize / rddCut);
-        partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
-        partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
-        logger.info("Partition for spark cubing: {}", partition);
-        return partition;
-    }
 
     protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config,
             CubeSegment segment, int level) {
@@ -417,7 +408,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private String metaUrl;
         private CubeSegment cubeSegment;
         private CubeDesc cubeDesc;
-        private CuboidScheduler cuboidScheduler;
         private NDCuboidBuilder ndCuboidBuilder;
         private RowKeySplitter rowKeySplitter;
         private volatile transient boolean initialized = false;
@@ -435,7 +425,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();
-            this.cuboidScheduler = cubeSegment.getCuboidScheduler();
             this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
             this.rowKeySplitter = new RowKeySplitter(cubeSegment);
         }
@@ -508,9 +497,4 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return count;
     }
 
-    protected void deleteHDFSMeta(String metaUrl) throws IOException {
-        String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
-        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
-        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
-    }
 }
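
The partition sizing removed from SparkCubingByLayer (estimateRDDPartitionNum) now comes from
SparkUtil.estimateLayerPartitionNum; the removed lines show the calculation, worked through
here with illustrative numbers:

    double layerSizeMB = 2048;                       // statsReader.estimateLayerSize(level)
    float rddCutMB = 10f;                            // kylinConfig.getSparkRDDPartitionCutMB()
    int partition = (int) (layerSizeMB / rddCutMB);  // 204
    partition = Math.max(1, partition);              // getSparkMinPartition(), still 204
    partition = Math.min(5000, partition);           // getSparkMaxPartition(), still 204
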
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
new file mode 100644
index 0000000..8d78920
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import scala.Tuple2;
+
+/**
+ */
+public class SparkCubingMerge extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkCubingMerge.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
+
+    private Options options;
+
+    public SparkCubingMerge() {
+        options = new Options();
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
+
+        SparkConf conf = new SparkConf().setAppName("Merge segments for cube:" + cubeName + ", segment " + segmentId);
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        sc.sc().addSparkListener(jobListener);
+
+        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+        logger.info("Input path: {}", inputPath);
+        logger.info("Output path: {}", outputPath);
+
+        String[] inputFolders = StringSplitter.split(inputPath, ",");
+
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+        for (int i = 0; i < inputFolders.length; i++) {
+            String path = inputFolders[i];
+            CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+
+            // each folder will be a rdd, union them
+            List<JavaPairRDD> cuboidRdds = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
+            JavaPairRDD<Text, Text> segRdd = sc.union(cuboidRdds.toArray(new JavaPairRDD[cuboidRdds.size()]));
+
+            // re-encode with new dictionaries
+            JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+                    sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+            mergingSegs.add(newEcoddedRdd);
+        }
+
+        final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+
+        // reduce: aggregate the measures of rows that share the same rowkey across segments
+        JavaPairRDD<Text, Object[]> mergedRdd = sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+                .reduceByKey(new Function2<Object[], Object[], Object[]>() {
+                    @Override
+                    public Object[] call(Object[] input1, Object[] input2) throws Exception {
+                        Object[] measureObjs = new Object[input1.length];
+                        aggregators.aggregate(input1, input2, measureObjs);
+                        return measureObjs;
+                    }
+                }, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig));
+
+        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
+        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+        final Job job = Job.getInstance(confOverwrite);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+        mergedRdd.mapToPair(
+                new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+                    private volatile transient boolean initialized = false;
+                    BufferedMeasureCodec codec;
+
+                    @Override
+                    public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
+                            Tuple2<Text, Object[]> tuple2) throws Exception {
+
+                        if (initialized == false) {
+                            synchronized (SparkCubingMerge.class) {
+                                if (initialized == false) {
+                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                                    CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                                    codec = new BufferedMeasureCodec(desc.getMeasures());
+                                    initialized = true;
+                                }
+                            }
+                        }
+                        ByteBuffer valueBuf = codec.encode(tuple2._2());
+                        byte[] encodedBytes = new byte[valueBuf.position()];
+                        System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+                        return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
+                    }
+
+                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+        // output the data size to the console; the job engine will parse and save the metric
+        // please note: this mechanism won't work when spark.submit.deployMode=cluster
+        logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+        HadoopUtil.deleteHDFSMeta(metaUrl);
+    }
+
+    static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String sourceSegmentId;
+        private String mergedSegmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private transient KylinConfig kylinConfig;
+        private transient SegmentReEncoder segmentReEncoder = null;
+        private transient Pair<Text, Object[]> encodedPair = null;
+
+        ReEncodCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
+                SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.sourceSegmentId = sourceSegmentId;
+            this.mergedSegmentId = mergedSegmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        private void init() {
+            this.kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cube.getDescName());
+            final CubeSegment sourceSeg = cube.getSegmentById(sourceSegmentId);
+            final CubeSegment mergedSeg = cube.getSegmentById(mergedSegmentId);
+            this.segmentReEncoder = new SegmentReEncoder(cubeDesc, sourceSeg, mergedSeg, kylinConfig);
+        }
+
+        @Override
+        public Tuple2<Text, Object[]> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+            if (initialized == false) {
+                synchronized (ReEncodCuboidFunction.class) {
+                    if (initialized == false) {
+                        init();
+                        initialized = true;
+                    }
+                }
+            }
+            encodedPair = segmentReEncoder.reEncode2(textTextTuple2._1, textTextTuple2._2);
+            return new Tuple2<>(encodedPair.getFirst(), encodedPair.getSecond());
+        }
+    }
+
+    private CubeSegment findSourceSegment(String filePath, CubeInstance cube) {
+        String jobID = JobBuilderSupport.extractJobIDFromPath(filePath);
+        return CubeInstance.findSegmentWithJobId(jobID, cube);
+    }
+
+}
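Note on the merge flow implemented above: each source segment is re-encoded against the merged segment's dictionaries, the per-segment RDDs are unioned, and reduceByKey combines the measures of rows that now share a rowkey. Below is a minimal, standalone Java sketch of that reduce semantics only; the rowkeys and the two measures (SUM, MAX) are hypothetical stand-ins for Kylin's actual rowkey format and BufferedMeasureCodec.

    import java.util.HashMap;
    import java.util.Map;

    public class MergeSemanticsSketch {

        // Two hypothetical measures per row: [0] = SUM(long), [1] = MAX(long).
        static long[] aggregate(long[] a, long[] b) {
            return new long[] { a[0] + b[0], Math.max(a[1], b[1]) };
        }

        public static void main(String[] args) {
            Map<String, long[]> merged = new HashMap<>();
            // the first rowkey appears in two source segments, the second in only one
            Object[][] rows = {
                    { "cuboid255|2018-06|CN", new long[] { 10, 7 } }, // from source segment 1
                    { "cuboid255|2018-06|CN", new long[] { 5, 9 } }, // from source segment 2
                    { "cuboid255|2018-07|US", new long[] { 3, 3 } } }; // only in segment 2
            for (Object[] row : rows) {
                merged.merge((String) row[0], (long[]) row[1], MergeSemanticsSketch::aggregate);
            }
            // prints SUM=15, MAX=9 for the shared rowkey; SUM=3, MAX=3 for the other
            merged.forEach((k, v) -> System.out.println(k + " -> SUM=" + v[0] + ", MAX=" + v[1]));
        }
    }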
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 90442a4..961fd6c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.spark;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -44,6 +46,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.Segments;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -132,9 +135,17 @@ public class SparkExecutable extends AbstractExecutable {
 
         String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
         CubeSegment segment = cube.getSegmentById(segmentID);
+        Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
 
         try {
-            attachSegmentMetadataWithDict(segment);
+            if (mergingSeg == null || mergingSeg.size() == 0) {
+                attachSegmentMetadataWithDict(segment);
+            } else {
+                List<CubeSegment> allRelatedSegs = new ArrayList<>();
+                allRelatedSegs.add(segment);
+                allRelatedSegs.addAll(mergingSeg);
+                attachSegmentsMetadataWithDict(allRelatedSegs);
+            }
         } catch (IOException e) {
             throw new ExecuteException("meta dump failed");
         }
@@ -197,6 +208,16 @@ public class SparkExecutable extends AbstractExecutable {
         dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig());
     }
 
+    private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+        for (CubeSegment segment : segments) {
+            dumpList.addAll(segment.getDictionaryPaths());
+            dumpList.add(segment.getStatisticsResourcePath());
+        }
+        dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig());
+    }
+
     private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig)
             throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index a4e17c3..da207ee 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -18,17 +18,24 @@
 
 package org.apache.kylin.engine.spark;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.GenericOptionsParser;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import com.google.common.collect.Lists;
 
 public class SparkUtil {
 
@@ -37,10 +44,6 @@ public class SparkUtil {
         return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
     }
 
-    private static TableDesc getTableDesc(String tableName, String prj) {
-        return TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName, prj);
-    }
-
     public static ISparkOutput.ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
         return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchCubingOutputSide(seg);
     }
@@ -57,7 +60,47 @@ public class SparkUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
     }
 
-    private static synchronized GenericOptionsParser getParser(Configuration conf, String[] args) throws Exception {
-        return new GenericOptionsParser(conf, args);
+    public static List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc, Class keyClass,
+            Class valueClass) throws IOException {
+        List<JavaPairRDD> inputRDDs = Lists.newArrayList();
+        Path inputHDFSPath = new Path(inputPath);
+        FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
+        boolean hasDir = false;
+        for (FileStatus stat : fileStatuses) {
+            if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+                hasDir = true;
+                inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), keyClass, valueClass));
+            }
+        }
+
+        if (!hasDir) {
+            inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), keyClass, valueClass));
+        }
+
+        return inputRDDs;
     }
+
+
+    public static int estimateLayerPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
+        double layerSize = statsReader.estimateLayerSize(level);
+        float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
+        int partition = (int) (layerSize / rddCut);
+        partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
+        partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+        return partition;
+    }
+
+
+    public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
+        double totalSize = 0;
+        for (double x: statsReader.getCuboidSizeMap().values()) {
+            totalSize += x;
+        }
+        float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
+        int partition = (int) (totalSize / rddCut);
+        partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
+        partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+        return partition;
+    }
+
 }
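The two estimators added to SparkUtil follow the same shape: estimated cuboid size in MB divided by kylin.engine.spark.rdd-partition-cut-mb, clamped to the configured min/max partition counts. A standalone sketch of that arithmetic with illustrative numbers; the min/max values below are placeholders, not necessarily Kylin's defaults.

    public class PartitionEstimateSketch {

        // Same shape as estimateTotalPartitionNum above: size / cut, then clamp.
        static int estimatePartitions(double totalSizeMB, float cutMB, int minPartition, int maxPartition) {
            int partition = (int) (totalSizeMB / cutMB);
            partition = Math.max(minPartition, partition);
            partition = Math.min(maxPartition, partition);
            return partition;
        }

        public static void main(String[] args) {
            // e.g. roughly 10 GB of merged cuboid data with a 500 MB cut -> 20 partitions
            System.out.println(estimatePartitions(10240, 500f, 1, 5000)); // prints 20
        }
    }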
diff --git a/examples/sample_cube/template/cube_desc/kylin_sales_cube.json b/examples/sample_cube/template/cube_desc/kylin_sales_cube.json
index 124fc73..aff174e 100644
--- a/examples/sample_cube/template/cube_desc/kylin_sales_cube.json
+++ b/examples/sample_cube/template/cube_desc/kylin_sales_cube.json
@@ -275,7 +275,8 @@
   "engine_type" : %default_engine_type%,
   "storage_type" : %default_storage_type%,
   "override_kylin_properties" : {
-    "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true"
+    "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true",
+    "kylin.engine.spark.rdd-partition-cut-mb" : "500"
   },
   "cuboid_black_list" : [ ],
   "parent_forward" : 3,
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index a3aa0f4..edced8f 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -159,10 +159,10 @@ kylin.security.acl.admin-role=admin
 
 # Help info, format{name|displayName|link}, optional
 kylin.web.help.length=4
-kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs21/tutorial/kylin_sample.html
-kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs21/tutorial/odbc.html
-kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs21/tutorial/tableau_91.html
-kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs21/howto/howto_optimize_cubes.html
+kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs/tutorial/kylin_sample.html
+kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs/tutorial/odbc.html
+kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs/tutorial/tableau_91.html
+kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs/howto/howto_optimize_cubes.html
 
 #allow user to export query result
 kylin.web.export-allow-admin=true
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 94a60f8..a68643f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -20,8 +20,6 @@ package org.apache.kylin.storage.hbase.steps;
 
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -35,6 +33,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.MapReduceUtil;
 import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
@@ -139,8 +138,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             }
 
             @Override
-            public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
+            public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
+                    DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(
+                        steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -157,9 +158,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
         };
     }
 
-    public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat{
-
-        private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+    public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat {
 
         @Override
         public void configureJobInput(Job job, String input) throws Exception {
@@ -181,29 +180,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
         @Override
         public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
             String filePath = fileSplit.getPath().toString();
-            String jobID = extractJobIDFromPath(filePath);
-            return findSegmentWithUuid(jobID, cube);
-        }
-
-        private static String extractJobIDFromPath(String path) {
-            Matcher matcher = JOB_NAME_PATTERN.matcher(path);
-            // check the first occurrence
-            if (matcher.find()) {
-                return matcher.group(1);
-            } else {
-                throw new IllegalStateException("Can not extract job ID from file path : " + path);
-            }
+            String jobID = JobBuilderSupport.extractJobIDFromPath(filePath);
+            return CubeInstance.findSegmentWithJobId(jobID, cube);
         }
 
-        private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
-            for (CubeSegment segment : cubeInstance.getSegments()) {
-                String lastBuildJobID = segment.getLastBuildJobID();
-                if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
-                    return segment;
-                }
-            }
-            throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-        }
     }
 
     public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) {
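The duplicated job-ID parsing removed above now lives in JobBuilderSupport.extractJobIDFromPath and CubeInstance.findSegmentWithJobId. For reference, here is a standalone sketch of the parsing behaviour the shared helper is expected to keep, reusing the regular expression that was deleted here; the HDFS path in main is hypothetical.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class JobIdFromPathSketch {

        // Same pattern as the removed JOB_NAME_PATTERN: "kylin-" followed by a UUID.
        private static final Pattern JOB_NAME_PATTERN = Pattern
                .compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

        static String extractJobId(String path) {
            Matcher matcher = JOB_NAME_PATTERN.matcher(path);
            if (matcher.find()) {
                return matcher.group(1); // first occurrence wins
            }
            throw new IllegalStateException("Can not extract job ID from file path : " + path);
        }

        public static void main(String[] args) {
            String path = "hdfs:///kylin/kylin_metadata/kylin-f9e2b7a0-1234-4cde-8f00-aabbccddeeff/cube_name/cuboid/";
            System.out.println(extractJobId(path)); // f9e2b7a0-1234-4cde-8f00-aabbccddeeff
        }
    }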
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
index 08f58af..e6c3ee8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -21,7 +21,6 @@ package org.apache.kylin.storage.hbase.steps;
 import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.engine.spark.ISparkOutput;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.slf4j.Logger;
@@ -80,8 +79,6 @@ public class HBaseSparkOutputTransition implements ISparkOutput {
             @Override
             public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
                     DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(
-                        steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index c2794ff..622a0e8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -46,19 +46,25 @@ public class HBaseSparkSteps extends HBaseJobSteps {
         sparkExecutable.setParam(SparkCubeHFile.OPTION_META_URL.getOpt(),
                 jobBuilder2.getSegmentMetadataUrl(seg.getConfig(), jobId));
         sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
-        sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(), getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+        sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
+                getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
 
         sparkExecutable.setJobId(jobId);
 
         StringBuilder jars = new StringBuilder();
         StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.KeyValue.class));
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class));
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.regionserver.BloomType.class));
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.class)); //hbase-protocal.jar
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.CompatibilityFactory.class)); //hbase-hadoop-compact.jar
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class));
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar(org.apache.hadoop.hbase.regionserver.BloomType.class));
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.class)); //hbase-protocol.jar
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar(org.apache.hadoop.hbase.CompatibilityFactory.class)); //hbase-hadoop-compat.jar
         StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.apache.htrace.Trace", null)); // htrace-core.jar
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("com.yammer.metrics.core.MetricsRegistry", null)); // metrics-core.jar
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar("com.yammer.metrics.core.MetricsRegistry", null)); // metrics-core.jar
 
         StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
         sparkExecutable.setJars(jars.toString());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index a23156c..0136d4e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -30,7 +30,6 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
@@ -42,8 +41,8 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -57,6 +56,7 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
+import org.apache.kylin.engine.spark.SparkUtil;
 import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.spark.Partitioner;
@@ -94,8 +94,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
 
     private Options options;
 
-    private KylinSparkJobListener jobListener;
-
     public SparkCubeHFile() {
         options = new Options();
         options.addOption(OPTION_INPUT_PATH);
@@ -104,7 +102,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
         options.addOption(OPTION_PARTITION_FILE_PATH);
-        jobListener = new KylinSparkJobListener();
     }
 
     @Override
@@ -130,6 +127,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
         JavaSparkContext sc = new JavaSparkContext(conf);
         sc.sc().addSparkListener(jobListener);
         final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
@@ -161,7 +159,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         logger.info("Input path: {}", inputPath);
         logger.info("Output path: {}", outputPath);
 
-        List<JavaPairRDD> inputRDDs = parseInputPath(inputPath, fs, sc);
+        List<JavaPairRDD> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
         final JavaPairRDD<Text, Text> allCuboidFile = sc.union(inputRDDs.toArray(new JavaPairRDD[inputRDDs.size()]));
         final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
         if (quickPath) {
@@ -236,33 +234,15 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
             logger.debug(ioe.getMessage());
         }
 
-        hfilerdd2.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
-                HFileOutputFormat2.class, job.getConfiguration());
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+        hfilerdd2.saveAsNewAPIHadoopDataset(job.getConfiguration());
 
         // output the data size to console, job engine will parse and save the metric
         // please note: this mechanism won't work when spark.submit.deployMode=cluster
-        System.out.println("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
-        deleteHDFSMeta(metaUrl);
+        logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+        HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
-    private List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc) throws IOException {
-        List<JavaPairRDD> inputRDDs = Lists.newArrayList();
-        Path inputHDFSPath = new Path(inputPath);
-        FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
-        boolean hasDir = false;
-        for (FileStatus stat : fileStatuses) {
-            if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
-                hasDir = true;
-                inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), Text.class, Text.class));
-            }
-        }
-
-        if (!hasDir) {
-            inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), Text.class, Text.class));
-        }
-
-        return inputRDDs;
-    }
 
     static class HFilePartitioner extends Partitioner {
         private List<RowKeyWritable> keys;
@@ -298,10 +278,4 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         }
     }
 
-    protected void deleteHDFSMeta(String metaUrl) throws IOException {
-        String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
-        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
-        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
-    }
-
 }
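Both Spark jobs touched by this change now write through a pre-configured Hadoop Job (FileOutputFormat.setOutputPath plus saveAsNewAPIHadoopDataset) rather than saveAsNewAPIHadoopFile, so any setting carried on the Job's Configuration (output format options, compression codecs, and similar keys) applies to the write. A minimal sketch of that pattern with a plain Text/SequenceFile output; the class and method names here are illustrative, only the Hadoop and Spark APIs are real.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.spark.api.java.JavaPairRDD;

    public class SaveAsDatasetSketch {

        // Route the RDD write through a fully configured Hadoop Job so every
        // setting on the Job's Configuration is honoured by the output.
        static void save(JavaPairRDD<Text, Text> rdd, Configuration conf, String outputPath) throws Exception {
            Job job = Job.getInstance(conf);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            rdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
        }
    }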