You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/16 15:03:02 UTC

[kylin] 01/02: KYLIN-3441 Merge cube segments in Spark

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4bda69ebbe5f25cfa9f94310efcdbb734df486b3
Author: shaofengshi <sh...@apache.org>
AuthorDate: Wed Jul 11 18:40:35 2018 +0800

    KYLIN-3441 Merge cube segments in Spark
---
 .../java/org/apache/kylin/common/KylinConfig.java  |   4 +
 .../org/apache/kylin/common/util/HadoopUtil.java   |   7 +
 .../java/org/apache/kylin/cube/CubeInstance.java   |  10 +
 .../org/apache/kylin/cube/DimensionRangeInfo.java  |   3 +-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  13 ++
 .../kylin/engine/mr/steps/MergeCuboidMapper.java   | 196 ++--------------
 ...ergeCuboidMapper.java => SegmentReEncoder.java} | 178 ++++++++-------
 .../kylin/engine/spark/KylinKryoRegistrator.java   |   8 +
 .../kylin/engine/spark/KylinSparkJobListener.java  |   2 +-
 .../engine/spark/SparkBatchMergeJobBuilder2.java   |  45 +++-
 .../kylin/engine/spark/SparkCubingByLayer.java     |  28 +--
 .../kylin/engine/spark/SparkCubingMerge.java       | 251 +++++++++++++++++++++
 .../apache/kylin/engine/spark/SparkExecutable.java |  23 +-
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  63 +++++-
 .../template/cube_desc/kylin_sales_cube.json       |   3 +-
 examples/test_case_data/sandbox/kylin.properties   |   8 +-
 .../hbase/steps/HBaseMROutput2Transition.java      |  36 +--
 .../hbase/steps/HBaseSparkOutputTransition.java    |   3 -
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |  18 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  |  42 +---
 20 files changed, 570 insertions(+), 371 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 4b3b7c3..e09ce26 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -414,6 +414,10 @@ public class KylinConfig extends KylinConfigBase {
         KylinConfig base = base();
         if (base != this)
             return base.getManager(clz);
+
+        if (managersCache == null) {
+            managersCache = new ConcurrentHashMap<>();
+        }
         
         Object mgr = managersCache.get(clz);
         if (mgr != null)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index ed6ef89..3cb43fc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,4 +187,10 @@ public class HadoopUtil {
         }
         return result;
     }
+
+    public static void deleteHDFSMeta(String metaUrl) throws IOException {
+        String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
+        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
+        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
+    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index c0f3536..ff86e7d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -707,4 +707,14 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return newCube;
     }
 
+    public static CubeSegment findSegmentWithJobId(String jobID, CubeInstance cubeInstance) {
+        for (CubeSegment segment : cubeInstance.getSegments()) {
+            String lastBuildJobID = segment.getLastBuildJobID();
+            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
+                return segment;
+            }
+        }
+        throw new IllegalStateException("No segment's last build job ID equals " + jobID);
+    }
+
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
index 0b0d1c4..cde8d3f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.cube;
 
+import java.io.Serializable;
 import java.util.Map;
 
 import org.apache.kylin.metadata.datatype.DataTypeOrder;
@@ -31,7 +32,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Maps;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class DimensionRangeInfo {
+public class DimensionRangeInfo implements Serializable {
 
     private static final Logger logger = LoggerFactory.getLogger(DimensionRangeInfo.class);
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index a1b2cfe..42e0f42 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -20,6 +20,8 @@ package org.apache.kylin.engine.mr;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
@@ -57,6 +59,7 @@ public class JobBuilderSupport {
     final public static String PathNameCuboidBase = "base_cuboid";
     final public static String PathNameCuboidOld = "old";
     final public static String PathNameCuboidInMem = "in_memory";
+    final public static Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
 
     public JobBuilderSupport(CubeSegment seg, String submitter) {
         Preconditions.checkNotNull(seg, "segment cannot be null");
@@ -324,4 +327,14 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/metadata";
     }
 
+
+    public static String extractJobIDFromPath(String path) {
+        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+        // only the first match of JOB_NAME_PATTERN in the path is used
+        if (matcher.find()) {
+            return matcher.group(1);
+        } else {
+            throw new IllegalStateException("Can not extract job ID from file path : " + path);
+        }
+    }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 0283c21..d63c8b8 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -19,41 +19,20 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * @author ysong1, honma
@@ -61,179 +40,34 @@ import com.google.common.collect.Maps;
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
-    private KylinConfig config;
-    private String cubeName;
-    private String segmentID;
-    private CubeManager cubeManager;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment mergedCubeSegment;
-    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
-
-    private Text outputKey = new Text();
-
-    private byte[] newKeyBodyBuf;
-    private ByteArray newKeyBuf;
-    private RowKeySplitter rowKeySplitter;
-    private RowKeyEncoderProvider rowKeyEncoderProvider;
-
-
-    // for re-encode measures that use dictionary
-    private List<Pair<Integer, MeasureIngester>> dictMeasures;
-    private Map<TblColRef, Dictionary<String>> oldDicts;
-    private Map<TblColRef, Dictionary<String>> newDicts;
-    private List<MeasureDesc> measureDescs;
-    private BufferedMeasureCodec codec;
-    private Object[] measureObjs;
-    private Text outputValue;
+    private SegmentReEncoder reEncoder;
+    private Pair<Text, Text> newPair;
 
     @Override
     protected void doSetup(Context context) throws IOException, InterruptedException {
         super.bindCurrentConfiguration(context.getConfiguration());
 
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
-        segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
-
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
 
-        cubeManager = CubeManager.getInstance(config);
-        cube = cubeManager.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegmentById(segmentID);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
-        // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will auto-grow
-        newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        CubeDesc cubeDesc = cube.getDescriptor();
+        CubeSegment mergedCubeSegment = cube.getSegmentById(segmentID);
 
         // decide which source segment
         FileSplit fileSplit = (FileSplit) context.getInputSplit();
-        IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat();
-        sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
-
-        rowKeySplitter = new RowKeySplitter(sourceCubeSegment);
-        rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
-
-        measureDescs = cubeDesc.getMeasures();
-        codec = new BufferedMeasureCodec(measureDescs);
-        measureObjs = new Object[measureDescs.size()];
-        outputValue = new Text();
-
-        dictMeasures = Lists.newArrayList();
-        oldDicts = Maps.newHashMap();
-        newDicts = Maps.newHashMap();
-        for (int i = 0; i < measureDescs.size(); i++) {
-            MeasureDesc measureDesc = measureDescs.get(i);
-            MeasureType measureType = measureDesc.getFunction().getMeasureType();
-            List<TblColRef> columns = measureType.getColumnsNeedDictionary(measureDesc.getFunction());
-            boolean needReEncode = false;
-            for (TblColRef col : columns) {
-                //handle the column that all records is null
-                if (sourceCubeSegment.getDictionary(col) == null) {
-                    continue;
-                }
-
-                oldDicts.put(col, sourceCubeSegment.getDictionary(col));
-                newDicts.put(col, mergedCubeSegment.getDictionary(col));
-                if (!sourceCubeSegment.getDictionary(col).equals(mergedCubeSegment.getDictionary(col))) {
-                    needReEncode = true;
-                }
-            }
-            if (needReEncode) {
-                dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
-            }
-        }
+        IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment)
+                .getOuputFormat();
+        CubeSegment sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
+        reEncoder = new SegmentReEncoder(cubeDesc, sourceCubeSegment, mergedCubeSegment, config);
     }
 
     @Override
     public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes());
-        Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
-        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
-
-        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
-        int bufOffset = 0;
-        int bodySplitOffset = rowKeySplitter.getBodySplitOffset();
-
-        for (int i = 0; i < cuboid.getColumns().size(); ++i) {
-            int useSplit = i + bodySplitOffset;
-            TblColRef col = cuboid.getColumns().get(i);
-
-            if (cubeDesc.getRowkey().isUseDictionary(col)) {
-                // if dictionary on fact table column, needs rewrite
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<String> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
-                // handle the dict of all merged segments is null
-                if (mergedDict == null) {
-                    continue;
-                }
-
-                Dictionary<String> sourceDict;
-                // handle the column that all records is null
-                if (sourceCubeSegment.getDictionary(col) == null) {
-                    BytesUtil.writeUnsigned(mergedDict.nullId(), newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    bufOffset += mergedDict.getSizeOfId();
-                    continue;
-                } else {
-                    sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
-                }
-
-                while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfId() > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-                }
-
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length);
-                int idInMergedDict;
-
-                //int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                String v = sourceDict.getValueFromId(idInSourceDict);
-                if (v == null) {
-                    idInMergedDict = mergedDict.nullId();
-                } else {
-                    idInMergedDict = mergedDict.getIdFromValue(v);
-                }
-
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                bufOffset += mergedDict.getSizeOfId();
-            } else {
-                // keep as it is
-                while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-                }
-
-                System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
-                bufOffset += splittedByteses[useSplit].length;
-            }
-        }
-
-        int fullKeySize = rowkeyEncoder.getBytesLength();
-        while (newKeyBuf.array().length < fullKeySize) {
-            newKeyBuf = new ByteArray(newKeyBuf.length() * 2);
-        }
-        newKeyBuf.setLength(fullKeySize);
-
-        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
-        outputKey.set(newKeyBuf.array(), 0, fullKeySize);
-
-        // re-encode measures if dictionary is used
-        if (dictMeasures.size() > 0) {
-            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
-            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
-                int i = pair.getFirst();
-                MeasureIngester ingester = pair.getSecond();
-                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
-            }
-            ByteBuffer valueBuf = codec.encode(measureObjs);
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-            value = outputValue;
-        }
-
-        context.write(outputKey, value);
+        newPair = reEncoder.reEncode(key, value);
+        context.write(newPair.getFirst(), newPair.getSecond());
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
old mode 100755
new mode 100644
similarity index 68%
copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
index 0283c21..856a59f
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
@@ -6,33 +6,31 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -41,11 +39,6 @@ import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
@@ -56,67 +49,44 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
- * @author ysong1, honma
+ * Re-encode the cuboid from old segment (before merge) to new segment (after merge).
  */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
-
-    private KylinConfig config;
-    private String cubeName;
-    private String segmentID;
-    private CubeManager cubeManager;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment mergedCubeSegment;
-    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
-
-    private Text outputKey = new Text();
+public class SegmentReEncoder implements Serializable {
+    private volatile transient boolean initialized = false;
+    private CubeSegment mergingSeg;
+    private CubeSegment mergedSeg;
 
     private byte[] newKeyBodyBuf;
     private ByteArray newKeyBuf;
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
 
-
     // for re-encode measures that use dictionary
     private List<Pair<Integer, MeasureIngester>> dictMeasures;
     private Map<TblColRef, Dictionary<String>> oldDicts;
     private Map<TblColRef, Dictionary<String>> newDicts;
     private List<MeasureDesc> measureDescs;
     private BufferedMeasureCodec codec;
-    private Object[] measureObjs;
-    private Text outputValue;
-
-    @Override
-    protected void doSetup(Context context) throws IOException, InterruptedException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
-        segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
-
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        cubeManager = CubeManager.getInstance(config);
-        cube = cubeManager.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegmentById(segmentID);
+    private CubeDesc cubeDesc;
+    private KylinConfig kylinConfig;
+
+    public SegmentReEncoder(CubeDesc cubeDesc, CubeSegment mergingSeg, CubeSegment mergedSeg, KylinConfig kylinConfig) {
+        this.cubeDesc = cubeDesc;
+        this.mergingSeg = mergingSeg;
+        this.mergedSeg = mergedSeg;
+        this.kylinConfig = kylinConfig;
+        init();
+    }
 
-        // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+    private void init() {
         newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will auto-grow
         newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
 
-        // decide which source segment
-        FileSplit fileSplit = (FileSplit) context.getInputSplit();
-        IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat();
-        sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
-
-        rowKeySplitter = new RowKeySplitter(sourceCubeSegment);
-        rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
+        rowKeySplitter = new RowKeySplitter(mergingSeg);
+        rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedSeg);
 
         measureDescs = cubeDesc.getMeasures();
         codec = new BufferedMeasureCodec(measureDescs);
-        measureObjs = new Object[measureDescs.size()];
-        outputValue = new Text();
 
         dictMeasures = Lists.newArrayList();
         oldDicts = Maps.newHashMap();
@@ -128,13 +98,13 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
             boolean needReEncode = false;
             for (TblColRef col : columns) {
                 //handle the column that all records is null
-                if (sourceCubeSegment.getDictionary(col) == null) {
+                if (mergingSeg.getDictionary(col) == null) {
                     continue;
                 }
 
-                oldDicts.put(col, sourceCubeSegment.getDictionary(col));
-                newDicts.put(col, mergedCubeSegment.getDictionary(col));
-                if (!sourceCubeSegment.getDictionary(col).equals(mergedCubeSegment.getDictionary(col))) {
+                oldDicts.put(col, mergingSeg.getDictionary(col));
+                newDicts.put(col, mergedSeg.getDictionary(col));
+                if (!mergingSeg.getDictionary(col).equals(mergedSeg.getDictionary(col))) {
                     needReEncode = true;
                 }
             }
@@ -142,10 +112,71 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
                 dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
             }
         }
+        initialized = true;
+    }
+
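+    /**
+     * Re-encode with both dimension and measure in encoded (Text) format.
+     * @param key the encoded row key; split with the RowKeySplitter built on the merging segment and re-encoded for the merged segment
+     * @param value the encoded measures; decoded and re-encoded only when dictMeasures is non-empty
+     * @return the re-encoded key paired with the re-encoded value, or with the original value when no measure needs re-encoding
+     * @throws IOException propagated from the key re-encoding (processKey declares it)
+     */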
+    public Pair<Text, Text> reEncode(Text key, Text value) throws IOException {
+        if (initialized == false) {
+            throw new IllegalStateException("Not initialized");
+        }
+        Object[] measureObjs = new Object[measureDescs.size()];
+        // re-encode measures if dictionary is used
+        if (dictMeasures.size() > 0) {
+            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
+            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+                int i = pair.getFirst();
+                MeasureIngester ingester = pair.getSecond();
+                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
+            }
+
+            ByteBuffer valueBuf = codec.encode(measureObjs);
+            byte[] resultValue = new byte[valueBuf.position()];
+            System.arraycopy(valueBuf.array(), 0, resultValue, 0, valueBuf.position());
+
+            return Pair.newPair(processKey(key), new Text(resultValue));
+        } else {
+            return Pair.newPair(processKey(key), value);
+        }
     }
 
-    @Override
-    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
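+    /**
+     * Re-encode with measures in Object[] format. NOTE(review): the codec.encode() result computed in the body is never used.
+     * @param key the encoded row key; split with the RowKeySplitter built on the merging segment and re-encoded for the merged segment
+     * @param value the encoded measures; always decoded into measure objects
+     * @return the re-encoded key paired with the decoded measure objects (those listed in dictMeasures are re-encoded)
+     * @throws IOException propagated from the key re-encoding (processKey declares it)
+     */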
+    public Pair<Text, Object[]> reEncode2(Text key, Text value) throws IOException {
+        if (initialized == false) {
+            throw new IllegalStateException("Not initialized");
+        }
+
+        Object[] measureObjs = new Object[measureDescs.size()];
+        codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
+        // re-encode measures if dictionary is used
+        if (dictMeasures.size() > 0) {
+            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+                int i = pair.getFirst();
+                MeasureIngester ingester = pair.getSecond();
+                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
+            }
+
+            ByteBuffer valueBuf = codec.encode(measureObjs);
+            byte[] resultValue = new byte[valueBuf.position()];
+            System.arraycopy(valueBuf.array(), 0, resultValue, 0, valueBuf.position());
+
+        }
+        return Pair.newPair(processKey(key), measureObjs);
+    }
+
+    private Text processKey(Text key) throws IOException {
         long cuboidID = rowKeySplitter.split(key.getBytes());
         Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
         RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
@@ -160,8 +191,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
             if (cubeDesc.getRowkey().isUseDictionary(col)) {
                 // if dictionary on fact table column, needs rewrite
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<String> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+                DictionaryManager dictMgr = DictionaryManager.getInstance(kylinConfig);
+                Dictionary<String> mergedDict = dictMgr.getDictionary(mergedSeg.getDictResPath(col));
 
                 // handle the dict of all merged segments is null
                 if (mergedDict == null) {
@@ -170,12 +201,12 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
                 Dictionary<String> sourceDict;
                 // handle the column that all records is null
-                if (sourceCubeSegment.getDictionary(col) == null) {
+                if (mergingSeg.getDictionary(col) == null) {
                     BytesUtil.writeUnsigned(mergedDict.nullId(), newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
                     bufOffset += mergedDict.getSizeOfId();
                     continue;
                 } else {
-                    sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+                    sourceDict = dictMgr.getDictionary(mergingSeg.getDictResPath(col));
                 }
 
                 while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
@@ -186,7 +217,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
                     System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
                 }
 
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length);
+                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0,
+                        splittedByteses[useSplit].length);
                 int idInMergedDict;
 
                 //int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
@@ -207,7 +239,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
                     System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
                 }
 
-                System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
+                System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset,
+                        splittedByteses[useSplit].length);
                 bufOffset += splittedByteses[useSplit].length;
             }
         }
@@ -219,21 +252,10 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         newKeyBuf.setLength(fullKeySize);
 
         rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
-        outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        // re-encode measures if dictionary is used
-        if (dictMeasures.size() > 0) {
-            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
-            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
-                int i = pair.getFirst();
-                MeasureIngester ingester = pair.getSecond();
-                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
-            }
-            ByteBuffer valueBuf = codec.encode(measureObjs);
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-            value = outputValue;
-        }
+        byte[] resultKey = new byte[fullKeySize];
+        System.arraycopy(newKeyBuf.array(), 0, resultKey, 0, fullKeySize);
 
-        context.write(outputKey, value);
+        return new Text(resultKey);
     }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index 90c52c2..a13f96a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -96,6 +96,14 @@ public class KylinKryoRegistrator implements KryoRegistrator {
 
         kyroClasses.add(Class.class);
 
+        //shaded classes
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MutableRoaringArray");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MutableRoaringBitmap");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MappeableArrayContainer");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MappeableBitmapContainer");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.ImmutableRoaringBitmap");
+        addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.ImmutableRoaringArray");
+
         addClassQuitely(kyroClasses, "com.google.common.collect.EmptyImmutableList");
         addClassQuitely(kyroClasses, "java.nio.HeapShortBuffer");
         addClassQuitely(kyroClasses, "java.nio.HeapLongBuffer");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
index 7976c3b..b8821b6 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
@@ -29,7 +29,7 @@ public class KylinSparkJobListener extends SparkListener implements Serializable
 
     @Override
     public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
-        if (taskEnd.taskMetrics().outputMetrics() != null) {
+        if (taskEnd != null && taskEnd.taskMetrics() != null && taskEnd.taskMetrics().outputMetrics() != null) {
             metrics.bytesWritten += taskEnd.taskMetrics().outputMetrics().bytesWritten();
             metrics.recordsWritten += taskEnd.taskMetrics().outputMetrics().recordsWritten();
         }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index b68f6e86..97861a3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -18,11 +18,17 @@
 
 package org.apache.kylin.engine.spark;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +67,9 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
         result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
         outputSide.addStepPhase1_MergeDictionary(result);
 
+        // merge cube
+        result.addTask(createMergeCuboidDataStep(cubeSegment, mergingSegments, jobId));
+
         // Phase 2: Merge Cube Files
         outputSide.addStepPhase2_BuildCube(seg, mergingSegments, result);
 
@@ -70,5 +79,39 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
 
         return result;
     }
-    
+
+    /**
+     * Builds the Spark step that runs {@link SparkCubingMerge} over the cuboid root paths of the
+     * merging segments and writes its output to the cuboid root path of job {@code jobID}.
+     */
+    public SparkExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID) {
+        final List<String> inputPaths = Lists.newArrayList();
+        for (CubeSegment source : mergingSegments) {
+            inputPaths.add(getCuboidRootPath(source));
+        }
+        final String joinedInputPaths = StringUtil.join(inputPaths, ",");
+        final String mergedCuboidPath = getCuboidRootPath(jobID);
+        final String metadataUrl = getSegmentMetadataUrl(seg.getConfig(), jobID);
+
+        final SparkExecutable mergeStep = new SparkExecutable();
+        mergeStep.setClassName(SparkCubingMerge.class.getName());
+        mergeStep.setParam(SparkCubingMerge.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        mergeStep.setParam(SparkCubingMerge.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        mergeStep.setParam(SparkCubingMerge.OPTION_INPUT_PATH.getOpt(), joinedInputPaths);
+        mergeStep.setParam(SparkCubingMerge.OPTION_META_URL.getOpt(), metadataUrl);
+        mergeStep.setParam(SparkCubingMerge.OPTION_OUTPUT_PATH.getOpt(), mergedCuboidPath);
+        mergeStep.setJobId(jobID);
+        mergeStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+
+        final StringBuilder jars = new StringBuilder();
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+        mergeStep.setJars(jars.toString());
+        return mergeStep;
+    }
+
+    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+        Map<String, String> param = new HashMap<>(); // parameters of the "hdfs" StorageURL built below
+        param.put("path", getDumpMetadataPath(jobId)); // "path" carries the dump-metadata path of the job
+        return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
+    }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index f842fd5..0cc8a3b 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,7 +17,6 @@
 */
 package org.apache.kylin.engine.spark;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -34,7 +33,6 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
@@ -47,7 +45,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -137,7 +134,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
         JavaSparkContext sc = new JavaSparkContext(conf);
+        sc.sc().addSparkListener(jobListener);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
         final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
 
@@ -221,7 +220,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
-        int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
+        int partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
 
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
@@ -230,7 +229,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
-            partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
+            partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
             allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (envConfig.isSparkSanityCheckEnabled() == true) {
@@ -241,7 +240,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
         allRDDs[totalLevels].unpersist();
         logger.info("Finished on calculating all level cuboids.");
-        //        deleteHDFSMeta(metaUrl);
+        logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+        HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
     protected void setHadoopConf(Job job, CubeSegment segment, String metaUrl) throws Exception {
@@ -249,15 +249,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         job.setOutputValueClass(Text.class);
     }
 
-    protected int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
-        double baseCuboidSize = statsReader.estimateLayerSize(level);
-        float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
-        int partition = (int) (baseCuboidSize / rddCut);
-        partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
-        partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
-        logger.info("Partition for spark cubing: {}", partition);
-        return partition;
-    }
 
     protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config,
             CubeSegment segment, int level) {
@@ -417,7 +408,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private String metaUrl;
         private CubeSegment cubeSegment;
         private CubeDesc cubeDesc;
-        private CuboidScheduler cuboidScheduler;
         private NDCuboidBuilder ndCuboidBuilder;
         private RowKeySplitter rowKeySplitter;
         private volatile transient boolean initialized = false;
@@ -435,7 +425,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();
-            this.cuboidScheduler = cubeSegment.getCuboidScheduler();
             this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
             this.rowKeySplitter = new RowKeySplitter(cubeSegment);
         }
@@ -508,9 +497,4 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return count;
     }
 
-    protected void deleteHDFSMeta(String metaUrl) throws IOException {
-        String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
-        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
-        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
-    }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
new file mode 100644
index 0000000..8d78920
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import scala.Tuple2;
+
+/**
+ * Spark application that merges the cuboid files of several cube segments: every input record is
+ * re-encoded against the merged segment, aggregated per key and written out as sequence files.
+ */
+public class SparkCubingMerge extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkCubingMerge.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Merged cuboid output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
+
+    private Options options;
+
+    public SparkCubingMerge() {
+        options = new Options();
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    /** Reads the cuboids of every input folder, re-encodes and aggregates them, then saves the merged result. */
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
+
+        SparkConf conf = new SparkConf().setAppName("Merge segments for cube:" + cubeName + ", segment " + segmentId);
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        sc.sc().addSparkListener(jobListener);
+
+        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final String cubeDescName = cubeInstance.getDescName(); // descriptor name, reused on the executors
+        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeDescName);
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+        logger.info("Input path: {}", inputPath);
+        logger.info("Output path: {}", outputPath);
+
+        String[] inputFolders = StringSplitter.split(inputPath, ",");
+
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+        for (String path : inputFolders) {
+            CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+
+            // each folder will be a rdd, union them
+            List<JavaPairRDD> cuboidRdds = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
+            JavaPairRDD<Text, Text> segRdd = sc.union(cuboidRdds.toArray(new JavaPairRDD[cuboidRdds.size()]));
+
+            // re-encode with new dictionaries
+            JavaPairRDD<Text, Object[]> reEncodedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+                    sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+            mergingSegs.add(reEncodedRdd);
+        }
+
+        final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+
+        // reduce
+        JavaPairRDD<Text, Object[]> mergedRdd = sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+                .reduceByKey(new Function2<Object[], Object[], Object[]>() {
+                    @Override
+                    public Object[] call(Object[] input1, Object[] input2) throws Exception {
+                        Object[] measureObjs = new Object[input1.length];
+                        aggregators.aggregate(input1, input2, measureObjs);
+                        return measureObjs;
+                    }
+                }, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig));
+
+        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
+        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+        final Job job = Job.getInstance(confOverwrite);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+        mergedRdd.mapToPair(new PairFunction<Tuple2<Text, Object[]>, Text, Text>() {
+            private volatile transient boolean initialized = false;
+            BufferedMeasureCodec codec;
+
+            @Override
+            public Tuple2<Text, Text> call(Tuple2<Text, Object[]> tuple2) throws Exception {
+                if (!initialized) {
+                    synchronized (SparkCubingMerge.class) {
+                        if (!initialized) {
+                            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                            CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeDescName);
+                            codec = new BufferedMeasureCodec(desc.getMeasures());
+                            initialized = true;
+                        }
+                    }
+                }
+                ByteBuffer valueBuf = codec.encode(tuple2._2());
+                byte[] encodedBytes = new byte[valueBuf.position()];
+                System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+                return new Tuple2<>(tuple2._1(), new Text(encodedBytes));
+            }
+
+        }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+        // output the data size to console, job engine will parse and save the metric
+        // please note: this mechanism won't work when spark.submit.deployMode=cluster
+        logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+        HadoopUtil.deleteHDFSMeta(metaUrl);
+    }
+
+    /** Re-encodes one cuboid record of a source segment against the merged segment via SegmentReEncoder. */
+    static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String sourceSegmentId;
+        private String mergedSegmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private transient KylinConfig kylinConfig;
+        private transient SegmentReEncoder segmentReEncoder = null;
+
+        ReEncodCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
+                SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.sourceSegmentId = sourceSegmentId;
+            this.mergedSegmentId = mergedSegmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        private void init() {
+            this.kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cube.getDescName());
+            final CubeSegment sourceSeg = cube.getSegmentById(sourceSegmentId);
+            final CubeSegment mergedSeg = cube.getSegmentById(mergedSegmentId);
+            this.segmentReEncoder = new SegmentReEncoder(cubeDesc, sourceSeg, mergedSeg, kylinConfig);
+        }
+
+        @Override
+        public Tuple2<Text, Object[]> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+            if (!initialized) {
+                synchronized (ReEncodCuboidFunction.class) {
+                    if (!initialized) {
+                        init();
+                        initialized = true;
+                    }
+                }
+            }
+            Pair<Text, Object[]> encoded = segmentReEncoder.reEncode2(textTextTuple2._1, textTextTuple2._2);
+            return new Tuple2<>(encoded.getFirst(), encoded.getSecond());
+        }
+    }
+
+    private CubeSegment findSourceSegment(String filePath, CubeInstance cube) {
+        String jobID = JobBuilderSupport.extractJobIDFromPath(filePath);
+        return CubeInstance.findSegmentWithJobId(jobID, cube);
+    }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 90442a4..961fd6c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.spark;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -44,6 +46,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.Segments;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -132,9 +135,17 @@ public class SparkExecutable extends AbstractExecutable {
 
         String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
         CubeSegment segment = cube.getSegmentById(segmentID);
+        Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
 
         try {
-            attachSegmentMetadataWithDict(segment);
+            if (mergingSeg == null || mergingSeg.size() == 0) {
+                attachSegmentMetadataWithDict(segment);
+            } else {
+                List<CubeSegment> allRelatedSegs = new ArrayList();
+                allRelatedSegs.add(segment);
+                allRelatedSegs.addAll(mergingSeg);
+                attachSegmentsMetadataWithDict(allRelatedSegs);
+            }
         } catch (IOException e) {
             throw new ExecuteException("meta dump failed");
         }
@@ -197,6 +208,16 @@ public class SparkExecutable extends AbstractExecutable {
         dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig());
     }
 
+    private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException {
+        final CubeSegment first = segments.get(0); // supplies the cube-level metadata and the config
+        Set<String> resources = new LinkedHashSet<>(JobRelatedMetaUtil.collectCubeMetadata(first.getCubeInstance()));
+        for (CubeSegment seg : segments) {
+            resources.addAll(seg.getDictionaryPaths());
+            resources.add(seg.getStatisticsResourcePath());
+        }
+        dumpAndUploadKylinPropsAndMetadata(resources, (KylinConfigExt) first.getConfig());
+    }
+
     private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig)
             throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index a4e17c3..da207ee 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -18,17 +18,24 @@
 
 package org.apache.kylin.engine.spark;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.GenericOptionsParser;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import com.google.common.collect.Lists;
 
 public class SparkUtil {
 
@@ -37,10 +44,6 @@ public class SparkUtil {
         return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
     }
 
-    private static TableDesc getTableDesc(String tableName, String prj) {
-        return TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName, prj);
-    }
-
     public static ISparkOutput.ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
         return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchCubingOutputSide(seg);
     }
@@ -57,7 +60,47 @@ public class SparkUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
     }
 
-    private static synchronized GenericOptionsParser getParser(Configuration conf, String[] args) throws Exception {
-        return new GenericOptionsParser(conf, args);
+    public static List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc, Class keyClass,
+            Class valueClass) throws IOException {
+        List<JavaPairRDD> inputRDDs = Lists.newArrayList();
+        Path inputHDFSPath = new Path(inputPath);
+        FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
+        boolean hasDir = false;
+        for (FileStatus stat : fileStatuses) {
+            if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+                hasDir = true;
+                inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), keyClass, valueClass));
+            }
+        }
+
+        if (!hasDir) {
+            inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), keyClass, valueClass));
+        }
+
+        return inputRDDs;
     }
+
+
+    public static int estimateLayerPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
+        double baseCuboidSize = statsReader.estimateLayerSize(level);
+        float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
+        int partition = (int) (baseCuboidSize / rddCut);
+        partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
+        partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+        return partition;
+    }
+
+
+    public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
+        double totalSize = 0;
+        for (double x: statsReader.getCuboidSizeMap().values()) {
+            totalSize += x;
+        }
+        float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
+        int partition = (int) (totalSize / rddCut);
+        partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
+        partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+        return partition;
+    }
+
 }
diff --git a/examples/sample_cube/template/cube_desc/kylin_sales_cube.json b/examples/sample_cube/template/cube_desc/kylin_sales_cube.json
index 124fc73..aff174e 100644
--- a/examples/sample_cube/template/cube_desc/kylin_sales_cube.json
+++ b/examples/sample_cube/template/cube_desc/kylin_sales_cube.json
@@ -275,7 +275,8 @@
   "engine_type" : %default_engine_type%,
   "storage_type" : %default_storage_type%,
   "override_kylin_properties" : {
-    "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true"
+    "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true",
+    "kylin.engine.spark.rdd-partition-cut-mb" : "500"
   },
   "cuboid_black_list" : [ ],
   "parent_forward" : 3,
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index a3aa0f4..edced8f 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -159,10 +159,10 @@ kylin.security.acl.admin-role=admin
 
 # Help info, format{name|displayName|link}, optional
 kylin.web.help.length=4
-kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs21/tutorial/kylin_sample.html
-kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs21/tutorial/odbc.html
-kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs21/tutorial/tableau_91.html
-kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs21/howto/howto_optimize_cubes.html
+kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs/tutorial/kylin_sample.html
+kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs/tutorial/odbc.html
+kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs/tutorial/tableau_91.html
+kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs/howto/howto_optimize_cubes.html
 
 #allow user to export query result
 kylin.web.export-allow-admin=true
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 94a60f8..a68643f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -20,8 +20,6 @@ package org.apache.kylin.storage.hbase.steps;
 
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -35,6 +33,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.MapReduceUtil;
 import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
@@ -139,8 +138,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             }
 
             @Override
-            public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
+            public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
+                    DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(
+                        steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -157,9 +158,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
         };
     }
 
-    public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat{
-
-        private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+    public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat {
 
         @Override
         public void configureJobInput(Job job, String input) throws Exception {
@@ -181,29 +180,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
         @Override
         public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
             String filePath = fileSplit.getPath().toString();
-            String jobID = extractJobIDFromPath(filePath);
-            return findSegmentWithUuid(jobID, cube);
-        }
-
-        private static String extractJobIDFromPath(String path) {
-            Matcher matcher = JOB_NAME_PATTERN.matcher(path);
-            // check the first occurrence
-            if (matcher.find()) {
-                return matcher.group(1);
-            } else {
-                throw new IllegalStateException("Can not extract job ID from file path : " + path);
-            }
+            String jobID = JobBuilderSupport.extractJobIDFromPath(filePath);
+            return CubeInstance.findSegmentWithJobId(jobID, cube);
         }
 
-        private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
-            for (CubeSegment segment : cubeInstance.getSegments()) {
-                String lastBuildJobID = segment.getLastBuildJobID();
-                if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
-                    return segment;
-                }
-            }
-            throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-        }
     }
 
     public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
index 08f58af..e6c3ee8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -21,7 +21,6 @@ package org.apache.kylin.storage.hbase.steps;
 import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.engine.spark.ISparkOutput;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.slf4j.Logger;
@@ -80,8 +79,6 @@ public class HBaseSparkOutputTransition implements ISparkOutput {
             @Override
             public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
                     DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(
-                        steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index c2794ff..622a0e8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -46,19 +46,25 @@ public class HBaseSparkSteps extends HBaseJobSteps {
         sparkExecutable.setParam(SparkCubeHFile.OPTION_META_URL.getOpt(),
                 jobBuilder2.getSegmentMetadataUrl(seg.getConfig(), jobId));
         sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
-        sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(), getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+        sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
+                getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
 
         sparkExecutable.setJobId(jobId);
 
         StringBuilder jars = new StringBuilder();
         StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.KeyValue.class));
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class));
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.regionserver.BloomType.class));
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.class)); //hbase-protocal.jar
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.CompatibilityFactory.class)); //hbase-hadoop-compact.jar
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class));
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar(org.apache.hadoop.hbase.regionserver.BloomType.class));
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.class)); //hbase-protocol.jar
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar(org.apache.hadoop.hbase.CompatibilityFactory.class)); //hbase-hadoop-compat.jar
         StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.apache.htrace.Trace", null)); // htrace-core.jar
-        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("com.yammer.metrics.core.MetricsRegistry", null)); // metrics-core.jar
+        StringUtil.appendWithSeparator(jars,
+                ClassUtil.findContainingJar("com.yammer.metrics.core.MetricsRegistry", null)); // metrics-core.jar
 
         StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
         sparkExecutable.setJars(jars.toString());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index a23156c..0136d4e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -30,7 +30,6 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
@@ -42,8 +41,8 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -57,6 +56,7 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
+import org.apache.kylin.engine.spark.SparkUtil;
 import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.spark.Partitioner;
@@ -94,8 +94,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
 
     private Options options;
 
-    private KylinSparkJobListener jobListener;
-
     public SparkCubeHFile() {
         options = new Options();
         options.addOption(OPTION_INPUT_PATH);
@@ -104,7 +102,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
         options.addOption(OPTION_PARTITION_FILE_PATH);
-        jobListener = new KylinSparkJobListener();
     }
 
     @Override
@@ -130,6 +127,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
         JavaSparkContext sc = new JavaSparkContext(conf);
         sc.sc().addSparkListener(jobListener);
         final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
@@ -161,7 +159,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         logger.info("Input path: {}", inputPath);
         logger.info("Output path: {}", outputPath);
 
-        List<JavaPairRDD> inputRDDs = parseInputPath(inputPath, fs, sc);
+        List<JavaPairRDD> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
         final JavaPairRDD<Text, Text> allCuboidFile = sc.union(inputRDDs.toArray(new JavaPairRDD[inputRDDs.size()]));
         final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
         if (quickPath) {
@@ -236,33 +234,15 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
             logger.debug(ioe.getMessage());
         }
 
-        hfilerdd2.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
-                HFileOutputFormat2.class, job.getConfiguration());
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+        hfilerdd2.saveAsNewAPIHadoopDataset(job.getConfiguration());
 
         // output the data size to console, job engine will parse and save the metric
         // please note: this mechanism won't work when spark.submit.deployMode=cluster
-        System.out.println("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
-        deleteHDFSMeta(metaUrl);
+        logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+        HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
-    private List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc) throws IOException {
-        List<JavaPairRDD> inputRDDs = Lists.newArrayList();
-        Path inputHDFSPath = new Path(inputPath);
-        FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
-        boolean hasDir = false;
-        for (FileStatus stat : fileStatuses) {
-            if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
-                hasDir = true;
-                inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), Text.class, Text.class));
-            }
-        }
-
-        if (!hasDir) {
-            inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), Text.class, Text.class));
-        }
-
-        return inputRDDs;
-    }
 
     static class HFilePartitioner extends Partitioner {
         private List<RowKeyWritable> keys;
@@ -298,10 +278,4 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         }
     }
 
-    protected void deleteHDFSMeta(String metaUrl) throws IOException {
-        String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
-        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
-        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
-    }
-
 }