Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/05 03:18:33 UTC

[kylin] branch master updated: KYLIN-3427 convert to HFile in Spark

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 54cd2c7  KYLIN-3427 convert to HFile in Spark
54cd2c7 is described below

commit 54cd2c793bb5487cefc6def073a902ae73e5c5d4
Author: shaofengshi <sh...@apache.org>
AuthorDate: Sat Jun 30 10:45:47 2018 +0800

    KYLIN-3427 convert to HFile in Spark
---
 build/conf/kylin-spark-log4j.properties            |  43 +++
 .../java/org/apache/kylin/common/KylinConfig.java  |   2 +-
 .../org/apache/kylin/common/util/BytesUtil.java    |  15 +
 .../org/apache/kylin/common/util/ClassUtil.java    |  10 +
 .../src/main/resources/kylin-defaults.properties   |   8 +-
 .../org/apache/kylin/metadata/model/JoinsTree.java |   2 +-
 .../org/apache/kylin/metrics/MetricsManager.java   |   2 +-
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  30 +-
 .../kylin/engine/mr/BatchMergeJobBuilder2.java     |  14 -
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  55 ++++
 engine-spark/pom.xml                               |  11 -
 .../org/apache/kylin/engine/spark/ISparkInput.java |  60 ++++
 .../apache/kylin/engine/spark/ISparkOutput.java    | 129 ++++++++
 .../kylin/engine/spark/KylinKryoRegistrator.java   |   5 +-
 .../kylin/engine/spark/KylinSparkJobListener.java  |  55 ++++
 .../engine/spark/SparkBatchCubingEngine2.java      |  39 ++-
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |  66 ++--
 .../engine/spark/SparkBatchMergeJobBuilder2.java   |  36 +--
 .../kylin/engine/spark/SparkCubingByLayer.java     |   5 +-
 .../apache/kylin/engine/spark/SparkExecutable.java |  28 +-
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  63 ++++
 source-hive/pom.xml                                |   8 +
 .../kylin/source/hive/GarbageCollectionStep.java   | 117 +++++++
 .../apache/kylin/source/hive/HiveInputBase.java    | 167 ++++++++++
 .../org/apache/kylin/source/hive/HiveMRInput.java  | 336 +--------------------
 .../org/apache/kylin/source/hive/HiveSource.java   |   3 +
 .../apache/kylin/source/hive/HiveSparkInput.java   | 125 ++++++++
 .../source/hive/RedistributeFlatHiveTableStep.java | 147 +++++++++
 .../apache/kylin/source/jdbc/JdbcHiveMRInput.java  |   2 +-
 .../apache/kylin/source/hive/HiveMRInputTest.java  |   5 +-
 .../apache/kylin/source/kafka/KafkaInputBase.java  | 163 ++++++++++
 .../apache/kylin/source/kafka/KafkaMRInput.java    | 178 +----------
 .../org/apache/kylin/source/kafka/KafkaSource.java |   3 +
 .../apache/kylin/source/kafka/KafkaSparkInput.java | 120 ++++++++
 storage-hbase/pom.xml                              |  12 +
 .../apache/kylin/storage/hbase/HBaseStorage.java   |  19 +-
 .../{HBaseMRSteps.java => HBaseJobSteps.java}      |  47 +--
 .../hbase/steps/HBaseMROutput2Transition.java      |   8 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java    | 213 +------------
 .../hbase/steps/HBaseSparkOutputTransition.java    | 100 ++++++
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |  72 +++++
 .../kylin/storage/hbase/steps/KeyValueCreator.java |   3 +-
 .../kylin/storage/hbase/steps/RowKeyWritable.java  |  41 ++-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  | 305 +++++++++++++++++++
 .../storage/hbase/steps/HFilePartitionerTest.java  |  65 ++++
 45 files changed, 2074 insertions(+), 863 deletions(-)

diff --git a/build/conf/kylin-spark-log4j.properties b/build/conf/kylin-spark-log4j.properties
new file mode 100644
index 0000000..948fb32
--- /dev/null
+++ b/build/conf/kylin-spark-log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootCategory=WARN,stderr,stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.Target=System.err
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
+
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark-project.jetty=WARN
+log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
+log4j.logger.org.apache.spark.sql=WARN
+
+log4j.logger.org.apache.kylin=DEBUG
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 81c9b89..4b3b7c3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -400,7 +400,7 @@ public class KylinConfig extends KylinConfigBase {
 
     // ============================================================================
     
-    Map<Class, Object> managersCache = new ConcurrentHashMap<>();
+    transient Map<Class, Object> managersCache = new ConcurrentHashMap<>();
 
     private KylinConfig() {
         super();
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0aa47be..4a54c48 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -457,4 +457,19 @@ public class BytesUtil {
         return sb.toString();
     }
 
+    /**
+     *
+     * @param hex String value of a byte array in hex, e.g., "\\x00\\x0A"
+     * @return the byte array that the hex string represents.
+     */
+    public static byte[] fromHex(String hex) {
+        byte[] b = new byte[hex.length() / 4];
+        for (int i = 0; i < b.length; i++) {
+            int index = i * 4;
+            int v = Integer.parseInt(hex.substring(index + 2, index + 4), 16);
+            b[i] = (byte) v;
+        }
+        return b;
+    }
+
 }
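
The new BytesUtil.fromHex expects the escaped form shown in the javadoc: every byte takes four characters ("\x" plus two hex digits), which is why the method divides the string length by 4. A quick usage sketch, not taken from the patch:

    // "\\x00\\x0A" is the 8-character string \x00\x0A, i.e. two 4-character groups
    byte[] bytes = BytesUtil.fromHex("\\x00\\x0A");   // yields { 0x00, 0x0A }
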
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
index a54a24d..bfc6ce1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
@@ -134,4 +134,14 @@ public class ClassUtil {
             throw new RuntimeException(var6);
         }
     }
+
+    public static String findContainingJar(String className, String perferLibraryName) {
+        try {
+            return findContainingJar(Class.forName(className), perferLibraryName);
+        } catch (ClassNotFoundException e) {
+            logger.warn("failed to locate jar for class " + className + ", ignore it");
+        }
+
+        return "";
+    }
 }
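
The String-based findContainingJar overload swallows ClassNotFoundException and returns an empty string, keeping jar lookup best-effort for callers that assemble a Spark --jars list. A hedged one-line sketch; the class and library names below are only illustrative:

    // Returns the path of the jar containing the class, or "" if the class is not on the classpath.
    String jar = ClassUtil.findContainingJar(
            "org.apache.hive.hcatalog.mapreduce.HCatInputFormat", "hive-hcatalog-core");
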
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 37520ef..b391b5e 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -66,10 +66,10 @@ kylin.engine.default=2
 kylin.storage.default=2
 kylin.web.hive-limit=20
 kylin.web.help.length=4
-kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs21/tutorial/kylin_sample.html
-kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs21/tutorial/odbc.html
-kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs21/tutorial/tableau_91.html
-kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs21/howto/howto_optimize_cubes.html
+kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs/tutorial/kylin_sample.html
+kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs/tutorial/odbc.html
+kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs/tutorial/tableau_91.html
+kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs/howto/howto_optimize_cubes.html
 kylin.web.link-streaming-guide=http://kylin.apache.org/
 kylin.htrace.show-gui-trace-toggle=false
 kylin.web.link-hadoop=
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index d86a74a..a29d76e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -203,7 +203,7 @@ public class JoinsTree implements Serializable {
         boolean matches(JoinDesc join1, JoinDesc join2);
     }
 
-    public static class DefaultJoinDescMatcher implements IJoinDescMatcher {
+    public static class DefaultJoinDescMatcher implements IJoinDescMatcher, Serializable {
         @Override
         public boolean matches(JoinDesc join1, JoinDesc join2) {
             if (join1 == null) {
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
index e7f042e..bcfb275 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
@@ -132,7 +132,7 @@ public class MetricsManager {
             Preconditions.checkArgument(activeReservoirPointers.size() == sourceReporterBindProps.keySet().size(),
                     "Duplicate register names exist!!!");
         } else {
-            logger.info("Kylin metrics monitor is not enabled!!!");
+            logger.info("Kylin metrics monitor is not enabled");
         }
     }
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index b1149ed..5498365 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -22,17 +22,14 @@ import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
 import org.apache.kylin.engine.mr.steps.NDCuboidJob;
-import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -89,7 +86,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private boolean isEnableUHCDictStep() {
+    public boolean isEnableUHCDictStep() {
         if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
             return false;
         }
@@ -102,21 +99,6 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return true;
     }
 
-    private LookupMaterializeContext addMaterializeLookupTableSteps(final CubingJob result) {
-        LookupMaterializeContext lookupMaterializeContext = new LookupMaterializeContext(result);
-        CubeDesc cubeDesc = seg.getCubeDesc();
-        List<String> allSnapshotTypes = cubeDesc.getAllExtLookupSnapshotTypes();
-        if (allSnapshotTypes.isEmpty()) {
-            return null;
-        }
-        for (String snapshotType : allSnapshotTypes) {
-            logger.info("add lookup table materialize steps for storage type:{}", snapshotType);
-            ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotType);
-            materializer.materializeLookupTablesForCube(lookupMaterializeContext, seg.getCubeInstance());
-        }
-        return lookupMaterializeContext;
-    }
-
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
         final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
@@ -128,16 +110,6 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         }
     }
 
-    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
-        SaveStatisticsStep result = new SaveStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
-        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
-        CubingExecutableUtil.setStatisticsPath(getStatisticsPath(jobId), result.getParams());
-        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
-        return result;
-    }
-
     protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) {
         // base cuboid job
         MapReduceExecutable cubeStep = new MapReduceExecutable();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index badf628..d443f52 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -21,9 +21,6 @@ package org.apache.kylin.engine.mr;
 import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
-import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,15 +69,4 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
-        MergeStatisticsStep result = new MergeStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-
-        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
-        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
-        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
-        CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
-
-        return result;
-    }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 6458a6a..a1b2cfe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -31,6 +32,8 @@ import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
+import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
+import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
@@ -38,6 +41,7 @@ import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 
 import com.google.common.base.Preconditions;
+import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  * Hold reusable steps for builders.
@@ -79,6 +83,18 @@ public class JobBuilderSupport {
         return result;
     }
 
+    public MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+        MergeStatisticsStep result = new MergeStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
+        CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
+
+        return result;
+    }
+
     public MapReduceExecutable createBuildUHCDictStep(String jobId) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY);
@@ -173,6 +189,45 @@ public class JobBuilderSupport {
         return result;
     }
 
+
+    public boolean isEnableUHCDictStep() {
+        if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
+            return false;
+        }
+
+        List<TblColRef> uhcColumns = seg.getCubeDesc().getAllUHCColumns();
+        if (uhcColumns.size() == 0) {
+            return false;
+        }
+
+        return true;
+    }
+
+    public LookupMaterializeContext addMaterializeLookupTableSteps(final CubingJob result) {
+        LookupMaterializeContext lookupMaterializeContext = new LookupMaterializeContext(result);
+        CubeDesc cubeDesc = seg.getCubeDesc();
+        List<String> allSnapshotTypes = cubeDesc.getAllExtLookupSnapshotTypes();
+        if (allSnapshotTypes.isEmpty()) {
+            return null;
+        }
+        for (String snapshotType : allSnapshotTypes) {
+            ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotType);
+            materializer.materializeLookupTablesForCube(lookupMaterializeContext, seg.getCubeInstance());
+        }
+        return lookupMaterializeContext;
+    }
+
+
+    public SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+        SaveStatisticsStep result = new SaveStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setStatisticsPath(getStatisticsPath(jobId), result.getParams());
+        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+        return result;
+    }
+
     // ============================================================================
 
     public String getJobWorkingDir(String jobId) {
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 819aaaa..700aeb5 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -32,10 +32,6 @@
     </parent>
 
     <dependencies>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-storage-hbase</artifactId>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.kylin</groupId>
@@ -81,13 +77,6 @@
 
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-storage-hbase</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-common</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
new file mode 100644
index 0000000..5459c70
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+
+/**
+ * Any ISource that wishes to serve as input of the Spark build engine must adapt to this interface.
+ */
+public interface ISparkInput {
+
+    /** Return a helper to participate in batch cubing job flow. */
+    public ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+
+    /** Return a helper to participate in batch cubing merge job flow. */
+    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+
+    /**
+     * Participates in the batch cubing flow as the input side. Responsible for creating
+     * the intermediate flat table (Phase 1) and cleaning up any leftovers (Phase 4).
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary (with FlatTableInputFormat)
+     * - Phase 3: Build Cube (with FlatTableInputFormat)
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface ISparkBatchCubingInputSide {
+
+        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+
+        /** Add step that does necessary clean up, like delete the intermediate flat table */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    public interface ISparkBatchMergeInputSide {
+
+        /** Add step that executes before merge dictionary and before merge cube. */
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+    }
+}
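
ISparkInput mirrors IMRInput for the Spark engine: a source contributes a flat-table step in Phase 1, a cleanup step in Phase 4, and an optional hook before dictionary merge. A minimal adapter skeleton, assuming hypothetical step factories (createFlatTableStep and createCleanupStep are placeholders, not part of this patch):

    import org.apache.kylin.engine.spark.ISparkInput;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
    import org.apache.kylin.metadata.model.ISegment;

    public class MySparkInput implements ISparkInput {

        @Override
        public ISparkBatchCubingInputSide getBatchCubingInputSide(final IJoinedFlatTableDesc flatDesc) {
            return new ISparkBatchCubingInputSide() {
                @Override
                public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
                    // Phase 1: add a step that materializes the intermediate flat table
                    // jobFlow.addTask(createFlatTableStep(flatDesc));
                }

                @Override
                public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                    // Phase 4: add a step that drops the intermediate flat table
                    // jobFlow.addTask(createCleanupStep(flatDesc));
                }
            };
        }

        @Override
        public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
            return new ISparkBatchMergeInputSide() {
                @Override
                public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
                    // nothing needed before dictionary merge for this source
                }
            };
        }
    }
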
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkOutput.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkOutput.java
new file mode 100644
index 0000000..0ebc611
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkOutput.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface ISparkOutput {
+
+    /** Return a helper to participate in batch cubing job flow. */
+    public ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+
+    /**
+     * Participates in the batch cubing flow as the output side. Responsible for saving
+     * the cuboid output to storage at the end of Phase 3.
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface ISparkBatchCubingOutputSide {
+
+        /** Add step that executes after build dictionary and before build cube. */
+        public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+        /**
+         * Add step that saves cuboids from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, 
+         * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
+         */
+        void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Add step that does any necessary clean up. */
+        void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+    }
+
+    /** Return a helper to participate in batch merge job flow. */
+    ISparkBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+
+    /**
+     * Participates in the batch merge flow as the output side. Responsible for saving
+     * the cuboid output to storage at the end of Phase 2.
+     * 
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    interface ISparkBatchMergeOutputSide {
+
+        /** Add step that executes after merge dictionary and before merge cube. */
+        void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, 
+         * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
+         */
+        void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow);
+
+        /** Add step that does any necessary clean up. */
+        void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+
+    }
+
+    interface ISparkMergeOutputFormat {
+
+        /** Configure the InputFormat of given job. */
+        void configureJobInput(Job job, String input) throws Exception;
+
+        /** Configure the OutputFormat of given job. */
+        void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception;
+
+        CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube);
+    }
+
+    ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide(CubeSegment seg);
+
+    /**
+     * Participates in the batch optimize flow as the output side. Responsible for saving
+     * the cuboid output to storage at the end of Phase 3.
+     *
+     * - Phase 1: Filter Recommended Cuboid Data
+     * - Phase 2: Copy Dictionary & Calculate Statistics & Update Reused Cuboid Shard
+     * - Phase 3: Build Cube
+     * - Phase 4: Cleanup Optimize
+     * - Phase 5: Update Metadata & Cleanup
+     */
+    interface ISparkBatchOptimizeOutputSide {
+
+        /** Create HTable based on recommended cuboids & statistics*/
+        void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow);
+
+        /** Build only missing cuboids*/
+        void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Cleanup intermediate cuboid data on HDFS*/
+        void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+        /** Invoked by Checkpoint job & Cleanup old segments' HTables and related working directory*/
+        void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index f9aee63..90c52c2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -20,7 +20,7 @@ package org.apache.kylin.engine.spark;
 
 import java.util.LinkedHashSet;
 import java.util.Set;
-
+import org.apache.hadoop.io.Text;
 import org.apache.kylin.engine.spark.util.PercentileCounterSerializer;
 import org.apache.kylin.measure.percentile.PercentileCounter;
 import org.apache.spark.serializer.KryoRegistrator;
@@ -45,6 +45,7 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(String[].class);
         kyroClasses.add(String[][].class);
         kyroClasses.add(Object[].class);
+        kyroClasses.add(Text.class);
         kyroClasses.add(java.math.BigDecimal.class);
         kyroClasses.add(java.util.ArrayList.class);
         kyroClasses.add(java.util.LinkedList.class);
@@ -83,7 +84,6 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class);
         kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class);
         kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class);
-
         kylinClassByReflection1(kyroClasses);
         kylinClassByReflection2(kyroClasses);
 
@@ -94,6 +94,7 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(org.roaringbitmap.buffer.MappeableArrayContainer.class);
         kyroClasses.add(org.roaringbitmap.buffer.MappeableBitmapContainer.class);
 
+        kyroClasses.add(Class.class);
 
         addClassQuitely(kyroClasses, "com.google.common.collect.EmptyImmutableList");
         addClassQuitely(kyroClasses, "java.nio.HeapShortBuffer");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
new file mode 100644
index 0000000..7976c3b
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.Serializable;
+
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+
+public class KylinSparkJobListener extends SparkListener implements Serializable {
+
+    public JobMetrics metrics = new JobMetrics();
+
+    @Override
+    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+        if (taskEnd.taskMetrics().outputMetrics() != null) {
+            metrics.bytesWritten += taskEnd.taskMetrics().outputMetrics().bytesWritten();
+            metrics.recordsWritten += taskEnd.taskMetrics().outputMetrics().recordsWritten();
+        }
+    }
+
+    public JobMetrics getMetrics() {
+        return metrics;
+    }
+
+    public static class JobMetrics implements Serializable {
+        long bytesWritten;
+        long recordsWritten;
+
+        public long getBytesWritten() {
+            return bytesWritten;
+        }
+
+        public long getRecordsWritten() {
+            return recordsWritten;
+        }
+    }
+
+}
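
KylinSparkJobListener only accumulates output metrics from SparkListenerTaskEnd events; the driver registers it on the SparkContext before running any action and reads the totals afterwards. A small sketch of that wiring under assumed Spark 2.x APIs (the app name, master and output path are made up):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.kylin.engine.spark.KylinSparkJobListener;

    public class ListenerDemo {
        public static void main(String[] args) {
            KylinSparkJobListener jobListener = new KylinSparkJobListener();
            try (JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("listener-demo").setMaster("local[2]"))) {
                sc.sc().addSparkListener(jobListener);   // register before any action runs
                sc.parallelize(Arrays.asList(1, 2, 3)).saveAsTextFile("/tmp/listener-demo");
            }
            // totals accumulated across all SparkListenerTaskEnd events
            System.out.println(jobListener.getMetrics().getBytesWritten() + " bytes, "
                    + jobListener.getMetrics().getRecordsWritten() + " records written");
        }
    }
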
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
index a7a4151..47316b4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
@@ -19,15 +19,50 @@
 package org.apache.kylin.engine.spark;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.engine.mr.BatchOptimizeJobBuilder2;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 /**
  */
-public class SparkBatchCubingEngine2 extends MRBatchCubingEngine2 {
+public class SparkBatchCubingEngine2 implements IBatchCubingEngine {
+    @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+        return new CubeJoinedFlatTableDesc(cubeDesc);
+    }
+
+    @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+        return new CubeJoinedFlatTableDesc(newSegment);
+    }
+
     @Override
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
         return new SparkBatchCubingJobBuilder2(newSegment, submitter).build();
     }
 
+    @Override
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return new SparkBatchMergeJobBuilder2(mergeSegment, submitter).build();
+    }
+
+    @Override
+    public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+        //TODO use Spark to optimize
+        return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+    }
+
+    @Override
+    public Class<?> getSourceInterface() {
+        return ISparkInput.class;
+    }
+
+    @Override
+    public Class<?> getStorageInterface() {
+        return ISparkOutput.class;
+    }
+
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 6ce9b90..e545166 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -20,12 +20,12 @@ package org.apache.kylin.engine.spark;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
-import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.LookupMaterializeContext;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -37,15 +37,57 @@ import java.util.Map;
 
 /**
  */
-public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
+public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
 
     private static final Logger logger = LoggerFactory.getLogger(SparkBatchCubingJobBuilder2.class);
 
+    private final ISparkInput.ISparkBatchCubingInputSide inputSide;
+    private final ISparkOutput.ISparkBatchCubingOutputSide outputSide;
+
     public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
+        this.inputSide = SparkUtil.getBatchCubingInputSide(seg);
+        this.outputSide = SparkUtil.getBatchCubingOutputSide(seg);
     }
 
-    @Override
+    public CubingJob build() {
+        logger.info("Spark new job to BUILD segment " + seg);
+
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
+
+        // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
+        inputSide.addStepPhase1_CreateFlatTable(result);
+
+        // Phase 2: Build Dictionary
+        result.addTask(createFactDistinctColumnsStep(jobId));
+
+        if (isEnableUHCDictStep()) {
+            result.addTask(createBuildUHCDictStep(jobId));
+        }
+
+        result.addTask(createBuildDictionaryStep(jobId));
+        result.addTask(createSaveStatisticsStep(jobId));
+
+        // add materialize lookup tables if needed
+        LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
+
+        outputSide.addStepPhase2_BuildDictionary(result);
+
+        // Phase 3: Build Cube
+        addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final SparkExecutable sparkExecutable = new SparkExecutable();
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
@@ -53,20 +95,6 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         result.addTask(sparkExecutable);
     }
 
-    @Override
-    protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) {
-
-    }
-
-    private static String findJar(String className, String perferLibraryName) {
-        try {
-            return ClassUtil.findContainingJar(Class.forName(className), perferLibraryName);
-        } catch (ClassNotFoundException e) {
-            logger.warn("failed to locate jar for class " + className + ", ignore it");
-        }
-
-        return "";
-    }
 
     public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
             final String jobId, final String cuboidRootPath) {
@@ -90,7 +118,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
     }
 
-    private String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
         Map<String, String> param = new HashMap<>();
         param.put("path", getDumpMetadataPath(jobId));
         return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
similarity index 64%
copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
copy to engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index badf628..b68f6e86 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -16,34 +16,33 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.engine.mr;
+package org.apache.kylin.engine.spark;
 
 import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
-import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class BatchMergeJobBuilder2 extends JobBuilderSupport {
-    private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
+public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
+    private static final Logger logger = LoggerFactory.getLogger(SparkBatchMergeJobBuilder2.class);
 
-    private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
-    private final IMRInput.IMRBatchMergeInputSide inputSide;
+    private final ISparkOutput.ISparkBatchMergeOutputSide outputSide;
+    private final ISparkInput.ISparkBatchMergeInputSide inputSide;
 
-    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+    public SparkBatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
-        this.inputSide = MRUtil.getBatchMergeInputSide(seg);
+        this.outputSide = SparkUtil.getBatchMergeOutputSide2(seg);
+        this.inputSide = SparkUtil.getBatchMergeInputSide(seg);
     }
 
     public CubingJob build() {
-        logger.info("MR_V2 new job to MERGE segment " + seg);
+        logger.info("Spark_V2 new job to MERGE segment " + seg);
 
         final CubeSegment cubeSegment = seg;
         final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
@@ -71,16 +70,5 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
 
         return result;
     }
-
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
-        MergeStatisticsStep result = new MergeStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-
-        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
-        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
-        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
-        CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
-
-        return result;
-    }
+    
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 8f547fd..f842fd5 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -129,8 +129,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
-        Class[] kryoClassArray = new Class[] { org.apache.hadoop.io.Text.class,
-                Class.forName("scala.reflect.ClassTag$$anon$1"), java.lang.Class.class };
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
 
         SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId);
         //serialization conf
@@ -242,7 +241,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
         allRDDs[totalLevels].unpersist();
         logger.info("Finished on calculating all level cuboids.");
-        deleteHDFSMeta(metaUrl);
+        //        deleteHDFSMeta(metaUrl);
     }
 
     protected void setHadoopConf(Job job, CubeSegment segment, String metaUrl) throws Exception {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 8de78c0..dfaa2e1 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -37,6 +37,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -53,6 +54,7 @@ public class SparkExecutable extends AbstractExecutable {
     private static final String CLASS_NAME = "className";
     private static final String JARS = "jars";
     private static final String JOB_ID = "jobId";
+    private String counter_save_as;
 
     public void setClassName(String className) {
         this.setParam(CLASS_NAME, className);
@@ -66,6 +68,10 @@ public class SparkExecutable extends AbstractExecutable {
         this.setParam(JARS, jars);
     }
 
+    public void setCounterSaveAs(String value) {
+        counter_save_as = value;
+    }
+
     private String formatArgs() {
         StringBuilder stringBuilder = new StringBuilder();
         for (Map.Entry<String, String> entry : getParams().entrySet()) {
@@ -125,7 +131,7 @@ public class SparkExecutable extends AbstractExecutable {
         try {
             attachSegmentMetadataWithDict(segment);
         } catch (IOException e) {
-            throw new ExecuteException("meta dump fialed");
+            throw new ExecuteException("meta dump failed");
         }
 
         StringBuilder stringBuilder = new StringBuilder();
@@ -145,7 +151,10 @@ public class SparkExecutable extends AbstractExecutable {
             CliCommandExecutor exec = new CliCommandExecutor();
             PatternedLogger patternedLogger = new PatternedLogger(logger);
             exec.execute(cmd, patternedLogger);
-            getManager().addJobInfo(getId(), patternedLogger.getInfo());
+
+            Map<String, String> joblogInfo = patternedLogger.getInfo();
+            readCounters(joblogInfo);
+            getManager().addJobInfo(getId(), joblogInfo);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
         } catch (Exception e) {
             logger.error("error run spark job:", e);
@@ -192,4 +201,19 @@ public class SparkExecutable extends AbstractExecutable {
         //upload metadata
         ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
     }
+
+    private void readCounters(final Map<String, String> info) {
+        if (counter_save_as != null) {
+            String[] saveAsNames = counter_save_as.split(",");
+            saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_COUNT), saveAsNames, 0, info);
+            saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_SIZE), saveAsNames, 1, info);
+            saveCounterAs(info.get(ExecutableConstants.HDFS_BYTES_WRITTEN), saveAsNames, 2, info);
+        }
+    }
+
+    private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
+        if (saveAsNames.length > i && StringUtils.isBlank(saveAsNames[i]) == false) {
+            info.put(saveAsNames[i].trim(), counter);
+        }
+    }
 }
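
setCounterSaveAs takes a comma-separated list of target keys, and readCounters maps them positionally onto the values parsed from the job's patterned log: position 0 gets the source record count, 1 the source size, and 2 the HDFS bytes written. A hedged wiring sketch; the key names passed in are illustrative, not constants from this patch:

    import org.apache.kylin.engine.spark.SparkCubingByLayer;
    import org.apache.kylin.engine.spark.SparkExecutable;

    public class CounterWiring {
        static SparkExecutable newCubingStep() {
            SparkExecutable sparkExecutable = new SparkExecutable();
            sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
            // 0 -> SOURCE_RECORDS_COUNT, 1 -> SOURCE_RECORDS_SIZE, 2 -> HDFS_BYTES_WRITTEN
            sparkExecutable.setCounterSaveAs("sourceRecordCount,sourceSizeBytes,hdfsBytesWritten");
            return sparkExecutable;
        }
    }
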
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
new file mode 100644
index 0000000..a4e17c3
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.StorageFactory;
+
+public class SparkUtil {
+
+    public static ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
+    }
+
+    private static TableDesc getTableDesc(String tableName, String prj) {
+        return TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName, prj);
+    }
+
+    public static ISparkOutput.ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+        return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchCubingOutputSide(seg);
+    }
+
+    public static ISparkOutput.ISparkBatchMergeOutputSide getBatchMergeOutputSide2(CubeSegment seg) {
+        return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchMergeOutputSide(seg);
+    }
+
+    public static ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
+    }
+
+    public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
+        return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
+    }
+
+    private static synchronized GenericOptionsParser getParser(Configuration conf, String[] args) throws Exception {
+        return new GenericOptionsParser(conf, args);
+    }
+}
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index e7afb49..4c7e937 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -101,6 +101,14 @@
             <version>1.10.19</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-spark</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-spark</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
new file mode 100644
index 0000000..ac25d07
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class GarbageCollectionStep extends AbstractExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig config = context.getConfig();
+        StringBuffer output = new StringBuffer();
+        try {
+            output.append(cleanUpIntermediateFlatTable(config));
+            // don't drop view to avoid concurrent issue
+            //output.append(cleanUpHiveViewIntermediateTable(config));
+        } catch (IOException e) {
+            logger.error("job:" + getId() + " execute finished with exception", e);
+            return ExecuteResult.createError(e);
+        }
+
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+    }
+
+    private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
+        StringBuffer output = new StringBuffer();
+        final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+        final List<String> hiveTables = this.getIntermediateTables();
+        for (String hiveTable : hiveTables) {
+            if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
+                hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+                hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
+
+                output.append("Hive table " + hiveTable + " is dropped. \n");
+            }
+        }
+        config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+        rmdirOnHDFS(getExternalDataPaths());
+        output.append("Path " + getExternalDataPaths() + " is deleted. \n");
+
+        return output.toString();
+    }
+
+    private void rmdirOnHDFS(List<String> paths) throws IOException {
+        for (String path : paths) {
+            Path externalDataPath = new Path(path);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            if (fs.exists(externalDataPath)) {
+                fs.delete(externalDataPath, true);
+            }
+        }
+    }
+
+    public void setIntermediateTables(List<String> tableIdentity) {
+        setParam("oldHiveTables", StringUtil.join(tableIdentity, ","));
+    }
+
+    private List<String> getIntermediateTables() {
+        List<String> intermediateTables = Lists.newArrayList();
+        String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ",");
+        for (String t : tables) {
+            intermediateTables.add(t);
+        }
+        return intermediateTables;
+    }
+
+    public void setExternalDataPaths(List<String> externalDataPaths) {
+        setParam("externalDataPaths", StringUtil.join(externalDataPaths, ","));
+    }
+
+    private List<String> getExternalDataPaths() {
+        String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ",");
+        List<String> result = Lists.newArrayList();
+        for (String s : paths) {
+            result.add(s);
+        }
+        return result;
+    }
+
+    public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
+        setParam("oldHiveViewIntermediateTables", tableIdentities);
+    }
+}
\ No newline at end of file
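
GarbageCollectionStep is driven entirely by executable parameters, so a job builder only needs to set the table names and HDFS paths before chaining it. A minimal sketch of the wiring (the table name and path below are hypothetical; the real values come from the flat table descriptor, as in HiveMRInput.addStepPhase4_Cleanup and KafkaInputBase.createGCStep):

    GarbageCollectionStep step = new GarbageCollectionStep();
    step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
    // hypothetical intermediate table and its HDFS data dir
    step.setIntermediateTables(Collections.singletonList("kylin_intermediate.kylin_flat_tbl_0001"));
    step.setExternalDataPaths(Collections.singletonList("/kylin/kylin_metadata/kylin-0001/kylin_flat_tbl_0001"));
    jobFlow.addTask(step);   // jobFlow is the DefaultChainedExecutable being assembled
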
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
new file mode 100644
index 0000000..eae2e1c
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class HiveInputBase {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(HiveInputBase.class);
+
+    protected static String getTableNameForHCat(TableDesc table) {
+        String tableName = (table.isView()) ? table.getMaterializedName() : table.getName();
+        String database = (table.isView()) ? KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()
+                : table.getDatabase();
+        return String.format("%s.%s", database, tableName).toUpperCase();
+    }
+
+    protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow, String hdfsWorkingDir,
+            IJoinedFlatTableDesc flatTableDesc, String flatTableDatabase) {
+        final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+        final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+        final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+        jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatTableDesc));
+    }
+
+    protected static AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
+            String cubeName, IJoinedFlatTableDesc flatDesc) {
+        //from hive to hive
+        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
+        String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+
+        CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+        step.setInitStatement(hiveInitStatements);
+        step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls);
+        CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+        return step;
+    }
+
+    protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
+            IJoinedFlatTableDesc flatDesc) {
+        RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
+        step.setInitStatement(hiveInitStatements);
+        step.setIntermediateTable(flatDesc.getTableName());
+        step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
+        CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+        step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+        return step;
+    }
+
+    protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
+            String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables) {
+        ShellExecutable step = new ShellExecutable();
+        step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
+
+        KylinConfig kylinConfig = flatDesc.getSegment().getConfig();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(kylinConfig);
+        final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
+
+        String prj = flatDesc.getDataModel().getProject();
+        for (JoinTableDesc lookupDesc : flatDesc.getDataModel().getJoinTables()) {
+            TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable(), prj);
+            if (lookupDesc.getKind() == DataModelDesc.TableKind.LOOKUP && tableDesc.isView()) {
+                lookupViewsTables.add(tableDesc);
+            }
+        }
+
+        if (lookupViewsTables.size() == 0) {
+            return null;
+        }
+
+        HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+        hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride());
+        hiveCmdBuilder.addStatement(hiveInitStatements);
+        for (TableDesc lookUpTableDesc : lookupViewsTables) {
+            String identity = lookUpTableDesc.getIdentity();
+            String intermediate = lookUpTableDesc.getMaterializedName();
+            if (lookUpTableDesc.isView()) {
+                String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir);
+                hiveCmdBuilder.addStatement(materializeViewHql);
+                intermediateTables.add(intermediate);
+            }
+        }
+
+        step.setCmd(hiveCmdBuilder.build());
+        return step;
+    }
+
+    // Each appended statement must be a complete HQL statement.
+    protected static String materializeViewHql(String viewName, String tableName, String jobWorkingDir) {
+        StringBuilder createIntermediateTableHql = new StringBuilder();
+        createIntermediateTableHql.append("DROP TABLE IF EXISTS " + viewName + ";\n");
+        createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName
+                + " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n");
+        createIntermediateTableHql.append("ALTER TABLE " + viewName + " SET TBLPROPERTIES('auto.purge'='true');\n");
+        createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + viewName + " SELECT * FROM " + tableName + ";\n");
+        return createIntermediateTableHql.toString();
+    }
+
+    protected static String getJobWorkingDir(DefaultChainedExecutable jobFlow, String hdfsWorkingDir) {
+
+        String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId());
+        if (KylinConfig.getInstanceFromEnv().getHiveTableDirCreateFirst()) {
+            // Create the working dir up front so that Hive does not create it
+            // with a different owner.
+            checkAndCreateWorkDir(jobWorkingDir);
+        }
+        return jobWorkingDir;
+    }
+
+    protected static void checkAndCreateWorkDir(String jobWorkingDir) {
+        try {
+            Path path = new Path(jobWorkingDir);
+            FileSystem fileSystem = HadoopUtil.getFileSystem(path);
+            if (!fileSystem.exists(path)) {
+                logger.info("Create jobWorkDir : " + jobWorkingDir);
+                fileSystem.mkdirs(path);
+            }
+        } catch (IOException e) {
+            logger.error("Could not create lookUp table dir : " + jobWorkingDir);
+        }
+    }
+
+}
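
materializeViewHql emits one self-contained batch of HQL per lookup view: drop any previous materialization, recreate it as an external table located under the job working dir, mark it auto-purge, and repopulate it from the view. A small sketch with hypothetical names (the helper is protected static, so this assumes subclass or same-package access):

    String hql = HiveInputBase.materializeViewHql(
            "kylin_intermediate_lookup_v",                  // materialized table name
            "DEFAULT.LOOKUP_V",                             // original Hive view
            "hdfs:///kylin/kylin_metadata/kylin-0001");     // job working dir
    // hql then contains (each statement on a single line in the real output):
    //   DROP TABLE IF EXISTS kylin_intermediate_lookup_v;
    //   CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_lookup_v LIKE DEFAULT.LOOKUP_V
    //       LOCATION 'hdfs:///kylin/kylin_metadata/kylin-0001/kylin_intermediate_lookup_v';
    //   ALTER TABLE kylin_intermediate_lookup_v SET TBLPROPERTIES('auto.purge'='true');
    //   INSERT OVERWRITE TABLE kylin_intermediate_lookup_v SELECT * FROM DEFAULT.LOOKUP_V;
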
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 8653500..8aec8b9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -21,58 +21,33 @@ package org.apache.kylin.source.hive;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.HiveCmdBuilder;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.common.PatternedLogger;
-import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.TableMetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
-public class HiveMRInput implements IMRInput {
+public class HiveMRInput extends HiveInputBase implements IMRInput {
 
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class);
 
-    public static String getTableNameForHCat(TableDesc table) {
-        String tableName = (table.isView()) ? table.getMaterializedName() : table.getName();
-        String database = (table.isView()) ? KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()
-                : table.getDatabase();
-        return String.format("%s.%s", database, tableName).toUpperCase();
-    }
-
     @Override
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         return new BatchCubingInputSide(flatDesc);
@@ -132,7 +107,7 @@ public class HiveMRInput implements IMRInput {
         final protected String flatTableDatabase;
         final protected String hdfsWorkingDir;
 
-        String hiveViewIntermediateTables = "";
+        List<String> hiveViewIntermediateTables = Lists.newArrayList();
 
         public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
             KylinConfig config = KylinConfig.getInstanceFromEnv();
@@ -154,7 +129,7 @@ public class HiveMRInput implements IMRInput {
             // then count and redistribute
             if (cubeConfig.isHiveRedistributeEnabled()) {
                 if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
-                    jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName));
+                    jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
                 }
             }
 
@@ -165,132 +140,31 @@ public class HiveMRInput implements IMRInput {
         protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
 
-            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName));
+            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
         }
 
         protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
 
-            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir);
+            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables);
             if (task != null) {
                 jobFlow.addTask(task);
             }
         }
 
-        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
-
-            String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId());
-            if (KylinConfig.getInstanceFromEnv().getHiveTableDirCreateFirst()) {
-                // Create work dir to avoid hive create it,
-                // the difference is that the owners are different.
-                checkAndCreateWorkDir(jobWorkingDir);
-            }
-            return jobWorkingDir;
-        }
-
-        private void checkAndCreateWorkDir(String jobWorkingDir) {
-            try {
-                Path path = new Path(jobWorkingDir);
-                FileSystem fileSystem = HadoopUtil.getFileSystem(path);
-                if (!fileSystem.exists(path)) {
-                    logger.info("Create jobWorkDir : " + jobWorkingDir);
-                    fileSystem.mkdirs(path);
-                }
-            } catch (IOException e) {
-                logger.error("Could not create lookUp table dir : " + jobWorkingDir);
-            }
-        }
-
-        private AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName) {
-            RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
-            step.setInitStatement(hiveInitStatements);
-            step.setIntermediateTable(flatDesc.getTableName());
-            step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
-            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
-            return step;
-        }
-
-        private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
-                String jobWorkingDir) {
-            ShellExecutable step = new ShellExecutable();
-            step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
-
-            KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
-            TableMetadataManager metadataManager = TableMetadataManager.getInstance(kylinConfig);
-            final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
-
-            String prj = flatDesc.getDataModel().getProject();
-            for (JoinTableDesc lookupDesc : flatDesc.getDataModel().getJoinTables()) {
-                TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable(), prj);
-                if (lookupDesc.getKind() == DataModelDesc.TableKind.LOOKUP && tableDesc.isView()) {
-                    lookupViewsTables.add(tableDesc);
-                }
-            }
-
-            if (lookupViewsTables.size() == 0) {
-                return null;
-            }
-
-            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride());
-            hiveCmdBuilder.addStatement(hiveInitStatements);
-            for (TableDesc lookUpTableDesc : lookupViewsTables) {
-                String identity = lookUpTableDesc.getIdentity();
-                String intermediate = lookUpTableDesc.getMaterializedName();
-                if (lookUpTableDesc.isView()) {
-                    String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir);
-                    hiveCmdBuilder.addStatement(materializeViewHql);
-                    hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";";
-                }
-            }
-
-            hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0,
-                    hiveViewIntermediateTables.length() - 1);
-
-            step.setCmd(hiveCmdBuilder.build());
-            return step;
-        }
-
-        // each append must be a complete hql.
-        public static String materializeViewHql(String viewName, String tableName, String jobWorkingDir) {
-            StringBuilder createIntermediateTableHql = new StringBuilder();
-            createIntermediateTableHql.append("DROP TABLE IF EXISTS " + viewName + ";\n");
-            createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName
-                    + " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n");
-            createIntermediateTableHql.append("ALTER TABLE " + viewName + " SET TBLPROPERTIES('auto.purge'='true');\n");
-            createIntermediateTableHql
-                    .append("INSERT OVERWRITE TABLE " + viewName + " SELECT * FROM " + tableName + ";\n");
-            return createIntermediateTableHql.toString();
-        }
-
-        private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
-                String cubeName) {
-            //from hive to hive
-            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
-            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
-            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
-
-            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
-            step.setInitStatement(hiveInitStatements);
-            step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls);
-            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-            return step;
-        }
 
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            final String jobWorkingDir = getJobWorkingDir(jobFlow);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
 
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
             step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
             step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
-            step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
+            step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
             jobFlow.addTask(step);
         }
 
@@ -304,196 +178,4 @@ public class HiveMRInput implements IMRInput {
         }
     }
 
-    public static class RedistributeFlatHiveTableStep extends AbstractExecutable {
-        private final PatternedLogger stepLogger = new PatternedLogger(logger);
-
-        private long computeRowCount(String database, String table) throws Exception {
-            IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-            return hiveClient.getHiveTableRows(database, table);
-        }
-
-        private long getDataSize(String database, String table) throws Exception {
-            IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-            long size = hiveClient.getHiveTableMeta(database, table).fileSize;
-            return size;
-        }
-
-        private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride());
-            hiveCmdBuilder.addStatement(getInitStatement());
-            hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
-            hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
-            hiveCmdBuilder.addStatement(getRedistributeDataStatement());
-            final String cmd = hiveCmdBuilder.toString();
-
-            stepLogger.log("Redistribute table, cmd: ");
-            stepLogger.log(cmd);
-
-            Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
-            getManager().addJobInfo(getId(), stepLogger.getInfo());
-
-            if (response.getFirst() != 0) {
-                throw new RuntimeException("Failed to redistribute flat hive table");
-            }
-        }
-
-        private KylinConfig getCubeSpecificConfig() {
-            String cubeName = CubingExecutableUtil.getCubeName(getParams());
-            CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cube = manager.getCube(cubeName);
-            return cube.getConfig();
-        }
-
-        @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-            KylinConfig config = getCubeSpecificConfig();
-            String intermediateTable = getIntermediateTable();
-            String database, tableName;
-            if (intermediateTable.indexOf(".") > 0) {
-                database = intermediateTable.substring(0, intermediateTable.indexOf("."));
-                tableName = intermediateTable.substring(intermediateTable.indexOf(".") + 1);
-            } else {
-                database = config.getHiveDatabaseForIntermediateTable();
-                tableName = intermediateTable;
-            }
-
-            try {
-                long rowCount = computeRowCount(database, tableName);
-                logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount);
-                if (rowCount == 0) {
-                    if (!config.isEmptySegmentAllowed()) {
-                        stepLogger.log("Detect upstream hive table is empty, "
-                                + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
-                        return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
-                    } else {
-                        return new ExecuteResult(ExecuteResult.State.SUCCEED,
-                                "Row count is 0, no need to redistribute");
-                    }
-                }
-
-                int mapperInputRows = config.getHadoopJobMapperInputRows();
-
-                int numReducers = Math.round(rowCount / ((float) mapperInputRows));
-                numReducers = Math.max(1, numReducers);
-                numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber());
-
-                stepLogger.log("total input rows = " + rowCount);
-                stepLogger.log("expected input rows per mapper = " + mapperInputRows);
-                stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
-
-                redistributeTable(config, numReducers);
-                long dataSize = getDataSize(database, tableName);
-                getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize);
-                return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
-
-            } catch (Exception e) {
-                logger.error("job:" + getId() + " execute finished with exception", e);
-                return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e);
-            }
-        }
-
-        public void setInitStatement(String sql) {
-            setParam("HiveInit", sql);
-        }
-
-        public String getInitStatement() {
-            return getParam("HiveInit");
-        }
-
-        public void setRedistributeDataStatement(String sql) {
-            setParam("HiveRedistributeData", sql);
-        }
-
-        public String getRedistributeDataStatement() {
-            return getParam("HiveRedistributeData");
-        }
-
-        public String getIntermediateTable() {
-            return getParam("intermediateTable");
-        }
-
-        public void setIntermediateTable(String intermediateTable) {
-            setParam("intermediateTable", intermediateTable);
-        }
-    }
-
-    public static class GarbageCollectionStep extends AbstractExecutable {
-        private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
-        @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-            KylinConfig config = context.getConfig();
-            StringBuffer output = new StringBuffer();
-            try {
-                output.append(cleanUpIntermediateFlatTable(config));
-                // don't drop view to avoid concurrent issue
-                //output.append(cleanUpHiveViewIntermediateTable(config));
-            } catch (IOException e) {
-                logger.error("job:" + getId() + " execute finished with exception", e);
-                return ExecuteResult.createError(e);
-            }
-
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-        }
-
-        private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
-            StringBuffer output = new StringBuffer();
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            final List<String> hiveTables = this.getIntermediateTables();
-            for (String hiveTable : hiveTables) {
-                if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
-                    hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
-                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
-
-                    output.append("Hive table " + hiveTable + " is dropped. \n");
-                }
-            }
-            config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
-            rmdirOnHDFS(getExternalDataPaths());
-            output.append("Path " + getExternalDataPaths() + " is deleted. \n");
-
-            return output.toString();
-        }
-
-        private void rmdirOnHDFS(List<String> paths) throws IOException {
-            for (String path : paths) {
-                Path externalDataPath = new Path(path);
-                FileSystem fs = HadoopUtil.getWorkingFileSystem();
-                if (fs.exists(externalDataPath)) {
-                    fs.delete(externalDataPath, true);
-                }
-            }
-        }
-
-        public void setIntermediateTables(List<String> tableIdentity) {
-            setParam("oldHiveTables", StringUtil.join(tableIdentity, ","));
-        }
-
-        private List<String> getIntermediateTables() {
-            List<String> intermediateTables = Lists.newArrayList();
-            String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ",");
-            for (String t : tables) {
-                intermediateTables.add(t);
-            }
-            return intermediateTables;
-        }
-
-        public void setExternalDataPaths(List<String> externalDataPaths) {
-            setParam("externalDataPaths", StringUtil.join(externalDataPaths, ","));
-        }
-
-        private List<String> getExternalDataPaths() {
-            String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ",");
-            List<String> result = Lists.newArrayList();
-            for (String s : paths) {
-                result.add(s);
-            }
-            return result;
-        }
-
-        public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
-            setParam("oldHiveViewIntermediateTables", tableIdentities);
-        }
-    }
 }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index 58bd2c3..daf93d3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
@@ -45,6 +46,8 @@ public class HiveSource implements ISource {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == IMRInput.class) {
             return (I) new HiveMRInput();
+        } else if (engineInterface == ISparkInput.class) {
+            return (I) new HiveSparkInput();
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
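
With this branch in place, a build engine can request either input flavour from the same source through adaptToBuildEngine. A sketch of how the Spark side would obtain its input (the method and variable names here are illustrative, not part of this patch):

    // given any ISource (e.g. a HiveSource) and the flat table descriptor of the segment
    static ISparkInput.ISparkBatchCubingInputSide sparkInputSideOf(ISource source, IJoinedFlatTableDesc flatDesc) {
        ISparkInput sparkInput = source.adaptToBuildEngine(ISparkInput.class);
        return sparkInput.getBatchCubingInputSide(flatDesc);
    }
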
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
new file mode 100644
index 0000000..779835b
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class HiveSparkInput extends HiveInputBase implements ISparkInput {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(HiveSparkInput.class);
+
+    @Override
+    public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new BatchCubingInputSide(flatDesc);
+    }
+
+    @Override
+    public ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkInput.ISparkBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // doing nothing
+            }
+        };
+    }
+
+    public class BatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+        final protected IJoinedFlatTableDesc flatDesc;
+        final protected String flatTableDatabase;
+        final protected String hdfsWorkingDir;
+
+        List<String> hiveViewIntermediateTables = Lists.newArrayList();
+
+        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            this.flatDesc = flatDesc;
+            this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
+            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
+                    .getConfig();
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+
+            // create flat table first
+            addStepPhase1_DoCreateFlatTable(jobFlow, hdfsWorkingDir, flatDesc, flatTableDatabase);
+
+            // then count and redistribute
+            if (cubeConfig.isHiveRedistributeEnabled()) {
+                if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
+                    jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
+                }
+            }
+
+            // special for hive
+            addStepPhase1_DoMaterializeLookupTable(jobFlow);
+        }
+
+        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables);
+            if (task != null) {
+                jobFlow.addTask(task);
+            }
+        }
+
+
+
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+            GarbageCollectionStep step = new GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
+            step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
+            jobFlow.addTask(step);
+        }
+
+        private String getIntermediateTableIdentity() {
+            return flatTableDatabase + "." + flatDesc.getTableName();
+        }
+    }
+
+}
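
HiveSparkInput mirrors the MR input side: phase 1 creates the flat table (plus optional redistribute and lookup view materialization), phase 4 cleans everything up. Roughly how a Spark cubing job builder would drive it (the wrapper below is a sketch, not taken from this patch):

    static void addHiveInputSteps(ISparkInput.ISparkBatchCubingInputSide inputSide, DefaultChainedExecutable jobFlow) {
        inputSide.addStepPhase1_CreateFlatTable(jobFlow);  // flat table, optional redistribute, view materialization
        // ... cubing and storage output steps are added in between by the engine and storage sides ...
        inputSide.addStepPhase4_Cleanup(jobFlow);          // GarbageCollectionStep: drop intermediate tables, delete HDFS dirs
    }
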
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
new file mode 100644
index 0000000..0dfb5bf
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+public class RedistributeFlatHiveTableStep extends AbstractExecutable {
+    private final PatternedLogger stepLogger = new PatternedLogger(logger);
+
+    private long computeRowCount(String database, String table) throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        return hiveClient.getHiveTableRows(database, table);
+    }
+
+    private long getDataSize(String database, String table) throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        long size = hiveClient.getHiveTableMeta(database, table).fileSize;
+        return size;
+    }
+
+    private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
+        final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+        hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride());
+        hiveCmdBuilder.addStatement(getInitStatement());
+        hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
+        hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
+        hiveCmdBuilder.addStatement(getRedistributeDataStatement());
+        final String cmd = hiveCmdBuilder.toString();
+
+        stepLogger.log("Redistribute table, cmd: ");
+        stepLogger.log(cmd);
+
+        Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+        getManager().addJobInfo(getId(), stepLogger.getInfo());
+
+        if (response.getFirst() != 0) {
+            throw new RuntimeException("Failed to redistribute flat hive table");
+        }
+    }
+
+    private KylinConfig getCubeSpecificConfig() {
+        String cubeName = CubingExecutableUtil.getCubeName(getParams());
+        CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cube = manager.getCube(cubeName);
+        return cube.getConfig();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig config = getCubeSpecificConfig();
+        String intermediateTable = getIntermediateTable();
+        String database, tableName;
+        if (intermediateTable.indexOf(".") > 0) {
+            database = intermediateTable.substring(0, intermediateTable.indexOf("."));
+            tableName = intermediateTable.substring(intermediateTable.indexOf(".") + 1);
+        } else {
+            database = config.getHiveDatabaseForIntermediateTable();
+            tableName = intermediateTable;
+        }
+
+        try {
+            long rowCount = computeRowCount(database, tableName);
+            logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount);
+            if (rowCount == 0) {
+                if (!config.isEmptySegmentAllowed()) {
+                    stepLogger.log("Detect upstream hive table is empty, "
+                            + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
+                    return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+                } else {
+                    return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute");
+                }
+            }
+
+            int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+            int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+            numReducers = Math.max(1, numReducers);
+            numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber());
+
+            stepLogger.log("total input rows = " + rowCount);
+            stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+            stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
+
+            redistributeTable(config, numReducers);
+            long dataSize = getDataSize(database, tableName);
+            getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+        } catch (Exception e) {
+            logger.error("job:" + getId() + " execute finished with exception", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e);
+        }
+    }
+
+    public void setInitStatement(String sql) {
+        setParam("HiveInit", sql);
+    }
+
+    public String getInitStatement() {
+        return getParam("HiveInit");
+    }
+
+    public void setRedistributeDataStatement(String sql) {
+        setParam("HiveRedistributeData", sql);
+    }
+
+    public String getRedistributeDataStatement() {
+        return getParam("HiveRedistributeData");
+    }
+
+    public String getIntermediateTable() {
+        return getParam("intermediateTable");
+    }
+
+    public void setIntermediateTable(String intermediateTable) {
+        setParam("intermediateTable", intermediateTable);
+    }
+}
\ No newline at end of file
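
The reducer count for the redistribute statement is round(rowCount / mapperInputRows), clamped to the range [1, maxReducerNumber]. A worked example with hypothetical values:

    long rowCount = 12_300_000L;      // rows counted in the flat table
    int mapperInputRows = 1_000_000;  // config.getHadoopJobMapperInputRows(), value assumed here
    int maxReducers = 500;            // config.getHadoopJobMaxReducerNumber(), value assumed here

    int numReducers = Math.round(rowCount / ((float) mapperInputRows)); // 12
    numReducers = Math.max(1, numReducers);
    numReducers = Math.min(numReducers, maxReducers);                   // still 12, well under the cap
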
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index fcdb516..74d95cf 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -66,7 +66,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
         protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
 
             jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName));
             jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
index a81cc21..34c946a 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
@@ -46,8 +46,7 @@ public class HiveMRInputTest {
             DefaultChainedExecutable defaultChainedExecutable = mock(DefaultChainedExecutable.class);
             defaultChainedExecutable.setId(UUID.randomUUID().toString());
 
-            HiveMRInput.BatchCubingInputSide batchCubingInputSide = new HiveMRInput.BatchCubingInputSide(null);
-            String jobWorkingDir = batchCubingInputSide.getJobWorkingDir(defaultChainedExecutable);
+            String jobWorkingDir = HiveInputBase.getJobWorkingDir(defaultChainedExecutable, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
             jobWorkDirPath = new Path(jobWorkingDir);
             Assert.assertTrue(fileSystem.exists(jobWorkDirPath));
         } finally {
@@ -65,7 +64,7 @@ public class HiveMRInputTest {
 
         StringBuilder hqls = new StringBuilder();
         for (int i = 0; i < viewSize; i++) {
-            String hql = HiveMRInput.BatchCubingInputSide.materializeViewHql(mockedViewNames[i], mockedTalbeNames[i],
+            String hql = HiveInputBase.materializeViewHql(mockedViewNames[i], mockedTalbeNames[i],
                     mockedWorkingDir);
             hqls.append(hql);
         }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
new file mode 100644
index 0000000..a624f8f
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka;
+
+import java.util.List;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.GarbageCollectionStep;
+import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
+import org.apache.kylin.source.kafka.job.MergeOffsetStep;
+
+public class KafkaInputBase {
+
+    protected static AbstractExecutable createMergeOffsetStep(String jobId, CubeSegment cubeSegment) {
+
+        final MergeOffsetStep result = new MergeOffsetStep();
+        result.setName("Merge offset step");
+
+        CubingExecutableUtil.setCubeName(cubeSegment.getCubeInstance().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
+        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+        return result;
+    }
+
+    protected static MapReduceExecutable createSaveKafkaDataStep(String jobId, String location, CubeSegment seg) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName("Save data from Kafka");
+        result.setMapReduceJobClass(KafkaFlatTableJob.class);
+        JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
+        StringBuilder cmd = new StringBuilder();
+        jobBuilderSupport.appendMapReduceParameters(cmd);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
+
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
+    protected static AbstractExecutable createFlatTable(final String hiveTableDatabase, final String mockFactTableName,
+            final String baseLocation, final String cubeName, final CubeDesc cubeDesc,
+            final IJoinedFlatTableDesc flatDesc, final List<String> intermediateTables,
+            final List<String> intermediatePaths) {
+        final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+
+        final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() {
+
+            @Override
+            public String getTableName() {
+                return mockFactTableName;
+            }
+
+            @Override
+            public DataModelDesc getDataModel() {
+                return cubeDesc.getModel();
+            }
+
+            @Override
+            public List<TblColRef> getAllColumns() {
+                return flatDesc.getFactColumns();
+            }
+
+            @Override
+            public List<TblColRef> getFactColumns() {
+                return null;
+            }
+
+            @Override
+            public int getColumnIndex(TblColRef colRef) {
+                return 0;
+            }
+
+            @Override
+            public SegmentRange getSegRange() {
+                return null;
+            }
+
+            @Override
+            public TblColRef getDistributedBy() {
+                return null;
+            }
+
+            @Override
+            public TblColRef getClusterBy() {
+                return null;
+            }
+
+            @Override
+            public ISegment getSegment() {
+                return null;
+            }
+
+            @Override
+            public boolean useAlias() {
+                return false;
+            }
+        };
+        final String dropFactTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc);
+        // the table inputformat is sequence file
+        final String createFactTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation,
+                JoinedFlatTable.SEQUENCEFILE);
+
+        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation);
+        String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+        insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ",
+                mockFactTableName + " ");
+
+        CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+        CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+        step.setInitStatement(hiveInitStatements);
+        step.setCreateTableStatement(
+                dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls);
+        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+        intermediateTables.add(flatDesc.getTableName());
+        intermediateTables.add(mockFactTableName);
+        intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
+        intermediatePaths.add(baseLocation + "/" + mockFactTableName);
+        return step;
+    }
+
+    protected static AbstractExecutable createGCStep(List<String> intermediateTables, List<String> intermediatePaths) {
+        GarbageCollectionStep step = new GarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+        step.setIntermediateTables(intermediateTables);
+        step.setExternalDataPaths(intermediatePaths);
+
+        return step;
+    }
+}
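
createFlatTable wraps the Kafka dump in a mock fact table: the anonymous IJoinedFlatTableDesc points a SEQUENCEFILE table at the data saved by KafkaFlatTableJob, and the generated INSERT statement is then rewritten to read from that mock table instead of the model's root fact table. The rewrite is a plain string swap; a sketch with hypothetical names:

    String rootFactTable = "DEFAULT.STREAMING_SALES";                 // flatDesc.getDataModel().getRootFactTableName()
    String mockFactTable = "kylin_intermediate_streaming_sales_fact"; // the per-segment mock fact table
    String insertHql = "INSERT OVERWRITE TABLE flat_tbl SELECT A, B FROM " + rootFactTable + " WHERE 1 = 1";
    String rewritten = insertHql.replace(rootFactTable + " ", mockFactTable + " ");
    // rewritten now reads "... FROM kylin_intermediate_streaming_sales_fact WHERE 1 = 1",
    // i.e. it selects from the sequence files written by the save-Kafka-data step
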
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index bc2426d..a45cc63 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -22,8 +22,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -31,38 +29,25 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
-import org.apache.kylin.source.hive.HiveMRInput;
-import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
-import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaMRInput implements IMRInput {
+import com.google.common.collect.Lists;
+
+public class KafkaMRInput extends KafkaInputBase implements IMRInput {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
     private CubeSegment cubeSegment;
@@ -99,7 +84,8 @@ public class KafkaMRInput implements IMRInput {
             job.setInputFormatClass(SequenceFileInputFormat.class);
             String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
             IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
-            String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc,
+                    JobBuilderSupport.getJobWorkingDir(conf, jobId));
             try {
                 FileInputFormat.addInputPath(job, new Path(inputPath));
             } catch (IOException e) {
@@ -110,7 +96,7 @@ public class KafkaMRInput implements IMRInput {
         @Override
         public Collection<String[]> parseMapperInput(Object mapperInput) {
             Text text = (Text) mapperInput;
-            String[] columns  = Bytes.toString(text.getBytes(), 0, text.getLength()).split(delimiter);
+            String[] columns = Bytes.toString(text.getBytes(), 0, text.getLength()).split(delimiter);
             return Collections.singletonList(columns);
         }
 
@@ -120,7 +106,7 @@ public class KafkaMRInput implements IMRInput {
 
         final JobEngineConfig conf;
         final CubeSegment seg;
-        private CubeDesc cubeDesc ;
+        private CubeDesc cubeDesc;
         private KylinConfig config;
         protected IJoinedFlatTableDesc flatDesc;
         protected String hiveTableDatabase;
@@ -147,119 +133,23 @@ public class KafkaMRInput implements IMRInput {
                 // directly use flat table location
                 final String intermediateFactTable = flatDesc.getTableName();
                 final String tableLocation = baseLocation + "/" + intermediateFactTable;
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation));
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
                 intermediatePaths.add(tableLocation);
             } else {
-                final String mockFactTableName =  MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase() + "_"
-                        + seg.getUuid().replaceAll("-", "_") + "_fact";
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName));
-                jobFlow.addTask(createFlatTable(mockFactTableName, baseLocation));
+                final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase()
+                        + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
+                jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc, flatDesc, intermediateTables, intermediatePaths));
             }
         }
-        private AbstractExecutable createFlatTable(final String mockFactTableName, String baseLocation) {
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
-
-            final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() {
-
-                @Override
-                public String getTableName() {
-                    return mockFactTableName;
-                }
-
-                @Override
-                public DataModelDesc getDataModel() {
-                    return cubeDesc.getModel();
-                }
-
-                @Override
-                public List<TblColRef> getAllColumns() {
-                    return flatDesc.getFactColumns();
-                }
-
-                @Override
-                public List<TblColRef> getFactColumns() {
-                    return null;
-                }
-
-                @Override
-                public int getColumnIndex(TblColRef colRef) {
-                    return 0;
-                }
-
-                @Override
-                public SegmentRange getSegRange() {
-                    return null;
-                }
-
-                @Override
-                public TblColRef getDistributedBy() {
-                    return null;
-                }
-
-                @Override
-                public TblColRef getClusterBy() {
-                    return null;
-                }
-
-                @Override
-                public ISegment getSegment() {
-                    return null;
-                }
-
-                @Override
-                public boolean useAlias() {
-                    return false;
-                }
-            };
-            final String dropFactTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc);
-            // the table inputformat is sequence file
-            final String createFactTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation, JoinedFlatTable.SEQUENCEFILE);
-
-            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
-            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation);
-            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
-            insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ", mockFactTableName + " ");
-
-            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
-            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            step.setInitStatement(hiveInitStatements);
-            step.setCreateTableStatement(dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls);
-            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-
-            intermediateTables.add(flatDesc.getTableName());
-            intermediateTables.add(mockFactTableName);
-            intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
-            intermediatePaths.add(baseLocation + "/" + mockFactTableName);
-            return step;
-        }
 
         protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
             return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
         }
 
-        private MapReduceExecutable createSaveKafkaDataStep(String jobId, String location) {
-            MapReduceExecutable result = new MapReduceExecutable();
-            result.setName("Save data from Kafka");
-            result.setMapReduceJobClass(KafkaFlatTableJob.class);
-            JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
-            StringBuilder cmd = new StringBuilder();
-            jobBuilderSupport.appendMapReduceParameters(cmd);
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location);
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
-
-            result.setMapReduceParams(cmd.toString());
-            return result;
-        }
-
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            HiveMRInput.GarbageCollectionStep step = new HiveMRInput.GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTables(intermediateTables);
-            step.setExternalDataPaths(intermediatePaths);
-            jobFlow.addTask(step);
+            jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
 
         }
 
@@ -279,48 +169,8 @@ public class KafkaMRInput implements IMRInput {
 
         @Override
         public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-
-            final MergeOffsetStep result = new MergeOffsetStep();
-            result.setName("Merge offset step");
-
-            CubingExecutableUtil.setCubeName(cubeSegment.getCubeInstance().getName(), result.getParams());
-            CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
-            CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
-            jobFlow.addTask(result);
+            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
         }
     }
 
-    @Deprecated
-    public static class GarbageCollectionStep extends AbstractExecutable {
-        private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
-        @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-            try {
-                rmdirOnHDFS(getDataPath());
-            } catch (IOException e) {
-                logger.error("job:" + getId() + " execute finished with exception", e);
-                return ExecuteResult.createError(e);
-            }
-
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n");
-        }
-
-        private void rmdirOnHDFS(String path) throws IOException {
-            Path externalDataPath = new Path(path);
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            if (fs.exists(externalDataPath)) {
-                fs.delete(externalDataPath, true);
-            }
-        }
-
-        public void setDataPath(String externalDataPath) {
-            setParam("dataPath", externalDataPath);
-        }
-
-        private String getDataPath() {
-            return getParam("dataPath");
-        }
-
-    }
 }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 1d65b96..70d37aa 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -29,6 +29,7 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.ISourceAware;
@@ -61,6 +62,8 @@ public class KafkaSource implements ISource {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == IMRInput.class) {
             return (I) new KafkaMRInput();
+        } else if (engineInterface == ISparkInput.class) {
+            return (I) new KafkaSparkInput();
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
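
A minimal sketch of how this adapter is consumed (illustrative only; in a real
deployment the source instance is obtained through Kylin's source factory rather
than constructed directly, and the class and method names below are placeholders
except for the interfaces and adaptToBuildEngine itself):

    import org.apache.kylin.engine.mr.IMRInput;
    import org.apache.kylin.engine.spark.ISparkInput;
    import org.apache.kylin.source.ISource;

    public class AdaptToEngineSketch {
        // Each build engine asks the source for the input side matching its own interface.
        static void pickInputSide(ISource kafkaSource) {
            IMRInput mrInput = kafkaSource.adaptToBuildEngine(IMRInput.class);          // -> KafkaMRInput
            ISparkInput sparkInput = kafkaSource.adaptToBuildEngine(ISparkInput.class); // -> KafkaSparkInput
        }
    }
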
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
new file mode 100644
index 0000000..7600329
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class KafkaSparkInput extends KafkaInputBase implements ISparkInput {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaSparkInput.class);
+    private CubeSegment cubeSegment;
+
+    @Override
+    public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        this.cubeSegment = (CubeSegment) flatDesc.getSegment();
+        return new BatchCubingInputSide(cubeSegment, flatDesc);
+    }
+
+    @Override
+    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new KafkaSparkBatchMergeInputSide((CubeSegment) seg);
+    }
+
+    public static class BatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+        final JobEngineConfig conf;
+        final CubeSegment seg;
+        private CubeDesc cubeDesc;
+        private KylinConfig config;
+        protected IJoinedFlatTableDesc flatDesc;
+        protected String hiveTableDatabase;
+        final private List<String> intermediateTables = Lists.newArrayList();
+        final private List<String> intermediatePaths = Lists.newArrayList();
+        private String cubeName;
+
+        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.config = seg.getConfig();
+            this.flatDesc = flatDesc;
+            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
+            this.seg = seg;
+            this.cubeDesc = seg.getCubeDesc();
+            this.cubeName = seg.getCubeInstance().getName();
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+
+            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+            final String baseLocation = getJobWorkingDir(jobFlow);
+            if (onlyOneTable) {
+                // directly use flat table location
+                final String intermediateFactTable = flatDesc.getTableName();
+                final String tableLocation = baseLocation + "/" + intermediateFactTable;
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
+                intermediatePaths.add(tableLocation);
+            } else {
+                final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase()
+                        + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
+                jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
+                        flatDesc, intermediateTables, intermediatePaths));
+            }
+        }
+
+        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
+        }
+
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+
+        }
+    }
+
+    class KafkaSparkBatchMergeInputSide implements ISparkBatchMergeInputSide {
+
+        private CubeSegment cubeSegment;
+
+        KafkaSparkBatchMergeInputSide(CubeSegment cubeSegment) {
+            this.cubeSegment = cubeSegment;
+        }
+
+        @Override
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+        }
+    }
+
+}
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index a2494c2..c1b4cea 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -42,6 +42,11 @@
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-spark</artifactId>
+        </dependency>
+
         <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
@@ -102,6 +107,13 @@
             <version>${powermock.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <!-- Spark dependency -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index d24285b..ded6598 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -18,10 +18,10 @@
 
 package org.apache.kylin.storage.hbase;
 
-import com.google.common.base.Preconditions;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.spark.ISparkOutput;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.IStorageAware;
@@ -32,6 +32,9 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.storage.IStorage;
 import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;
+import org.apache.kylin.storage.hbase.steps.HBaseSparkOutputTransition;
+
+import com.google.common.base.Preconditions;
 
 @SuppressWarnings("unused")
 //used by reflection
@@ -47,14 +50,16 @@ public class HBaseStorage implements IStorage {
             CubeInstance cubeInstance = (CubeInstance) realization;
             String cubeStorageQuery;
             if (cubeInstance.getStorageType() == IStorageAware.ID_HBASE) {//v2 query engine cannot go with v1 storage now
-                throw new IllegalStateException("Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more");
+                throw new IllegalStateException(
+                        "Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more");
             } else {
                 cubeStorageQuery = v2CubeStorageQuery;//by default use v2
             }
 
             IStorageQuery ret;
             try {
-                ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class).newInstance((CubeInstance) realization);
+                ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class)
+                        .newInstance((CubeInstance) realization);
             } catch (Exception e) {
                 throw new RuntimeException("Failed to initialize storage query for " + cubeStorageQuery, e);
             }
@@ -67,11 +72,13 @@ public class HBaseStorage implements IStorage {
 
     private static TblColRef getPartitionCol(IRealization realization) {
         String modelName = realization.getModel().getName();
-        DataModelDesc dataModelDesc = DataModelManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
+        DataModelDesc dataModelDesc = DataModelManager.getInstance(KylinConfig.getInstanceFromEnv())
+                .getDataModelDesc(modelName);
         PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
         Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
         TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
-        Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
+        Preconditions.checkArgument(partitionColRef != null,
+                "getPartitionDateColumnRef for " + realization + " is null");
         return partitionColRef;
     }
 
@@ -80,6 +87,8 @@ public class HBaseStorage implements IStorage {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == IMROutput2.class) {
             return (I) new HBaseMROutput2Transition();
+        } else if (engineInterface == ISparkOutput.class) {
+            return (I) new HBaseSparkOutputTransition();
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
similarity index 87%
copy from storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
copy to storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 28ccca8..4fda139 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -18,36 +18,41 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
-import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
-public class HBaseMRSteps extends JobBuilderSupport {
+/**
+ * Common steps for building cube into HBase
+ */
+public abstract class HBaseJobSteps extends JobBuilderSupport {
 
-    public HBaseMRSteps(CubeSegment seg) {
+    public HBaseJobSteps(CubeSegment seg) {
         super(seg, null);
     }
 
     public HadoopShellExecutable createCreateHTableStep(String jobId) {
         return createCreateHTableStep(jobId, CuboidModeEnum.CURRENT);
     }
-    
+
+    // TODO make it abstract
     public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum cuboidMode) {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
@@ -63,8 +68,8 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return createHtableStep;
     }
 
+    // TODO make it abstract
     public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
-
         final List<String> mergingCuboidPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
@@ -88,29 +93,9 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return mergeCuboidDataStep;
     }
 
-    public MapReduceExecutable createConvertCuboidToHfileStep(String jobId) {
-        String cuboidRootPath = getCuboidRootPath(jobId);
-        String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
-
-        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
-        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
-        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
-
-        createHFilesStep.setMapReduceParams(cmd.toString());
-        createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
-        createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-
-        return createHFilesStep;
-    }
+    abstract public AbstractExecutable createConvertCuboidToHfileStep(String jobId);
 
+    // TODO make it abstract
     public HadoopShellExecutable createBulkLoadStep(String jobId) {
         HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
         bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index ea372d9..94a60f8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -42,6 +42,7 @@ import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
 import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IEngineAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,8 +63,13 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
     @Override
     public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
+
+        boolean useSpark = seg.getCubeDesc().getEngineType() == IEngineAware.ID_SPARK;
+
+        // TODO need refactor
+        final HBaseJobSteps steps = useSpark ? new HBaseSparkSteps(seg) : new HBaseMRSteps(seg);
+
         return new IMRBatchCubingOutputSide2() {
-            HBaseMRSteps steps = new HBaseMRSteps(seg);
 
             @Override
             public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 28ccca8..25683d3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -18,77 +18,26 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.job.execution.AbstractExecutable;
 
-import java.util.ArrayList;
-import java.util.List;
-
-public class HBaseMRSteps extends JobBuilderSupport {
+public class HBaseMRSteps extends HBaseJobSteps {
 
     public HBaseMRSteps(CubeSegment seg) {
-        super(seg, null);
+        super(seg);
     }
 
     public HadoopShellExecutable createCreateHTableStep(String jobId) {
         return createCreateHTableStep(jobId, CuboidModeEnum.CURRENT);
     }
-    
-    public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum cuboidMode) {
-        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
-        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
 
-        createHtableStep.setJobParams(cmd.toString());
-        createHtableStep.setJobClass(CreateHTableJob.class);
-
-        return createHtableStep;
-    }
-
-    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
-
-        final List<String> mergingCuboidPaths = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
-        }
-        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
-        String outputPath = getCuboidRootPath(jobID);
-
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(clazz);
-        return mergeCuboidDataStep;
-    }
-
-    public MapReduceExecutable createConvertCuboidToHfileStep(String jobId) {
+    public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
         String cuboidRootPath = getCuboidRootPath(jobId);
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
 
@@ -110,158 +59,4 @@ public class HBaseMRSteps extends JobBuilderSupport {
 
         return createHFilesStep;
     }
-
-    public HadoopShellExecutable createBulkLoadStep(String jobId) {
-        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
-        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-
-        bulkLoadStep.setJobParams(cmd.toString());
-        bulkLoadStep.setJobClass(BulkLoadJob.class);
-
-        return bulkLoadStep;
-    }
-
-    public MergeGCStep createMergeGCStep() {
-        MergeGCStep result = new MergeGCStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
-        result.setOldHTables(getMergingHTables());
-        return result;
-    }
-
-    public MergeGCStep createOptimizeGCStep() {
-        MergeGCStep result = new MergeGCStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        result.setOldHTables(getOptimizeHTables());
-        return result;
-    }
-
-    public List<CubeSegment> getOptimizeSegments() {
-        CubeInstance cube = (CubeInstance) seg.getRealization();
-        List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING));
-        List<CubeSegment> oldSegments = Lists.newArrayListWithExpectedSize(newSegments.size());
-        for (CubeSegment segment : newSegments) {
-            oldSegments.add(cube.getOriginalSegmentToOptimize(segment));
-        }
-        return oldSegments;
-    }
-
-    public List<String> getOptimizeHTables() {
-        return getOldHTables(getOptimizeSegments());
-    }
-
-    public List<String> getOldHTables(final List<CubeSegment> oldSegments) {
-        final List<String> oldHTables = Lists.newArrayListWithExpectedSize(oldSegments.size());
-        for (CubeSegment segment : oldSegments) {
-            oldHTables.add(segment.getStorageLocationIdentifier());
-        }
-        return oldHTables;
-    }
-
-    public List<String> getMergingHTables() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
-        final List<String> mergingHTables = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-        }
-        return mergingHTables;
-    }
-
-    public List<String> getMergingHDFSPaths() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
-        final List<String> mergingHDFSPaths = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
-        }
-        return mergingHDFSPaths;
-    }
-
-    public List<String> getOptimizeHDFSPaths() {
-        return getOldHDFSPaths(getOptimizeSegments());
-    }
-
-    public List<String> getOldHDFSPaths(final List<CubeSegment> oldSegments) {
-        final List<String> oldHDFSPaths = Lists.newArrayListWithExpectedSize(oldSegments.size());
-        for (CubeSegment oldSegment : oldSegments) {
-            oldHDFSPaths.add(getJobWorkingDir(oldSegment.getLastBuildJobID()));
-        }
-        return oldHDFSPaths;
-    }
-
-    public String getHFilePath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
-    }
-
-    public String getRowkeyDistributionOutputPath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
-    }
-
-    public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
-        String jobId = jobFlow.getId();
-
-        List<String> toDeletePaths = new ArrayList<>();
-        toDeletePaths.add(getOptimizationRootPath(jobId));
-
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
-
-        jobFlow.addTask(step);
-    }
-
-    public void addCheckpointGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
-        String jobId = jobFlow.getId();
-
-        jobFlow.addTask(createOptimizeGCStep());
-
-        List<String> toDeletePaths = new ArrayList<>();
-        toDeletePaths.addAll(getOptimizeHDFSPaths());
-
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
-
-        jobFlow.addTask(step);
-    }
-
-    public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
-        String jobId = jobFlow.getId();
-
-        jobFlow.addTask(createMergeGCStep());
-
-        List<String> toDeletePaths = new ArrayList<>();
-        toDeletePaths.addAll(getMergingHDFSPaths());
-        toDeletePaths.add(getHFilePath(jobId));
-
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
-
-        jobFlow.addTask(step);
-    }
-
-    public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
-        String jobId = jobFlow.getId();
-
-        List<String> toDeletePaths = new ArrayList<>();
-        toDeletePaths.add(getFactDistinctColumnsPath(jobId));
-        toDeletePaths.add(getHFilePath(jobId));
-
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
-
-        jobFlow.addTask(step);
-    }
-
 }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
new file mode 100644
index 0000000..08f58af
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.util.List;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+import org.apache.kylin.engine.spark.ISparkOutput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "Transition" impl generates cuboid files and then convert to HFile.
+ * The additional step slows down build process, but the gains is merge
+ * can read from HDFS instead of over HBase region server. See KYLIN-1007.
+ * 
+ * This is transitional because finally we want to merge from HTable snapshot.
+ * However multiple snapshots as MR input is only supported by HBase 1.x.
+ * Before most users upgrade to latest HBase, they can only use this transitional
+ * cuboid file solution.
+ */
+public class HBaseSparkOutputTransition implements ISparkOutput {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(HBaseSparkOutputTransition.class);
+
+    @Override
+    public ISparkBatchCubingOutputSide getBatchCubingOutputSide(final CubeSegment seg) {
+        final HBaseJobSteps steps = new HBaseSparkSteps(seg);
+
+        return new ISparkBatchCubingOutputSide() {
+
+            @Override
+            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                // nothing to do
+            }
+
+        };
+    }
+
+    @Override
+    public ISparkBatchMergeOutputSide getBatchMergeOutputSide(final CubeSegment seg) {
+        return new ISparkBatchMergeOutputSide() {
+            HBaseSparkSteps steps = new HBaseSparkSteps(seg);
+
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
+                    DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(
+                        steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
+                jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+                steps.addMergingGarbageCollectionSteps(jobFlow);
+            }
+
+        };
+    }
+
+    public ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide(final CubeSegment seg) {
+        return null;
+    }
+}
\ No newline at end of file
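
Taken together, a Spark-engine cube ends up with the same HBase tail in its job
flow as the MR path; a rough sketch of the resulting order (jobFlow and steps as
wired in this class; the bulk-load step is assumed to wrap HBase's bulk load the
same way the existing MR path does):

    // phase 2: create the target HTable from the rowkey distribution statistics
    jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId()));
    // phase 3: SparkCubeHFile converts the cuboid files into HFiles ...
    jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
    // ... which are then bulk loaded into the HTable
    jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
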
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
new file mode 100644
index 0000000..c2794ff
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2;
+import org.apache.kylin.engine.spark.SparkExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+
+public class HBaseSparkSteps extends HBaseJobSteps {
+
+    public HBaseSparkSteps(CubeSegment seg) {
+        super(seg);
+    }
+
+    public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
+        String cuboidRootPath = getCuboidRootPath(jobId);
+        String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
+
+        SparkBatchCubingJobBuilder2 jobBuilder2 = new SparkBatchCubingJobBuilder2(seg, null);
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        sparkExecutable.setClassName(SparkCubeHFile.class.getName());
+        sparkExecutable.setParam(SparkCubeHFile.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkCubeHFile.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkCubeHFile.OPTION_INPUT_PATH.getOpt(), inputPath);
+        sparkExecutable.setParam(SparkCubeHFile.OPTION_META_URL.getOpt(),
+                jobBuilder2.getSegmentMetadataUrl(seg.getConfig(), jobId));
+        sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
+        sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(), getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+
+        sparkExecutable.setJobId(jobId);
+
+        StringBuilder jars = new StringBuilder();
+        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.KeyValue.class));
+        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class));
+        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.regionserver.BloomType.class));
+        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.class)); //hbase-protocol.jar
+        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.CompatibilityFactory.class)); //hbase-hadoop-compat.jar
+        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
+        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.apache.htrace.Trace", null)); // htrace-core.jar
+        StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("com.yammer.metrics.core.MetricsRegistry", null)); // metrics-core.jar
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
+        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+
+        return sparkExecutable;
+    }
+
+}
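
The jar list above is built with ClassUtil.findContainingJar, which resolves the
jar file that provides a given class on the job server's classpath so it can be
shipped with the Spark job; a small sketch (the returned paths are illustrative):

    // resolve by Class object when the class is on the compile classpath ...
    String hbaseCommonJar = ClassUtil.findContainingJar(org.apache.hadoop.hbase.KeyValue.class);
    // ... or by fully qualified name when it is only known at runtime
    String htraceJar = ClassUtil.findContainingJar("org.apache.htrace.Trace", null);
    // e.g. "/usr/lib/hbase/lib/hbase-common-1.x.y.jar" (illustrative); the joined
    // list is handed to SparkExecutable.setJars() for the Spark submission
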
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
index 222c9f4..c67d54d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
 
@@ -32,7 +33,7 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 /**
  * @author George Song (ysong1)
  */
-public class KeyValueCreator {
+public class KeyValueCreator implements Serializable {
     byte[] cfBytes;
     byte[] qBytes;
     long timestamp;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java
index 599a3f4..bba68da 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java
@@ -18,19 +18,21 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 
-public class RowKeyWritable implements WritableComparable<RowKeyWritable> {
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.kylin.common.util.BytesUtil;
+
+public class RowKeyWritable implements WritableComparable<RowKeyWritable>, Serializable {
     private byte[] data;
     private int offset;
     private int length;
-    private KeyValue.KVComparator kvComparator = new KeyValue.KVComparator();
+    private final static SerializableKVComparator kvComparator = new SerializableKVComparator();
 
     static {
         WritableComparator.define(RowKeyWritable.class, new RowKeyComparator());
@@ -56,6 +58,18 @@ public class RowKeyWritable implements WritableComparable<RowKeyWritable> {
         this.length = length;
     }
 
+    public byte[] getData() {
+        return data;
+    }
+
+    public int getOffset() {
+        return offset;
+    }
+
+    public int getLength() {
+        return length;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeInt(this.length);
@@ -70,12 +84,23 @@ public class RowKeyWritable implements WritableComparable<RowKeyWritable> {
         this.offset = 0;
     }
 
+    @Override
     public int compareTo(RowKeyWritable other) {
         return kvComparator.compare(this.data, this.offset, this.length, other.data, other.offset, other.length);
     }
 
-    public static class RowKeyComparator extends WritableComparator {
-        private KeyValue.KVComparator kvComparator = new KeyValue.KVComparator();
+    @Override
+    public String toString() {
+        return BytesUtil.toHex(data, offset, length);
+    }
+
+    public static class SerializableKVComparator extends KeyValue.KVComparator implements Serializable {
+
+    }
+
+    public static class RowKeyComparator extends WritableComparator implements Serializable {
+        public static final RowKeyComparator INSTANCE = new RowKeyComparator();
+        private SerializableKVComparator kvComparator = new SerializableKVComparator();
         private static final int LENGTH_BYTES = 4;
 
         @Override
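
Making the comparators Serializable matters because Spark ships them to executors
when sorting by row key before HFiles are written; a minimal sketch of the intended
usage (the partitioner and the pair RDD of cuboid rows are assumed to exist):

    // hfilePartitioner: an org.apache.spark.Partitioner over the HFile split keys (assumed)
    // cuboidKeyValues: JavaPairRDD<RowKeyWritable, KeyValue> derived from the cuboid files
    JavaPairRDD<RowKeyWritable, KeyValue> sorted = cuboidKeyValues
            .repartitionAndSortWithinPartitions(hfilePartitioner, RowKeyWritable.RowKeyComparator.INSTANCE);
    // each partition is now sorted by row key, as HFileOutputFormat2 requires
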
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
new file mode 100644
index 0000000..b2571ae
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.spark.KylinSparkJobListener;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import scala.Tuple2;
+
+/**
+ * Spark application that converts the cuboid files of a cube segment into HBase HFiles for bulk loading.
+ */
+public class SparkCubeHFile extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkCubeHFile.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
+            .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
+
+    private Options options;
+
+    private KylinSparkJobListener jobListener;
+
+    public SparkCubeHFile() {
+        options = new Options();
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_PARTITION_FILE_PATH);
+        jobListener = new KylinSparkJobListener();
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
+
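+        // spark.kryo.registrationRequired=true makes Kryo reject any class that is not explicitly registered,
+        // so the HFile-specific classes used below (KeyValue, RowKeyWritable, KeyValueCreator, ...) are listed
+        // here in addition to the classes registered by KylinKryoRegistrator.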
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class,
+                KeyValue.class, RowKeyWritable.class };
+
+        SparkConf conf = new SparkConf().setAppName("Converting HFile for: " + cubeName + " segment " + segmentId);
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        sc.sc().addSparkListener(jobListener);
+        final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
+        if (!fs.exists(partitionFilePath)) {
+            throw new IllegalArgumentException("File does not exist: " + partitionFilePath.toString());
+        }
+
+        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+
+        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+
+        final MeasureCodec inputCodec = new MeasureCodec(cubeDesc.getMeasures());
+        final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
+
+        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+            }
+        }
+
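+        // Quick path: a single column family that carries a full copy of the encoded measures, so each cuboid
+        // row can be turned into one KeyValue directly, without decoding and re-encoding the measure values.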
+        final int cfNum = keyValueCreators.size();
+        final boolean quickPath = (keyValueCreators.size() == 1) && keyValueCreators.get(0).isFullCopy;
+
+        logger.info("Input path: {}", inputPath);
+        logger.info("Output path: {}", outputPath);
+
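+        // Union the cuboid sequence files into one RDD of (encoded row key, encoded measures) pairs.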
+        List<JavaPairRDD> inputRDDs = parseInputPath(inputPath, fs, sc);
+        final JavaPairRDD<Text, Text> allCuboidFile = sc.union(inputRDDs.toArray(new JavaPairRDD[inputRDDs.size()]));
+        final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+        if (quickPath) {
+            hfilerdd = allCuboidFile.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                @Override
+                public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+                    KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+                            textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+                    return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
+                }
+            });
+        } else {
+            hfilerdd = allCuboidFile
+                    .flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                        @Override
+                        public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+                                throws Exception {
+
+                            List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+                            Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+                            inputCodec.decode(
+                                    ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+                                    inputMeasures);
+
+                            for (int i = 0; i < cfNum; i++) {
+                                KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+                                result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+                                        outputValue));
+                            }
+
+                            return result.iterator();
+                        }
+                    });
+        }
+
+        allCuboidFile.unpersist();
+
+        // read the region split keys from the partition file; each key marks the boundary of one HFile/region
+        List<RowKeyWritable> keys = new ArrayList<>();
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
+            RowKeyWritable key = new RowKeyWritable();
+            Writable value = NullWritable.get();
+            while (reader.next(key, value)) {
+                keys.add(key);
+                logger.info(" ------- split key: " + key);
+                key = new RowKeyWritable(); // important: allocate a new object, reader.next() reuses the instance it is given
+            }
+        }
+
+        logger.info("There are " + keys.size() + " split keys, " + (keys.size() + 1) + " HFiles in total");
+
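+        // HFileOutputFormat2 requires the KeyValues of each output file in ascending row-key order, and the
+        // bulk load works best when every partition maps to exactly one target region. repartitionAndSortWithinPartitions
+        // with HFilePartitioner (built from the region split keys) and RowKeyComparator provides both at once.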
+        final JavaPairRDD<ImmutableBytesWritable, KeyValue> hfilerdd2 = hfilerdd
+                .repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+                        RowKeyWritable.RowKeyComparator.INSTANCE)
+                .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+                    @Override
+                    public Tuple2<ImmutableBytesWritable, KeyValue> call(
+                            Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+                        return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+                                rowKeyWritableKeyValueTuple2._2);
+                    }
+                });
+
+        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+        HadoopUtil.healSickConfig(hbaseConf);
+        Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
+        HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
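+        // configureIncrementalLoadMap copies the table's per-family settings (compression, bloom filter type,
+        // block size, data block encoding) into the job configuration so the generated HFiles match the target
+        // table; unlike configureIncrementalLoad it does not install a partitioner, as the RDD is already
+        // partitioned and sorted above.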
+        try {
+            HFileOutputFormat2.configureIncrementalLoadMap(job, table);
+        } catch (IOException ioe) {
+            // this can be ignored.
+            logger.debug(ioe.getMessage());
+        }
+
+        hfilerdd2.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
+                HFileOutputFormat2.class, job.getConfiguration());
+
+        System.out.println("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+        // deleteHDFSMeta(metaUrl);
+    }
+
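+    // If the input folder contains sub-directories (e.g. one per cuboid level from the layered algorithm),
+    // each of them becomes its own sequence-file RDD; otherwise the folder itself is read as a single RDD.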
+    private List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc) throws IOException {
+        List<JavaPairRDD> inputRDDs = Lists.newArrayList();
+        Path inputHDFSPath = new Path(inputPath);
+        FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
+        boolean hasDir = false;
+        for (FileStatus stat : fileStatuses) {
+            if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+                hasDir = true;
+                inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), Text.class, Text.class));
+            }
+        }
+
+        if (!hasDir) {
+            inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), Text.class, Text.class));
+        }
+
+        return inputRDDs;
+    }
+
+    static class HFilePartitioner extends Partitioner {
+        private List<RowKeyWritable> keys;
+
+        public HFilePartitioner(List<RowKeyWritable> splitKeys) {
+            keys = splitKeys;
+        }
+
+        @Override
+        public int numPartitions() {
+            return keys.size() + 1;
+        }
+
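+        // Collections.binarySearch returns the index of a matching split key, or (-(insertionPoint) - 1) when
+        // the row key falls between split keys. The arithmetic below maps both cases to a partition index:
+        // rows below the first split key go to partition 0, and a row at or above split key i goes to
+        // partition i + 1, matching HBase's use of split keys as region start keys.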
+        @Override
+        public int getPartition(Object o) {
+            int pos = Collections.binarySearch(this.keys, (RowKeyWritable) o) + 1;
+            return pos < 0 ? -pos : pos;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            HFilePartitioner that = (HFilePartitioner) o;
+            return Objects.equals(keys, that.keys);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(keys);
+        }
+    }
+
+    protected void deleteHDFSMeta(String metaUrl) throws IOException {
+        String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
+        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
+        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
+    }
+
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HFilePartitionerTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HFilePartitionerTest.java
new file mode 100644
index 0000000..827aef8
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HFilePartitionerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.BytesUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HFilePartitionerTest {
+
+    @Test
+    public void testPartitioner() {
+        String[] splitKeys = new String[] {
+                "\\x00\\x0A\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x5F\\x03\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+                "\\x00\\x02\\x00\\x01\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+                "\\x00\\x0A\\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x58\\xF3\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+                "\\x00\\x02\\x00\\x02\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+                "\\x00\\x0A\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x58\\x5C\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+                "\\x00\\x0A\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x7C\\x50\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+                "\\x00\\x02\\x00\\x03\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+                "\\x00\\x0A\\x00\\x03\\x00\\x00\\x00\\x00\\x00\\x00\\x5B\\xF3\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+                "\\x00\\x02\\x00\\x04\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+                "\\x00\\x0A\\x00\\x04\\x00\\x00\\x00\\x00\\x00\\x00\\x5B\\xFC\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF", };
+
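+        // The split keys are hex-encoded row keys; BytesUtil.fromHex turns them back into the raw bytes that
+        // the partitioner compares against.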
+        ArrayList<RowKeyWritable> keys = new ArrayList<>();
+        for (int i = 0; i < splitKeys.length; i++) {
+            byte[] bytes = BytesUtil.fromHex(splitKeys[i]);
+            RowKeyWritable rowKeyWritable = new RowKeyWritable(bytes);
+            keys.add(rowKeyWritable);
+        }
+
+        SparkCubeHFile.HFilePartitioner partitioner = new SparkCubeHFile.HFilePartitioner(keys);
+
+        String testRowKey = "\\x00\\x11\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x40\\xAC\\x0B\\x37\\xF9\\x05\\x04\\x02\\x00\\x02\\x46\\x32\\x4D\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x04";
+
+        int partition = partitioner.getPartition(new RowKeyWritable(BytesUtil.fromHex(testRowKey)));
+
+        Assert.assertEquals(4, partition);
+
+        testRowKey = "\\x00\\x0D\\x00\\x04\\x00\\x00\\x00\\x00\\x00\\x00\\x40\\x00\\x0B\\x39\\x6C\\x02\\x46\\x32\\x4D\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x04";
+        partition = partitioner.getPartition(new RowKeyWritable(BytesUtil.fromHex(testRowKey)));
+
+        Assert.assertEquals(9, partition);
+
+    }
+
+}