Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/26 02:48:51 UTC

[kylin] branch master updated: KYLIN-3452 Optimize spark cubing memory footprint

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 312eaae  KYLIN-3452 Optimize spark cubing memory footprint
312eaae is described below

commit 312eaae7d469ed90984fc32f11d9b772e5f5bb89
Author: shaofengshi <sh...@apache.org>
AuthorDate: Sun Jul 15 18:01:33 2018 +0800

    KYLIN-3452 Optimize spark cubing memory footprint
---
 .../src/main/resources/kylin-defaults.properties   |   8 +-
 .../java/org/apache/kylin/cube/CubeManager.java    |   2 +-
 .../apache/kylin/cube/common/RowKeySplitter.java   |  38 ++++----
 .../java/org/apache/kylin/cube/cuboid/Cuboid.java  |   2 +-
 .../apache/kylin/cube/cuboid/CuboidManager.java    |  15 ++++
 .../org/apache/kylin/cube/kv/FuzzyMaskEncoder.java |   2 +-
 .../org/apache/kylin/cube/kv/RowKeyDecoder.java    |  10 +--
 .../org/apache/kylin/cube/kv/RowKeyEncoder.java    |   4 +-
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |  11 ++-
 .../kylin/engine/mr/common/NDCuboidBuilder.java    |  54 +++++++----
 .../kylin/engine/mr/steps/NDCuboidMapper.java      |   3 +-
 .../kylin/engine/mr/steps/SegmentReEncoder.java    |  15 ++--
 .../mr/steps/UpdateOldCuboidShardMapper.java       |   7 +-
 .../kylin/engine/spark/SparkCubingByLayer.java     |  46 +++++-----
 .../kylin/engine/spark/SparkCubingMerge.java       |  29 +++---
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  18 ++++
 .../common/coprocessor/CoprocessorProjector.java   |   2 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  | 100 ++++++++++-----------
 18 files changed, 208 insertions(+), 158 deletions(-)

diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index b391b5e..c01d3a8 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -290,9 +290,11 @@ kylin.engine.spark.max-partition=5000
 kylin.engine.spark-conf.spark.master=yarn
 #kylin.engine.spark-conf.spark.submit.deployMode=cluster
 kylin.engine.spark-conf.spark.yarn.queue=default
-kylin.engine.spark-conf.spark.executor.memory=1G
-kylin.engine.spark-conf.spark.executor.cores=2
-kylin.engine.spark-conf.spark.executor.instances=1
+kylin.engine.spark-conf.spark.driver.memory=2G
+kylin.engine.spark-conf.spark.executor.memory=4G
+kylin.engine.spark-conf.spark.executor.instances=40
+kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=1024
+kylin.engine.spark-conf.spark.shuffle.service.enabled=true
 kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
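The new defaults trade the old minimal footprint (1 GB executors, 2 cores, a single instance) for a larger, more realistic profile: 2 GB of driver memory, 4 GB executors with 40 instances, 1024 MB of YARN off-heap overhead per executor, and the external shuffle service enabled so shuffle data can be served independently of executor lifetime. As a rough sketch of what these properties amount to on the Spark side, assuming (as the key names suggest) that Kylin strips the kylin.engine.spark-conf. prefix and hands the remainder to Spark unchanged:

    import org.apache.spark.SparkConf;

    // Equivalent SparkConf settings (illustrative only; in practice Kylin
    // builds this from kylin-defaults.properties / kylin.properties).
    SparkConf conf = new SparkConf()
            .set("spark.driver.memory", "2G")
            .set("spark.executor.memory", "4G")
            .set("spark.executor.instances", "40")
            .set("spark.yarn.executor.memoryOverhead", "1024")
            .set("spark.shuffle.service.enabled", "true");
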
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 9d377d9..a4ed9c3 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -125,7 +125,7 @@ public class CubeManager implements IRealizationProvider {
     private DictionaryAssist dictAssist = new DictionaryAssist();
 
     private CubeManager(KylinConfig cfg) throws IOException {
-        logger.info("Initializing CubeManager with config " + config);
+        logger.info("Initializing CubeManager with config " + cfg);
         this.config = cfg;
         this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
         this.crud = new CachedCrudAssist<CubeInstance>(getStore(), ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class,
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index d0015a7..264c7a5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -18,8 +18,8 @@
 
 package org.apache.kylin.cube.common;
 
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
@@ -32,18 +32,17 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 public class RowKeySplitter implements java.io.Serializable {
 
-    private CubeSegment cubeSegment;
     private CubeDesc cubeDesc;
     private RowKeyColumnIO colIO;
 
-    private SplittedBytes[] splitBuffers;
+    private ByteArray[] splitBuffers;
     private int[] splitOffsets;
     private int bufferSize;
 
     private boolean enableSharding;
     private short shardId;
 
-    public SplittedBytes[] getSplitBuffers() {
+    public ByteArray[] getSplitBuffers() {
         return splitBuffers;
     }
 
@@ -67,7 +66,6 @@ public class RowKeySplitter implements java.io.Serializable {
         this(cubeSeg, cubeSeg.getCubeDesc().getRowkey().getRowKeyColumns().length + 2, cubeSeg.getConfig().getDimensionEncodingMaxLength());
     }
     public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
-        this.cubeSegment = cubeSeg;
         this.enableSharding = cubeSeg.isEnableSharding();
         this.cubeDesc = cubeSeg.getCubeDesc();
         IDimensionEncodingMap dimEncoding = new CubeDimEncMap(cubeSeg);
@@ -78,11 +76,8 @@ public class RowKeySplitter implements java.io.Serializable {
 
         this.colIO = new RowKeyColumnIO(dimEncoding);
 
-        this.splitBuffers = new SplittedBytes[splitLen];
+        this.splitBuffers = new ByteArray[splitLen];
         this.splitOffsets = new int[splitLen];
-        for (int i = 0; i < splitLen; i++) {
-            this.splitBuffers[i] = new SplittedBytes(bytesLen);
-        }
         this.bufferSize = 0;
     }
 
@@ -93,6 +88,11 @@ public class RowKeySplitter implements java.io.Serializable {
         return null;
     }
 
+
+    public long parseCuboid(byte[] bytes) {
+        int offset = enableSharding ? RowConstants.ROWKEY_SHARDID_LEN : 0;
+        return Bytes.toLong(bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
+    }
     /**
      * @param bytes
      * @return cuboid ID
@@ -103,21 +103,18 @@ public class RowKeySplitter implements java.io.Serializable {
 
         if (enableSharding) {
             // extract shard
-            SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++];
-            shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN;
-            System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN);
+            ByteArray shardSplit = new ByteArray(bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+            this.splitBuffers[this.bufferSize++] = shardSplit;
             offset += RowConstants.ROWKEY_SHARDID_LEN;
             //lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length);
-            shardId = Bytes.toShort(shardSplit.value);
+            shardId = Bytes.toShort(shardSplit.array(), shardSplit.offset());
         }
 
         // extract cuboid id
-        SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++];
-        cuboidIdSplit.length = RowConstants.ROWKEY_CUBOIDID_LEN;
-        System.arraycopy(bytes, offset, cuboidIdSplit.value, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+        ByteArray cuboidIdSplit = new ByteArray(bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
+        this.splitBuffers[this.bufferSize++] = cuboidIdSplit;
         offset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
-        long lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
+        long lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.array(), cuboidIdSplit.offset(), RowConstants.ROWKEY_CUBOIDID_LEN);
         Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, lastSplittedCuboidId);
 
         // rowkey columns
@@ -125,9 +122,8 @@ public class RowKeySplitter implements java.io.Serializable {
             splitOffsets[i] = offset;
             TblColRef col = cuboid.getColumns().get(i);
             int colLength = colIO.getColumnLength(col);
-            SplittedBytes split = this.splitBuffers[this.bufferSize++];
-            split.length = colLength;
-            System.arraycopy(bytes, offset, split.value, 0, colLength);
+            ByteArray split = new ByteArray(bytes, offset, colLength);
+            this.splitBuffers[this.bufferSize++] = split;
             offset += colLength;
         }
 
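RowKeySplitter used to pre-allocate one SplittedBytes buffer per row-key part and copy every shard, cuboid-id and column slice into it with System.arraycopy; after this change each part is a ByteArray view that only records (array, offset, length) over the incoming row-key bytes, and the new parseCuboid() lets callers read the cuboid id without splitting the key at all. A standalone sketch of the view idea, using a hypothetical Slice class in place of Kylin's ByteArray:

    // Hypothetical stand-in for org.apache.kylin.common.util.ByteArray:
    // a (array, offset, length) window over existing bytes, no copying.
    final class Slice {
        final byte[] array;
        final int offset;
        final int length;

        Slice(byte[] array, int offset, int length) {
            this.array = array;
            this.offset = offset;
            this.length = length;
        }

        // Splitting a row key of layout [shard][cuboidId][col0][col1]... now
        // just creates small Slice objects; the old code allocated and filled
        // a separate byte[] per part for every single row.
        static Slice[] split(byte[] rowKey, int[] partLengths) {
            Slice[] parts = new Slice[partLengths.length];
            int offset = 0;
            for (int i = 0; i < partLengths.length; i++) {
                parts[i] = new Slice(rowKey, offset, partLengths[i]);
                offset += partLengths[i];
            }
            return parts;
        }
    }
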
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 7d332bb..699b865 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -54,7 +54,7 @@ public class Cuboid implements Comparable<Cuboid>, Serializable {
 
     // for mandatory cuboid, no need to translate cuboid
     public static Cuboid findForMandatory(CubeDesc cube, long cuboidID) {
-        return new Cuboid(cube, cuboidID, cuboidID);
+        return CuboidManager.getInstance(cube.getConfig()).findMandatoryId(cube, cuboidID);
     }
     
     public static Cuboid findCuboid(CuboidScheduler cuboidScheduler, Set<TblColRef> dimensions,
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java
index 6efc87f..648447c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java
@@ -25,6 +25,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 
 import com.google.common.collect.Maps;
+import org.apache.kylin.cube.model.CubeDesc;
 
 /**
  * A cuboid cache.
@@ -65,6 +66,20 @@ public class CuboidManager {
         return cuboid;
     }
 
+    public Cuboid findMandatoryId(CubeDesc cubeDesc, long cuboidID) {
+        Map<Long, Cuboid> cubeCache = schedulerCuboidCache.get(cubeDesc.getName());
+        if (cubeCache == null) {
+            cubeCache = Maps.newConcurrentMap();
+            schedulerCuboidCache.put(cubeDesc.getName(), cubeCache);
+        }
+        Cuboid cuboid = cubeCache.get(cuboidID);
+        if (cuboid == null) {
+            cuboid = new Cuboid(cubeDesc, cuboidID, cuboidID);
+            cubeCache.put(cuboidID, cuboid);
+        }
+        return cuboid;
+    }
+
     public void clearCache(String cacheKey) {
         schedulerCuboidCache.remove(cacheKey);
     }
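Cuboid.findForMandatory() used to build a new Cuboid object on every call; it now delegates to CuboidManager.findMandatoryId(), which keeps one Cuboid per (cube, cuboidID) in the existing per-KylinConfig cache. That matters for Spark cubing, where the same small set of cuboids is resolved for every input record. The patch uses a check-then-put over a concurrent map; a minimal sketch of the same memoization with computeIfAbsent (illustrative names, not Kylin API):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Function;

    final class Memoizer<K, V> {
        private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();

        // Returns the cached value, computing it at most once per key
        // even under contention.
        V get(K key, Function<K, V> loader) {
            return cache.computeIfAbsent(key, loader);
        }
    }
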
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index 0cbb7d2..dd7157d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -65,7 +65,7 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
     }
 
     @Override
-    protected void fillHeader(byte[] bytes) {
+    public void fillHeader(byte[] bytes) {
         int offset = 0;
         if (enableSharding) {
             Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 923ddb8..71ad4bf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -56,13 +56,13 @@ public class RowKeyDecoder {
         long cuboidId = rowKeySplitter.split(bytes);
         initCuboid(cuboidId);
 
-        SplittedBytes[] splits = rowKeySplitter.getSplitBuffers();
+        ByteArray[] splits = rowKeySplitter.getSplitBuffers();
 
         int offset = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboid id part
 
         for (int i = 0; i < this.cuboid.getColumns().size(); i++) {
             TblColRef col = this.cuboid.getColumns().get(i);
-            collectValue(col, splits[offset].value, splits[offset].length);
+            collectValue(col, splits[offset].array(), splits[offset].offset(), splits[offset].length());
             offset++;
         }
 
@@ -76,8 +76,8 @@ public class RowKeyDecoder {
         this.cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
     }
 
-    private void collectValue(TblColRef col, byte[] valueBytes, int length) throws IOException {
-        String strValue = colIO.readColumnString(col, valueBytes, 0, length);
+    private void collectValue(TblColRef col, byte[] valueBytes, int offset, int length) throws IOException {
+        String strValue = colIO.readColumnString(col, valueBytes, offset, length);
         values.add(strValue);
     }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 40cda76..8a2c484 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -158,9 +158,9 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder implements java.io.Seri
         return bytes;
     }
 
-    protected void fillHeader(byte[] bytes) {
-        int offset = 0;
 
+    public void fillHeader(byte[] bytes) {
+        int offset = 0;
         if (enableSharding) {
             short shard = calculateShard(bytes);
             BytesUtil.writeShort(shard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index fa3c22e..716eafe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -75,6 +75,8 @@ import org.apache.kylin.metadata.project.ProjectManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 @SuppressWarnings("static-access")
 public abstract class AbstractHadoopJob extends Configured implements Tool {
     private static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
@@ -127,6 +129,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
+    private static final Map<String, KylinConfig> kylinConfigCache = Maps.newConcurrentMap();
+
     protected static void runJob(Tool job, String[] args) {
         try {
             int exitCode = ToolRunner.run(job, args);
@@ -488,6 +492,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         if (!uri.contains("@hdfs"))
             throw new IllegalArgumentException("meta url should like @hdfs schema");
 
+        if (kylinConfigCache.get(uri) != null) {
+            logger.info("KylinConfig cached for : {}", uri);
+            return kylinConfigCache.get(uri);
+        }
+
         logger.info("Ready to load KylinConfig from uri: {}", uri);
         KylinConfig config;
         FileSystem fs;
@@ -505,7 +514,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         // limitation of MR API. It works because MR task runs its own process. Do not copy.
         @SuppressWarnings("unused")
         SetAndUnsetThreadLocalConfig shouldAutoClose = KylinConfig.setAndUnsetThreadLocalConfig(config);
-        
+        kylinConfigCache.put(uri, config);
         return config;
     }
 
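loadKylinConfigFromHdfs() is called from every Spark task closure that needs metadata; the new static kylinConfigCache memoizes the parsed KylinConfig by its @hdfs metadata URI so the download and parse happen once per executor JVM instead of once per task. The shape is plain cache-aside; under a race two threads may both build the config, which is harmless because the results are equivalent. A compact sketch with hypothetical names (buildConfig stands for the HDFS fetch and parse):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class MetaConfigCache {
        private static final Map<String, Object> CACHE = new ConcurrentHashMap<>();

        static Object load(String uri) {
            Object cached = CACHE.get(uri);
            if (cached != null) {
                return cached;                // hit: skip the HDFS round trip
            }
            Object config = buildConfig(uri); // may run twice under a race
            CACHE.put(uri, config);
            return config;
        }

        private static Object buildConfig(String uri) {
            return new Object();              // placeholder for the real parse
        }
    }
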
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
index 4d9885e..6ad2619 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -43,8 +42,7 @@ public class NDCuboidBuilder implements Serializable {
     protected CubeSegment cubeSegment;
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
-    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
-    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+    private ByteArray newKeyBodyBuf = null;
 
     public NDCuboidBuilder(CubeSegment cubeSegment) {
         this(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
@@ -56,11 +54,42 @@ public class NDCuboidBuilder implements Serializable {
         this.rowKeySplitter = new RowKeySplitter(cubeSegment);
     }
 
+    /**
+     * Build the new key, return a reused ByteArray object. Suitable for MR
+     * @param parentCuboid
+     * @param childCuboid
+     * @param splitBuffers
+     * @return
+     */
+    public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, ByteArray[] splitBuffers) {
+        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        if (newKeyBodyBuf == null || newKeyBodyBuf.length() < fullKeySize) {
+            newKeyBodyBuf = new ByteArray(fullKeySize);
+        }
+
+        buildKeyInternal(parentCuboid, childCuboid, splitBuffers, newKeyBodyBuf);
+        return new Pair<>(Integer.valueOf(fullKeySize), newKeyBodyBuf);
 
-    public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
+    }
+
+    /**
+     * Build the new key, return a new ByteArray object each time. Suitable for spark
+     * @param parentCuboid
+     * @param childCuboid
+     * @param splitBuffers
+     * @return
+     */
+    public ByteArray buildKey2(Cuboid parentCuboid, Cuboid childCuboid, ByteArray[] splitBuffers) {
         RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        ByteArray newKey = new ByteArray(fullKeySize);
+        buildKeyInternal(parentCuboid, childCuboid, splitBuffers, newKey);
+        return newKey;
+    }
 
-        int offset = 0;
+    private void buildKeyInternal(Cuboid parentCuboid, Cuboid childCuboid, ByteArray[] splitBuffers, ByteArray newKeyBodyBuf) {
+        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
 
         // rowkey columns
         long mask = Long.highestOneBit(parentCuboid.getId());
@@ -68,28 +97,21 @@ public class NDCuboidBuilder implements Serializable {
         long childCuboidId = childCuboid.getId();
         long parentCuboidIdActualLength = (long)Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
         int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+        int offset = RowConstants.ROWKEY_SHARDID_LEN + RowConstants.ROWKEY_CUBOIDID_LEN; // skip shard and cuboidId
         for (int i = 0; i < parentCuboidIdActualLength; i++) {
             if ((mask & parentCuboidId) > 0) {// if the this bit position equals
                 // 1
                 if ((mask & childCuboidId) > 0) {// if the child cuboid has this
                     // column
-                    System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length);
-                    offset += splitBuffers[index].length;
+                    System.arraycopy(splitBuffers[index].array(), splitBuffers[index].offset(), newKeyBodyBuf.array(), offset, splitBuffers[index].length());
+                    offset += splitBuffers[index].length();
                 }
                 index++;
             }
             mask = mask >> 1;
         }
 
-        int fullKeySize = rowkeyEncoder.getBytesLength();
-        while (newKeyBuf.array().length < fullKeySize) {
-            newKeyBuf = new ByteArray(newKeyBuf.length() * 2);
-        }
-        newKeyBuf.setLength(fullKeySize);
-
-        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
-
-        return new Pair<>(Integer.valueOf(fullKeySize), newKeyBuf);
+        rowkeyEncoder.fillHeader(newKeyBodyBuf.array());
     }
 
 }
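NDCuboidBuilder now routes both callers through buildKeyInternal(), which writes the child's column bytes directly into the final key buffer after the shard/cuboid-id header and then has the encoder fill that header in place, dropping the old intermediate body buffer and the extra encode() copy. On top of it sit two contracts: buildKey() returns a reused ByteArray (grown on demand), which is fine for MapReduce because the mapper copies the bytes into its output Text right away, while buildKey2() returns a fresh ByteArray per child, which Spark needs because the emitted tuples keep referencing the object after the function returns. The distinction in miniature, using plain byte arrays instead of Kylin's ByteArray:

    // Reuse-vs-allocate contract, illustrated without Kylin types.
    final class KeyBuilders {
        private byte[] reused = new byte[0];

        // MR style: caller must consume or copy the result before the next call.
        byte[] buildReused(byte[] body, int keyLen) {
            if (reused.length < keyLen) {
                reused = new byte[keyLen];
            }
            System.arraycopy(body, 0, reused, 0, Math.min(body.length, keyLen));
            return reused;
        }

        // Spark style: one fresh object per emitted record.
        byte[] buildFresh(byte[] body, int keyLen) {
            byte[] key = new byte[keyLen];
            System.arraycopy(body, 0, key, 0, Math.min(body.length, keyLen));
            return key;
        }
    }
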
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index f2ec34c..0581f4e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -106,9 +106,10 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
             logger.info("Parent cuboid: " + parentCuboid.getId() + "; Children: " + myChildren);
         }
 
+        Pair<Integer, ByteArray> result;
         for (Long child : myChildren) {
             Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
-            Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
+            result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
             outputKey.set(result.getSecond().array(), 0, result.getFirst());
             context.write(outputKey, value);
         }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
index 856a59f..35fa8af 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
@@ -30,7 +30,6 @@ import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -181,7 +180,7 @@ public class SegmentReEncoder implements Serializable {
         Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
         RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
 
-        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
+        ByteArray[] splittedByteses = rowKeySplitter.getSplitBuffers();
         int bufOffset = 0;
         int bodySplitOffset = rowKeySplitter.getBodySplitOffset();
 
@@ -217,8 +216,8 @@ public class SegmentReEncoder implements Serializable {
                     System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
                 }
 
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0,
-                        splittedByteses[useSplit].length);
+                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].array(), splittedByteses[useSplit].offset(),
+                        splittedByteses[useSplit].length());
                 int idInMergedDict;
 
                 //int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
@@ -233,15 +232,15 @@ public class SegmentReEncoder implements Serializable {
                 bufOffset += mergedDict.getSizeOfId();
             } else {
                 // keep as it is
-                while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) {
+                while (splittedByteses[useSplit].length() > newKeyBodyBuf.length - bufOffset) {
                     byte[] oldBuf = newKeyBodyBuf;
                     newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
                     System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
                 }
 
-                System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset,
-                        splittedByteses[useSplit].length);
-                bufOffset += splittedByteses[useSplit].length;
+                System.arraycopy(splittedByteses[useSplit].array(), splittedByteses[useSplit].offset(), newKeyBodyBuf, bufOffset,
+                        splittedByteses[useSplit].length());
+                bufOffset += splittedByteses[useSplit].length();
             }
         }
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
index 35b8924..3d18bd6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -98,15 +97,15 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
         mos.write(outputKey, value, generateFileName(baseOutputPath));
     }
 
-    private int buildKey(Cuboid cuboid, SplittedBytes[] splitBuffers) {
+    private int buildKey(Cuboid cuboid, ByteArray[] splitBuffers) {
         RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
 
         int startIdx = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
         int endIdx = startIdx + Long.bitCount(cuboid.getId());
         int offset = 0;
         for (int i = startIdx; i < endIdx; i++) {
-            System.arraycopy(splitBuffers[i].value, 0, newKeyBodyBuf, offset, splitBuffers[i].length);
-            offset += splitBuffers[i].length;
+            System.arraycopy(splitBuffers[i].array(), splitBuffers[i].offset(), newKeyBodyBuf, offset, splitBuffers[i].length());
+            offset += splitBuffers[i].length();
         }
 
         int fullKeySize = rowkeyEncoder.getBytesLength();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index e95e433..cb3af31 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -20,14 +20,12 @@ package org.apache.kylin.engine.spark;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
@@ -38,7 +36,6 @@ import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -138,21 +135,19 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         JavaSparkContext sc = new JavaSparkContext(conf);
         sc.sc().addSparkListener(jobListener);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
         final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-
         KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
 
-        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
-        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
-        final Job job = Job.getInstance(confOverwrite);
-
         logger.info("RDD input path: {}", inputPath);
         logger.info("RDD Output path: {}", outputPath);
-        setHadoopConf(job, cubeSegment, metaUrl);
+
+        final Job job = Job.getInstance(sConf.get());
+        SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
 
         int countMeasureIndex = 0;
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
@@ -227,11 +222,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job, envConfig);
 
+        PairFlatMapFunction flatMapFunction = new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf);
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
             partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
-            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
-                    .reduceByKey(reducerFunction2, partition).persist(storageLevel);
+
+            allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition)
+                    .persist(storageLevel);
             allRDDs[level - 1].unpersist();
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
@@ -241,13 +238,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         allRDDs[totalLevels].unpersist();
         logger.info("Finished on calculating all level cuboids.");
         logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
-        HadoopUtil.deleteHDFSMeta(metaUrl);
+        //HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
-    protected void setHadoopConf(Job job, CubeSegment segment, String metaUrl) throws Exception {
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Text.class);
-    }
 
 
     protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config,
@@ -277,6 +270,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                             synchronized (SparkCubingByLayer.class) {
                                 if (initialized == false) {
                                     KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                                    KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
                                     CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
                                     codec = new BufferedMeasureCodec(desc.getMeasures());
                                     initialized = true;
@@ -315,6 +309,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
                         KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+                        KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
                         CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
                         CubeDesc cubeDesc = cubeInstance.getDescriptor();
                         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -353,6 +348,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         public void init() {
             KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             cubeDesc = cubeInstance.getDescriptor();
             aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -422,6 +418,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         public void init() {
             KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();
@@ -430,7 +427,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         @Override
-        public Iterator<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+        public Iterator<Tuple2<ByteArray, Object[]>> call(final Tuple2<ByteArray, Object[]> tuple2) throws Exception {
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
@@ -441,26 +438,23 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             }
 
             byte[] key = tuple2._1().array();
-            long cuboidId = rowKeySplitter.split(key);
-            Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
-
-            Collection<Long> myChildren = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
+            long cuboidId = rowKeySplitter.parseCuboid(key);
+            final List<Long> myChildren = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
 
             // if still empty or null
             if (myChildren == null || myChildren.size() == 0) {
                 return EMTPY_ITERATOR.iterator();
             }
+            rowKeySplitter.split(key);
+            final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
 
             List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size());
             for (Long child : myChildren) {
                 Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
-                Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid,
+                ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid,
                         rowKeySplitter.getSplitBuffers());
 
-                byte[] newKey = new byte[result.getFirst()];
-                System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst());
-
-                tuples.add(new Tuple2<>(new ByteArray(newKey), tuple2._2()));
+                tuples.add(new Tuple2<>(result, tuple2._2()));
             }
 
             return tuples.iterator();
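On the Spark side the layer-by-layer flat map takes several allocations and lookups out of the per-row path: one CuboidFlatMap instance is reused across all layers, the deserialized KylinConfig is installed as the thread-local config, the cuboid id is read with the cheap parseCuboid() so rows whose cuboid spawns no children return an empty iterator without a full split(), and the ByteArray from buildKey2() is emitted directly instead of being copied into another byte[]. The control flow, reduced to a self-contained sketch (a Map stands in for the cuboid spanning tree):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    final class SpanningSketch {
        // Cheap id lookup first; the expensive per-column split only happens
        // when there is actually something to emit.
        static Iterator<long[]> span(long cuboidId, Map<Long, List<Long>> spanningTree) {
            List<Long> children = spanningTree.get(cuboidId);
            if (children == null || children.isEmpty()) {
                return Collections.emptyIterator();      // early exit, no split
            }
            // ... the real code calls rowKeySplitter.split(key) exactly here ...
            List<long[]> out = new ArrayList<>(children.size());
            for (Long child : children) {
                out.add(new long[] { cuboidId, child }); // fresh object per child
            }
            return out.iterator();
        }
    }
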
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 7fd4123..74a2313 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -24,13 +24,11 @@ import java.util.List;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.HadoopUtil;
@@ -116,6 +114,7 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
         JavaSparkContext sc = new JavaSparkContext(conf);
+        SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
         sc.sc().addSparkListener(jobListener);
 
@@ -131,12 +130,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
         logger.info("Input path: {}", inputPath);
         logger.info("Output path: {}", outputPath);
 
-        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
-        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
-        final Job job = Job.getInstance(confOverwrite);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Text.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        final Job job = Job.getInstance(sConf.get());
+
+        SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
 
         final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
         final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
@@ -157,12 +153,17 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
                     throws Exception {
 
                 if (initialized == false) {
-                    synchronized (SparkCubingByLayer.class) {
+                    synchronized (SparkCubingMerge.class) {
                         if (initialized == false) {
-                            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-                            CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
-                            codec = new BufferedMeasureCodec(desc.getMeasures());
-                            initialized = true;
+                            synchronized (SparkCubingMerge.class) {
+                                if (initialized == false) {
+                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                                    KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
+                                    CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                                    codec = new BufferedMeasureCodec(desc.getMeasures());
+                                    initialized = true;
+                                }
+                            }
                         }
                     }
                 }
@@ -231,7 +232,7 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
         // output the data size to console, job engine will parse and save the metric
         // please note: this mechanism won't work when spark.submit.deployMode=cluster
         logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
-        //HadoopUtil.deleteHDFSMeta(metaUrl);
+        //        HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
     static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
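SparkCubingMerge gets the same treatment: modifySparkHadoopConfiguration() on the context, a Job built from the serialized configuration via SparkUtil.setHadoopConfForCuboid(), the lock object corrected from SparkCubingByLayer.class to SparkCubingMerge.class, and the thread-local KylinConfig set after loading. Note that the lazy codec initialization ends up with a synchronized (SparkCubingMerge.class) block nested directly inside another one on the same class; that is re-entrant and therefore harmless, but the canonical double-checked form needs only one block plus a volatile field, roughly:

    import java.util.function.Supplier;

    final class LazyCodecHolder {
        // volatile is what makes the unsynchronized first read safe.
        private static volatile Object codec;

        static Object get(Supplier<Object> factory) {
            Object local = codec;
            if (local == null) {
                synchronized (LazyCodecHolder.class) {
                    local = codec;
                    if (local == null) {
                        codec = local = factory.get();
                    }
                }
            }
            return local;
        }
    }
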
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index e7c6ee6..31eebc8 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -24,6 +24,9 @@ import java.util.List;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
@@ -33,6 +36,7 @@ import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -114,4 +118,18 @@ public class SparkUtil {
         return partition;
     }
 
+
+    public static void setHadoopConfForCuboid(Job job, CubeSegment segment, String metaUrl) throws Exception {
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    }
+
+    public static void modifySparkHadoopConfiguration(SparkContext sc) throws Exception {
+        sc.hadoopConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+        sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
+        sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
+        sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
+  }
+
 }
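SparkUtil now collects the Hadoop-side settings that SparkCubingByLayer and SparkCubingMerge previously configured inline: setHadoopConfForCuboid() declares Text keys/values with SequenceFileOutputFormat, and modifySparkHadoopConfiguration() sets replication factor 2 plus block-level output compression (DefaultCodec by default, with SnappyCodec noted as an alternative) for the intermediate cuboid files. A quick way to confirm the effective values from the driver, assuming a JavaSparkContext named sc is already in scope:

    org.apache.hadoop.conf.Configuration hadoopConf = sc.hadoopConfiguration();
    System.out.println(hadoopConf.get("dfs.replication"));                                 // "2"
    System.out.println(hadoopConf.get("mapreduce.output.fileoutputformat.compress"));      // "true"
    System.out.println(hadoopConf.get("mapreduce.output.fileoutputformat.compress.type")); // "BLOCK"
    System.out.println(hadoopConf.get("mapreduce.output.fileoutputformat.compress.codec"));
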
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index f6332f4..3f8ea53 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -40,7 +40,7 @@ public class CoprocessorProjector {
 
         RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
             @Override
-            protected void fillHeader(byte[] bytes) {
+            public void fillHeader(byte[] bytes) {
                 Arrays.fill(bytes, 0, this.getHeaderLength(), (byte) 0xff);
             }
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index c435d9d..fd8459f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -158,42 +158,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
 
         logger.info("Input path: {}", inputPath);
         logger.info("Output path: {}", outputPath);
-
-        JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
-        final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
-        if (quickPath) {
-            hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                @Override
-                public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
-                    KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
-                            textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
-                    return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
-                }
-            });
-        } else {
-            hfilerdd = inputRDDs
-                    .flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                        @Override
-                        public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
-                                throws Exception {
-
-                            List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
-                            Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
-                            inputCodec.decode(
-                                    ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
-                                    inputMeasures);
-
-                            for (int i = 0; i < cfNum; i++) {
-                                KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
-                                result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
-                                        outputValue));
-                            }
-
-                            return result.iterator();
-                        }
-                    });
-        }
-
         // read partition split keys
         List<RowKeyWritable> keys = new ArrayList<>();
         try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
@@ -207,40 +171,70 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         }
 
         logger.info("There are " + keys.size() + " split keys, totally " + (keys.size() + 1) + " hfiles");
-
-        final JavaPairRDD<ImmutableBytesWritable, KeyValue> hfilerdd2 = hfilerdd
-                .repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
-                        RowKeyWritable.RowKeyComparator.INSTANCE)
-                .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
-                    @Override
-                    public Tuple2<ImmutableBytesWritable, KeyValue> call(
-                            Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
-                        return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
-                                rowKeyWritableKeyValueTuple2._2);
-                    }
-                });
-
         Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
         HadoopUtil.healSickConfig(hbaseConf);
         Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
+        job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
         HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
         try {
             HFileOutputFormat2.configureIncrementalLoadMap(job, table);
         } catch (IOException ioe) {
             // this can be ignored.
-            logger.debug(ioe.getMessage());
+            logger.debug(ioe.getMessage(), ioe);
         }
 
         FileOutputFormat.setOutputPath(job, new Path(outputPath));
-        hfilerdd2.saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+        JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+        final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+        if (quickPath) {
+            hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                @Override
+                public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+                    KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+                            textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+                    return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
+                }
+            });
+        } else {
+            hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                @Override
+                public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+                        throws Exception {
+
+                    List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+                    Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+                    inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+                            inputMeasures);
+
+                    for (int i = 0; i < cfNum; i++) {
+                        KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+                        result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+                                outputValue));
+                    }
+
+                    return result.iterator();
+                }
+            });
+        }
+
+        hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+                RowKeyWritable.RowKeyComparator.INSTANCE)
+                .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+                    @Override
+                    public Tuple2<ImmutableBytesWritable, KeyValue> call(
+                            Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+                        return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+                                rowKeyWritableKeyValueTuple2._2);
+                    }
+                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
 
         // output the data size to console, job engine will parse and save the metric
         // please note: this mechanism won't work when spark.submit.deployMode=cluster
         logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
-        HadoopUtil.deleteHDFSMeta(metaUrl);
+        //HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
-
     static class HFilePartitioner extends Partitioner {
         private List<RowKeyWritable> keys;