You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/11 08:19:02 UTC

[kylin] branch master updated (0d9fbbe -> cddb493)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 0d9fbbe  KYLIN-3416 Group by with expression can not aggregate exactly
     new 195640e  KYLIN-3710 JDBC data source support Spark cubing
     new cddb493  KYLIN-3680 Spark cubing failed with JDBC data source

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kylin/common/util/HadoopUtil.java   |  24 +++
 .../java/org/apache/kylin/engine/mr/IInput.java    |  31 +--
 .../java/org/apache/kylin/engine/mr/IMRInput.java  |  24 +--
 .../java/org/apache/kylin/engine/mr/MRUtil.java    |   7 +-
 .../org/apache/kylin/engine/spark/ISparkInput.java |  24 +--
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  73 ++++---
 .../apache/kylin/source/hive/HiveInputBase.java    |  90 ++++++++-
 .../org/apache/kylin/source/hive/HiveMRInput.java  | 103 ++--------
 .../apache/kylin/source/hive/HiveSparkInput.java   |  84 +-------
 ...JdbcHiveMRInput.java => JdbcHiveInputBase.java} |  49 ++---
 .../apache/kylin/source/jdbc/JdbcHiveMRInput.java  | 225 +++------------------
 .../kylin/source/jdbc/JdbcHiveSparkInput.java      |  48 +++++
 .../org/apache/kylin/source/jdbc/JdbcSource.java   |   3 +
 ...JdbcHiveMRInput.java => JdbcHiveInputBase.java} |  18 +-
 .../source/jdbc/extensible/JdbcHiveMRInput.java    | 116 +++--------
 .../source/jdbc/extensible/JdbcHiveSparkInput.java |  55 +++++
 .../kylin/source/jdbc/extensible/JdbcSource.java   |   3 +
 .../jdbc/extensible/JdbcHiveMRInputTest.java       |  12 +-
 .../apache/kylin/source/kafka/KafkaInputBase.java  |  77 ++++++-
 .../apache/kylin/source/kafka/KafkaMRInput.java    |  88 +-------
 .../apache/kylin/source/kafka/KafkaSparkInput.java |  86 +-------
 21 files changed, 473 insertions(+), 767 deletions(-)
 copy engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java => engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java (66%)
 copy source-jdbc/src/main/java/org/apache/kylin/source/jdbc/{JdbcHiveMRInput.java => JdbcHiveInputBase.java} (93%)
 create mode 100644 source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
 copy source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/{JdbcHiveMRInput.java => JdbcHiveInputBase.java} (90%)
 create mode 100644 source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java


[kylin] 02/02: KYLIN-3680 Spark cubing failed with JDBC data source

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cddb493884e624c12c3452bad4de7482fc37de8d
Author: chao long <wa...@qq.com>
AuthorDate: Mon Dec 10 13:05:27 2018 +0800

    KYLIN-3680 Spark cubing failed with JDBC data source
---
 .../org/apache/kylin/common/util/HadoopUtil.java   | 24 +++++++++
 .../org/apache/kylin/engine/spark/SparkUtil.java   | 63 ++++++++++++----------
 2 files changed, 60 insertions(+), 27 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index f3123a2..5d09ea7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -236,4 +236,28 @@ public class HadoopUtil {
         return readFromSequenceFile(getCurrentConfiguration(), inputPath);
     }
 
+    public static boolean isSequenceFile(Configuration conf, Path filePath) {
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(getWorkingFileSystem(conf), filePath, conf)) {
+            return true;
+        } catch (Exception e) {
+            logger.warn("Read sequence file {} failed.", filePath.getName(), e);
+            return false;
+        }
+    }
+
+    public static boolean isSequenceDir(Configuration conf, Path fileDir) throws IOException {
+        FileSystem fs = getWorkingFileSystem(conf);
+        FileStatus[] fileStatuses = fs.listStatus(fileDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return !"_SUCCESS".equals(path.getName());
+            }
+        });
+
+        if (fileStatuses != null && fileStatuses.length > 0) {
+            return isSequenceFile(conf, fileStatuses[0].getPath());
+        }
+
+        return false;
+    }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 1c4086d..151103a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
@@ -142,39 +143,47 @@ public class SparkUtil {
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
     }
 
-    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) {
+    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) throws IOException {
         JavaRDD<String[]> recordRDD;
 
-        if (isSequenceFile) {
-            recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
-                    .map(new Function<Text, String[]>() {
-                        @Override
-                        public String[] call(Text text) throws Exception {
-                            String s = Bytes.toString(text.getBytes(), 0, text.getLength());
-                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
-                        }
-                    });
+        if (isSequenceFile && HadoopUtil.isSequenceDir(sc.hadoopConfiguration(), new Path(inputPath))) {
+            recordRDD = getSequenceFormatHiveInput(sc, inputPath);
         } else {
-            SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
-            final Dataset intermediateTable = sparkSession.table(hiveTable);
-            recordRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
-                @Override
-                public String[] call(Row row) throws Exception {
-                    String[] result = new String[row.size()];
-                    for (int i = 0; i < row.size(); i++) {
-                        final Object o = row.get(i);
-                        if (o != null) {
-                            result[i] = o.toString();
-                        } else {
-                            result[i] = null;
-                        }
-                    }
-                    return result;
-                }
-            });
+            recordRDD = getOtherFormatHiveInput(sc, hiveTable);
         }
 
         return recordRDD;
     }
 
+    private static JavaRDD<String[]> getSequenceFormatHiveInput(JavaSparkContext sc, String inputPath) {
+        return sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                .map(new Function<Text, String[]>() {
+                    @Override
+                    public String[] call(Text text) throws Exception {
+                        String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                        return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+                    }
+                });
+    }
+
+    private static JavaRDD<String[]> getOtherFormatHiveInput(JavaSparkContext sc, String hiveTable) {
+        SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+        final Dataset intermediateTable = sparkSession.table(hiveTable);
+        return intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+            @Override
+            public String[] call(Row row) throws Exception {
+                String[] result = new String[row.size()];
+                for (int i = 0; i < row.size(); i++) {
+                    final Object o = row.get(i);
+                    if (o != null) {
+                        result[i] = o.toString();
+                    } else {
+                        result[i] = null;
+                    }
+                }
+                return result;
+            }
+        });
+    }
+
 }


[kylin] 01/02: KYLIN-3710 JDBC data source support Spark cubing

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 195640eb0499a449a02b412af7e247cdcde47cec
Author: chao long <wa...@qq.com>
AuthorDate: Mon Dec 10 13:04:59 2018 +0800

    KYLIN-3710 JDBC data source support Spark cubing
---
 .../java/org/apache/kylin/engine/mr/IInput.java    |  31 +--
 .../java/org/apache/kylin/engine/mr/IMRInput.java  |  24 +--
 .../java/org/apache/kylin/engine/mr/MRUtil.java    |   7 +-
 .../org/apache/kylin/engine/spark/ISparkInput.java |  24 +--
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  10 +-
 .../apache/kylin/source/hive/HiveInputBase.java    |  90 ++++++++-
 .../org/apache/kylin/source/hive/HiveMRInput.java  | 103 ++--------
 .../apache/kylin/source/hive/HiveSparkInput.java   |  84 +-------
 ...JdbcHiveMRInput.java => JdbcHiveInputBase.java} |  49 ++---
 .../apache/kylin/source/jdbc/JdbcHiveMRInput.java  | 225 +++------------------
 .../kylin/source/jdbc/JdbcHiveSparkInput.java      |  48 +++++
 .../org/apache/kylin/source/jdbc/JdbcSource.java   |   3 +
 ...JdbcHiveMRInput.java => JdbcHiveInputBase.java} |  18 +-
 .../source/jdbc/extensible/JdbcHiveMRInput.java    | 116 +++--------
 .../source/jdbc/extensible/JdbcHiveSparkInput.java |  55 +++++
 .../kylin/source/jdbc/extensible/JdbcSource.java   |   3 +
 .../jdbc/extensible/JdbcHiveMRInputTest.java       |  12 +-
 .../apache/kylin/source/kafka/KafkaInputBase.java  |  77 ++++++-
 .../apache/kylin/source/kafka/KafkaMRInput.java    |  88 +-------
 .../apache/kylin/source/kafka/KafkaSparkInput.java |  86 +-------
 20 files changed, 413 insertions(+), 740 deletions(-)

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
similarity index 66%
copy from engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
index 5459c70..758b081 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -6,44 +6,31 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-package org.apache.kylin.engine.spark;
+package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 
-/**
- * Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface.
- */
-public interface ISparkInput {
+public interface IInput {
 
     /** Return a helper to participate in batch cubing job flow. */
-    public ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
 
     /** Return a helper to participate in batch cubing merge job flow. */
-    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
-
-    /**
-     * Participate the batch cubing flow as the input side. Responsible for creating
-     * intermediate flat table (Phase 1) and clean up any leftover (Phase 4).
-     * 
-     * - Phase 1: Create Flat Table
-     * - Phase 2: Build Dictionary (with FlatTableInputFormat)
-     * - Phase 3: Build Cube (with FlatTableInputFormat)
-     * - Phase 4: Update Metadata & Cleanup
-     */
-    public interface ISparkBatchCubingInputSide {
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
 
+    public interface IBatchCubingInputSide {
         /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
 
@@ -51,7 +38,7 @@ public interface ISparkInput {
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface ISparkBatchMergeInputSide {
+    public interface IBatchMergeInputSide {
 
         /** Add step that executes before merge dictionary and before merge cube. */
         public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index c259c4e..74153e0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -22,25 +22,16 @@ import java.util.Collection;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**
  * Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface.
  */
-public interface IMRInput {
-
-    /** Return a helper to participate in batch cubing job flow. */
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+public interface IMRInput extends IInput {
 
     /** Return an InputFormat that reads from specified table. */
     public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid);
 
-    /** Return a helper to participate in batch cubing merge job flow. */
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
-
     /**
      * Utility that configures mapper to read from a table.
      */
@@ -65,22 +56,13 @@ public interface IMRInput {
      * - Phase 3: Build Cube (with FlatTableInputFormat)
      * - Phase 4: Update Metadata & Cleanup
      */
-    public interface IMRBatchCubingInputSide {
+    public interface IMRBatchCubingInputSide extends IBatchCubingInputSide {
 
         /** Return an InputFormat that reads from the intermediate flat table */
         public IMRTableInputFormat getFlatTableInputFormat();
-
-        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
-
-        /** Add step that does necessary clean up, like delete the intermediate flat table */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface IMRBatchMergeInputSide {
-
-        /** Add step that executes before merge dictionary and before merge cube. */
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+    public interface IMRBatchMergeInputSide extends IBatchMergeInputSide {
 
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 3a0fb84..60d0445 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.dict.lookup.LookupProviderFactory;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchMergeInputSide;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
@@ -39,7 +40,7 @@ public class MRUtil {
 
     public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
+        return (IMRBatchCubingInputSide)SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
     }
 
     public static IMRTableInputFormat getTableInputFormat(String tableName, String prj, String uuid) {
@@ -63,8 +64,8 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
     }
 
-    public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
-        return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+    public static IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return (IMRBatchMergeInputSide)SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
     }
 
     public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
index 5459c70..4af616c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
@@ -18,20 +18,12 @@
 
 package org.apache.kylin.engine.spark;
 
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.engine.mr.IInput;
 
 /**
  * Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface.
  */
-public interface ISparkInput {
-
-    /** Return a helper to participate in batch cubing job flow. */
-    public ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
-
-    /** Return a helper to participate in batch cubing merge job flow. */
-    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+public interface ISparkInput extends IInput {
 
     /**
      * Participate the batch cubing flow as the input side. Responsible for creating
@@ -42,19 +34,11 @@ public interface ISparkInput {
      * - Phase 3: Build Cube (with FlatTableInputFormat)
      * - Phase 4: Update Metadata & Cleanup
      */
-    public interface ISparkBatchCubingInputSide {
+    public interface ISparkBatchCubingInputSide extends IBatchCubingInputSide {
 
-        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
-
-        /** Add step that does necessary clean up, like delete the intermediate flat table */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface ISparkBatchMergeInputSide {
-
-        /** Add step that executes before merge dictionary and before merge cube. */
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+    public interface ISparkBatchMergeInputSide extends IBatchMergeInputSide {
 
     }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 82a1a9b..1c4086d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -43,6 +43,8 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.kylin.engine.spark.ISparkInput.ISparkBatchCubingInputSide;
+import org.apache.kylin.engine.spark.ISparkInput.ISparkBatchMergeInputSide;
 
 import com.google.common.collect.Lists;
 import org.apache.spark.api.java.function.Function;
@@ -52,9 +54,9 @@ import org.apache.spark.sql.SparkSession;
 
 public class SparkUtil {
 
-    public static ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+    public static ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
+        return (ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
     }
 
     public static ISparkOutput.ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
@@ -65,8 +67,8 @@ public class SparkUtil {
         return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchMergeOutputSide(seg);
     }
 
-    public static ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
-        return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
+    public static ISparkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return (ISparkBatchMergeInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
     }
 
     public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index c55015b..2f25e50 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -19,16 +19,22 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
@@ -48,9 +54,82 @@ import com.google.common.collect.Sets;
 
 public class HiveInputBase {
 
-    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveInputBase.class);
 
+    public static class BaseBatchCubingInputSide implements IInput.IBatchCubingInputSide {
+
+        final protected IJoinedFlatTableDesc flatDesc;
+        final protected String flatTableDatabase;
+        final protected String hdfsWorkingDir;
+
+        List<String> hiveViewIntermediateTables = Lists.newArrayList();
+
+        public BaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            this.flatDesc = flatDesc;
+            this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
+            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+            final KylinConfig cubeConfig = cubeInstance.getConfig();
+
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+
+            // create flat table first
+            addStepPhase1_DoCreateFlatTable(jobFlow);
+
+            // then count and redistribute
+            if (cubeConfig.isHiveRedistributeEnabled()) {
+                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
+                        cubeInstance.getDescriptor()));
+            }
+
+            // special for hive
+            addStepPhase1_DoMaterializeLookupTable(jobFlow);
+        }
+
+        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+        }
+
+        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
+                    flatDesc, hiveViewIntermediateTables, jobFlow.getId());
+            if (task != null) {
+                jobFlow.addTask(task);
+            }
+        }
+
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+            org.apache.kylin.source.hive.GarbageCollectionStep step = new org.apache.kylin.source.hive.GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
+            step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
+            jobFlow.addTask(step);
+        }
+
+        protected String getIntermediateTableIdentity() {
+            return flatTableDatabase + "." + flatDesc.getTableName();
+        }
+    }
+
+    // ===== static methods ======
+
     protected static String getTableNameForHCat(TableDesc table, String uuid) {
         String tableName = (table.isView()) ? table.getMaterializedName(uuid) : table.getName();
         String database = (table.isView()) ? KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()
@@ -58,15 +137,6 @@ public class HiveInputBase {
         return String.format(Locale.ROOT, "%s.%s", database, tableName).toUpperCase(Locale.ROOT);
     }
 
-    protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow, String hdfsWorkingDir,
-            IJoinedFlatTableDesc flatTableDesc, String flatTableDatabase) {
-        final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-        final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-        final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-        jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatTableDesc));
-    }
-
     protected static AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
             String cubeName, IJoinedFlatTableDesc flatDesc) {
         //from hive to hive
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index d6b85ed..df20b2c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -28,42 +28,23 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hive.hcatalog.mapreduce.HCatSplit;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 
 public class HiveMRInput extends HiveInputBase implements IMRInput {
 
-    private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class);
-
     @Override
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new HiveMRBatchCubingInputSide(flatDesc);
     }
 
     @Override
-    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
-        return new HiveTableInputFormat(getTableNameForHCat(table, uuid));
-    }
-
-    @Override
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
         return new IMRBatchMergeInputSide() {
             @Override
             public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
@@ -72,6 +53,11 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
         };
     }
 
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+        return new HiveTableInputFormat(getTableNameForHCat(table, uuid));
+    }
+
     public static class HiveTableInputFormat implements IMRTableInputFormat {
         final String dbName;
         final String tableName;
@@ -111,80 +97,15 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
         }
     }
 
-    public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
-
-        final protected IJoinedFlatTableDesc flatDesc;
-        final protected String flatTableDatabase;
-        final protected String hdfsWorkingDir;
-
-        List<String> hiveViewIntermediateTables = Lists.newArrayList();
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            this.flatDesc = flatDesc;
-            this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
-            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-            final KylinConfig cubeConfig = cubeInstance.getConfig();
-
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-
-            // create flat table first
-            addStepPhase1_DoCreateFlatTable(jobFlow);
-
-            // then count and redistribute
-            if (cubeConfig.isHiveRedistributeEnabled()) {
-                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
-                        cubeInstance.getDescriptor()));
-            }
+    public static class HiveMRBatchCubingInputSide extends BaseBatchCubingInputSide implements IMRBatchCubingInputSide {
 
-            // special for hive
-            addStepPhase1_DoMaterializeLookupTable(jobFlow);
-        }
-
-        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
-        }
-
-        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
-                    flatDesc, hiveViewIntermediateTables, jobFlow.getId());
-            if (task != null) {
-                jobFlow.addTask(task);
-            }
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            org.apache.kylin.source.hive.GarbageCollectionStep step = new org.apache.kylin.source.hive.GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
-            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
-            step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
-            jobFlow.addTask(step);
+        public HiveMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
         }
 
         @Override
         public IMRTableInputFormat getFlatTableInputFormat() {
-            return new HiveTableInputFormat(getIntermediateTableIdentity());
-        }
-
-        private String getIntermediateTableIdentity() {
-            return flatTableDatabase + "." + flatDesc.getTableName();
+            return new HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
         }
     }
 
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
index d710db7..0660a66 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -18,39 +18,26 @@
 
 package org.apache.kylin.source.hive;
 
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.ISparkInput;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 public class HiveSparkInput extends HiveInputBase implements ISparkInput {
 
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveSparkInput.class);
 
     @Override
-    public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new SparkBatchCubingInputSide(flatDesc);
     }
 
     @Override
-    public ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new ISparkInput.ISparkBatchMergeInputSide() {
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkBatchMergeInputSide() {
             @Override
             public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
                 // doing nothing
@@ -58,67 +45,10 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
         };
     }
 
-    public class BatchCubingInputSide implements ISparkBatchCubingInputSide {
-
-        final protected IJoinedFlatTableDesc flatDesc;
-        final protected String flatTableDatabase;
-        final protected String hdfsWorkingDir;
-
-        List<String> hiveViewIntermediateTables = Lists.newArrayList();
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            this.flatDesc = flatDesc;
-            this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
-            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-            final KylinConfig cubeConfig = cubeInstance.getConfig();
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-
-            // create flat table first
-            addStepPhase1_DoCreateFlatTable(jobFlow, hdfsWorkingDir, flatDesc, flatTableDatabase);
-
-            // then count and redistribute
-            if (cubeConfig.isHiveRedistributeEnabled()) {
-                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
-                        cubeInstance.getDescriptor()));
-            }
-
-            // special for hive
-            addStepPhase1_DoMaterializeLookupTable(jobFlow);
-        }
-
-        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
-                    flatDesc, hiveViewIntermediateTables, jobFlow.getId());
-            if (task != null) {
-                jobFlow.addTask(task);
-            }
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+    public static class SparkBatchCubingInputSide extends BaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
 
-            GarbageCollectionStep step = new GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
-            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
-            step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
-            jobFlow.addTask(step);
-        }
-
-        private String getIntermediateTableIdentity() {
-            return flatTableDatabase + "." + flatDesc.getTableName();
+        public SparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
         }
     }
-
 }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
similarity index 93%
copy from source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
copy to source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
index 3460dd2..3769473 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
@@ -6,21 +6,18 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.source.jdbc;
 
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.SourceConfigurationUtil;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -34,32 +31,28 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.HiveMRInput;
+import org.apache.kylin.source.hive.HiveInputBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
-public class JdbcHiveMRInput extends HiveMRInput {
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 
-    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
+public class JdbcHiveInputBase extends HiveInputBase {
+    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveInputBase.class);
     private static final String MR_OVERRIDE_QUEUE_KEY = "mapreduce.job.queuename";
     private static final String DEFAULT_QUEUE = "default";
 
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
-    }
-
-    public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide {
+    public static class JdbcBaseBatchCubingInputSide extends BaseBatchCubingInputSide {
 
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        public JdbcBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
             super(flatDesc);
         }
 
-        private KylinConfig getConfig() {
+        protected KylinConfig getConfig() {
             return flatDesc.getDataModel().getConfig();
         }
 
@@ -73,6 +66,11 @@ public class JdbcHiveMRInput extends HiveMRInput {
             jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
         }
 
+        @Override
+        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+            // skip
+        }
+
         private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
             final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
             String filedDelimiter = getConfig().getJdbcSourceFieldDelimiter();
@@ -112,7 +110,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
             long maxCardinality = 0;
             for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
                 TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
-                List<ColumnStats> columnStatses = tableExtDesc.getColumnStats();
+                List<TableExtDesc.ColumnStats> columnStatses = tableExtDesc.getColumnStats();
                 if (!columnStatses.isEmpty()) {
                     for (TblColRef colRef : tableRef.getColumns()) {
                         long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
@@ -175,14 +173,14 @@ public class JdbcHiveMRInput extends HiveMRInput {
             String filedDelimiter = config.getJdbcSourceFieldDelimiter();
             int mapperNum = config.getSqoopMapperNum();
 
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn,
+            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM %s.%s as %s", splitColumn,
                     splitColumn, splitDatabase, splitTable, splitTableAlias);
             if (partitionDesc.isPartitioned()) {
                 SegmentRange segRange = flatDesc.getSegRange();
                 if (segRange != null && !segRange.isInfinite()) {
                     if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
                             && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
-                                    .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+                            .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
                         String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
                                 partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
                                         flatDesc.getSegment(), segRange),
@@ -210,11 +208,6 @@ public class JdbcHiveMRInput extends HiveMRInput {
             return step;
         }
 
-        @Override
-        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            // skip
-        }
-
         protected String generateSqoopConfigArgString() {
             KylinConfig kylinConfig = getConfig();
             Map<String, String> config = Maps.newHashMap();
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 3460dd2..19f354c 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -17,222 +17,45 @@
 */
 package org.apache.kylin.source.jdbc;
 
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.SourceConfigurationUtil;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
-import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.hive.HiveMRInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-public class JdbcHiveMRInput extends HiveMRInput {
 
-    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
-    private static final String MR_OVERRIDE_QUEUE_KEY = "mapreduce.job.queuename";
-    private static final String DEFAULT_QUEUE = "default";
+public class JdbcHiveMRInput extends JdbcHiveInputBase implements IMRInput {
 
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
+    @Override
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new JdbcMRBatchCubingInputSide(flatDesc);
     }
 
-    public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide {
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-            super(flatDesc);
-        }
-
-        private KylinConfig getConfig() {
-            return flatDesc.getDataModel().getConfig();
-        }
-
-        @Override
-        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName));
-            jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
-        }
-
-        private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
-            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
-            String filedDelimiter = getConfig().getJdbcSourceFieldDelimiter();
-            // Sqoop does not support exporting SEQUENSEFILE to Hive now SQOOP-869
-            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir,
-                    "TEXTFILE", filedDelimiter);
-
-            HiveCmdStep step = new HiveCmdStep();
-            step.setCmd(hiveInitStatements + dropTableHql + createTableHql);
-            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-            return step;
-        }
-
-        /**
-         * Choose a better split-by column for sqoop. The strategy is:
-         * 1. Prefer ClusteredBy column
-         * 2. Prefer DistributedBy column
-         * 3. Prefer Partition date column
-         * 4. Prefer Higher cardinality column
-         * 5. Prefer numeric column
-         * 6. Pick a column at first glance
-         * @return A column reference <code>TblColRef</code>for sqoop split-by
-         */
-        protected TblColRef determineSplitColumn() {
-            if (null != flatDesc.getClusterBy()) {
-                return flatDesc.getClusterBy();
-            }
-            if (null != flatDesc.getDistributedBy()) {
-                return flatDesc.getDistributedBy();
-            }
-            PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
-            if (partitionDesc.isPartitioned()) {
-                return partitionDesc.getPartitionDateColumnRef();
-            }
-            TblColRef splitColumn = null;
-            TableMetadataManager tblManager = TableMetadataManager.getInstance(getConfig());
-            long maxCardinality = 0;
-            for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
-                TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
-                List<ColumnStats> columnStatses = tableExtDesc.getColumnStats();
-                if (!columnStatses.isEmpty()) {
-                    for (TblColRef colRef : tableRef.getColumns()) {
-                        long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
-                                .getCardinality();
-                        splitColumn = cardinality > maxCardinality ? colRef : splitColumn;
-                    }
-                }
-            }
-            if (null == splitColumn) {
-                for (TblColRef colRef : flatDesc.getAllColumns()) {
-                    if (colRef.getType().isIntegerFamily()) {
-                        return colRef;
-                    }
-                }
-                splitColumn = flatDesc.getAllColumns().get(0);
-            }
-
-            return splitColumn;
-        }
-
-        private String getSqoopJobQueueName(KylinConfig config) {
-            Map<String, String> mrConfigOverride = config.getMRConfigOverride();
-            if (mrConfigOverride.containsKey(MR_OVERRIDE_QUEUE_KEY)) {
-                return mrConfigOverride.get(MR_OVERRIDE_QUEUE_KEY);
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new IMRBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // doing nothing
             }
-            return DEFAULT_QUEUE;
-        }
-
-        protected AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
-            KylinConfig config = getConfig();
-            PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
-            String partCol = null;
-
-            if (partitionDesc.isPartitioned()) {
-                partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
-            }
-
-            String splitTable;
-            String splitTableAlias;
-            String splitColumn;
-            String splitDatabase;
-            TblColRef splitColRef = determineSplitColumn();
-            splitTable = splitColRef.getTableRef().getTableName();
-            splitTableAlias = splitColRef.getTableAlias();
-            splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
-            splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
-
-            //using sqoop to extract data from jdbc source and dump them to hive
-            String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
-            selectSql = escapeQuotationInSql(selectSql);
-
-
-
-            String hiveTable = flatDesc.getTableName();
-            String connectionUrl = config.getJdbcSourceConnectionUrl();
-            String driverClass = config.getJdbcSourceDriver();
-            String jdbcUser = config.getJdbcSourceUser();
-            String jdbcPass = config.getJdbcSourcePass();
-            String sqoopHome = config.getSqoopHome();
-            String filedDelimiter = config.getJdbcSourceFieldDelimiter();
-            int mapperNum = config.getSqoopMapperNum();
+        };
+    }
 
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn,
-                    splitColumn, splitDatabase, splitTable, splitTableAlias);
-            if (partitionDesc.isPartitioned()) {
-                SegmentRange segRange = flatDesc.getSegRange();
-                if (segRange != null && !segRange.isInfinite()) {
-                    if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
-                            && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
-                                    .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
-                                partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                                        flatDesc.getSegment(), segRange),
-                                "`");
-                        bquery += " WHERE " + quotedPartCond;
-                    }
-                }
-            }
-            bquery = escapeQuotationInSql(bquery);
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+        return new HiveMRInput.HiveTableInputFormat(getTableNameForHCat(table, uuid));
+    }
 
-            // escape ` in cmd
-            splitColumn = escapeQuotationInSql(splitColumn);
+    public static class JdbcMRBatchCubingInputSide extends JdbcBaseBatchCubingInputSide implements IMRBatchCubingInputSide {
 
-            String cmd = String.format(Locale.ROOT,
-                    "%s/bin/sqoop import" + generateSqoopConfigArgString()
-                            + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
-                            + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '' "
-                            + "--fields-terminated-by '%s' --num-mappers %d",
-                    sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
-                    splitColumn, bquery, filedDelimiter, mapperNum);
-            logger.debug(String.format(Locale.ROOT, "sqoop cmd:%s", cmd));
-            CmdStep step = new CmdStep();
-            step.setCmd(cmd);
-            step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
-            return step;
+        public JdbcMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
         }
 
         @Override
-        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            // skip
-        }
-
-        protected String generateSqoopConfigArgString() {
-            KylinConfig kylinConfig = getConfig();
-            Map<String, String> config = Maps.newHashMap();
-            config.put("mapreduce.job.queuename", getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
-            config.putAll(SourceConfigurationUtil.loadSqoopConfiguration());
-            config.putAll(kylinConfig.getSqoopConfigOverride());
-
-            StringBuilder args = new StringBuilder(" -Dorg.apache.sqoop.splitter.allow_text_splitter=true ");
-            for (Map.Entry<String, String> entry : config.entrySet()) {
-                args.append(" -D" + entry.getKey() + "=" + entry.getValue() + " ");
-            }
-            return args.toString();
+        public IMRTableInputFormat getFlatTableInputFormat() {
+            return new HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
         }
     }
 
-    protected static String escapeQuotationInSql(String sqlExpr) {
-        sqlExpr = sqlExpr.replaceAll("\"", "\\\\\"");
-        sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
-        return sqlExpr;
-    }
 }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
new file mode 100644
index 0000000..8a8471a
--- /dev/null
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc;
+
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+
+public class JdbcHiveSparkInput extends JdbcHiveInputBase implements ISparkInput {
+
+    @Override
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new JdbcSparkBatchCubingInputSide(flatDesc);
+    }
+
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // doing nothing
+            }
+        };
+    }
+
+    public static class JdbcSparkBatchCubingInputSide extends JdbcBaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+        public JdbcSparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
+        }
+    }
+}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index 3bf7498..1bda6c2 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
@@ -44,6 +45,8 @@ public class JdbcSource implements ISource {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == IMRInput.class) {
             return (I) new JdbcHiveMRInput();
+        } else if (engineInterface == ISparkInput.class) {
+            return (I) new JdbcHiveSparkInput();
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
similarity index 90%
copy from source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
copy to source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
index 2e57a44..10eb31e 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kylin.source.jdbc.extensible;
 
 import org.apache.hadoop.util.StringUtils;
@@ -34,23 +35,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Locale;
 
-public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInput {
-    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
-
-    private final JdbcConnector dataSource;
+public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInputBase {
 
-    JdbcHiveMRInput(JdbcConnector dataSource) {
-        this.dataSource = dataSource;
-    }
-
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc, dataSource);
-    }
+    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveInputBase.class);
 
-    public static class BatchCubingInputSide extends org.apache.kylin.source.jdbc.JdbcHiveMRInput.BatchCubingInputSide {
+    public static class JDBCBaseBatchCubingInputSide extends JdbcBaseBatchCubingInputSide {
         private final JdbcConnector dataSource;
 
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
+        public JDBCBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
             super(flatDesc);
             this.dataSource = dataSource;
         }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
index 2e57a44..7df4ab5 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
@@ -17,25 +17,15 @@
  */
 package org.apache.kylin.source.jdbc.extensible;
 
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
-import org.apache.kylin.source.jdbc.sqoop.SqoopCmdStep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kylin.source.hive.HiveMRInput;
 
-import java.util.Locale;
-
-public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInput {
-    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
+public class JdbcHiveMRInput extends JdbcHiveInputBase implements IMRInput {
 
     private final JdbcConnector dataSource;
 
@@ -44,86 +34,34 @@ public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInpu
     }
 
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc, dataSource);
+        return new JdbcMRBatchCubingInputSide(flatDesc, dataSource);
     }
 
-    public static class BatchCubingInputSide extends org.apache.kylin.source.jdbc.JdbcHiveMRInput.BatchCubingInputSide {
-        private final JdbcConnector dataSource;
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
-            super(flatDesc);
-            this.dataSource = dataSource;
-        }
-
-        protected JdbcConnector getDataSource() {
-            return dataSource;
-        }
-
-        @Override
-        protected AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
-            KylinConfig config = flatDesc.getDataModel().getConfig();
-            PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
-            String partCol = null;
-
-            if (partitionDesc.isPartitioned()) {
-                partCol = partitionDesc.getPartitionDateColumn(); //tablename.colname
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new IMRBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // Intentionally empty: no merge-dictionary step is added to the job flow here.
             }
+        };
+    }
 
-            String splitTable;
-            String splitTableAlias;
-            String splitColumn;
-            String splitDatabase;
-            TblColRef splitColRef = determineSplitColumn();
-            splitTable = splitColRef.getTableRef().getTableName();
-            splitTable = splitColRef.getTableRef().getTableDesc().getName();
-            splitTableAlias = splitColRef.getTableAlias();
-            //to solve case sensitive if necessary
-            splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
-            splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase().toLowerCase(Locale.ROOT);
-
-            //using sqoop to extract data from jdbc source and dump them to hive
-            String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
-            selectSql = escapeQuotationInSql(dataSource.convertSql(selectSql));
-
-            String hiveTable = flatDesc.getTableName();
-            String sqoopHome = config.getSqoopHome();
-            String filedDelimiter = config.getJdbcSourceFieldDelimiter();
-            int mapperNum = config.getSqoopMapperNum();
-
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
-                    splitDatabase, splitTable, splitTableAlias);
-            bquery = dataSource.convertSql(bquery);
-            if (partitionDesc.isPartitioned()) {
-                SegmentRange segRange = flatDesc.getSegRange();
-                if (segRange != null && !segRange.isInfinite()) {
-                    if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
-                            && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
-                            .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
-                                partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                                        flatDesc.getSegment(), segRange),
-                                "`");
-                        bquery += " WHERE " + quotedPartCond;
-                    }
-                }
-            }
-            bquery = escapeQuotationInSql(bquery);
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+        return new HiveMRInput.HiveTableInputFormat(getTableNameForHCat(table, uuid));
+    }
 
-            splitColumn = escapeQuotationInSql(dataSource.convertColumn(splitColumn, FlatTableSqlQuoteUtils.QUOTE));
+    public static class JdbcMRBatchCubingInputSide extends JDBCBaseBatchCubingInputSide implements IMRInput.IMRBatchCubingInputSide {
 
-            String cmd = StringUtils.format(
-                    "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
-                            + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '' "
-                            + "--fields-terminated-by '%s' --num-mappers %d",
-                    dataSource.getJdbcUrl(), dataSource.getJdbcDriver(), dataSource.getJdbcUser(),
-                    dataSource.getJdbcPassword(), selectSql, jobWorkingDir, hiveTable, splitColumn, bquery,
-                    filedDelimiter, mapperNum);
-            logger.debug("sqoop cmd: {}", cmd);
+        public JdbcMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
+            super(flatDesc, dataSource);
+        }
 
-            SqoopCmdStep step = new SqoopCmdStep();
-            step.setCmd(cmd);
-            step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
-            return step;
+        @Override
+        public IMRInput.IMRTableInputFormat getFlatTableInputFormat() {
+            return new HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
         }
     }
+
 }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java
new file mode 100644
index 0000000..a5701ad
--- /dev/null
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.extensible;
+
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
+
+public class JdbcHiveSparkInput extends JdbcHiveInputBase implements ISparkInput {
+
+    private final JdbcConnector dataSource;
+
+    JdbcHiveSparkInput(JdbcConnector dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    @Override
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new JdbcSparkBatchCubingInputSide(flatDesc, dataSource);
+    }
+
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // Intentionally empty: no merge-dictionary step is added to the job flow here.
+            }
+        };
+    }
+
+    public static class JdbcSparkBatchCubingInputSide extends JDBCBaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+        public JdbcSparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
+            super(flatDesc, dataSource);
+        }
+    }
+}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
index 3e8f0fd..da055e1 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
@@ -60,6 +61,8 @@ public class JdbcSource implements ISource {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == IMRInput.class) {
             return (I) new JdbcHiveMRInput(dataSource);
+        } else if (engineInterface == ISparkInput.class) {
+            return (I) new JdbcHiveSparkInput(dataSource);
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
index 956f86c..20c37ef 100644
--- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
+++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
@@ -60,7 +60,7 @@ public class JdbcHiveMRInputTest extends TestBase {
         CubeSegment seg = cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
                 new SegmentRange.TSRange(System.currentTimeMillis() - 100L, System.currentTimeMillis() + 100L));
         CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
-        JdbcHiveMRInput.BatchCubingInputSide inputSide = (JdbcHiveMRInput.BatchCubingInputSide) input
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = (JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
                 .getBatchCubingInputSide(flatDesc);
 
         AbstractExecutable executable = new MockInputSide(flatDesc, inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -86,7 +86,7 @@ public class JdbcHiveMRInputTest extends TestBase {
         CubeSegment seg = cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
                 new SegmentRange.TSRange(0L, Long.MAX_VALUE));
         CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
-        JdbcHiveMRInput.BatchCubingInputSide inputSide = (JdbcHiveMRInput.BatchCubingInputSide) input
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = (JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
                 .getBatchCubingInputSide(flatDesc);
 
         AbstractExecutable executable = new MockInputSide(flatDesc, inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -111,7 +111,7 @@ public class JdbcHiveMRInputTest extends TestBase {
         CubeSegment seg = cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
                 new SegmentRange.TSRange(System.currentTimeMillis() - 100L, System.currentTimeMillis() + 100L));
         CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
-        JdbcHiveMRInput.BatchCubingInputSide inputSide = (JdbcHiveMRInput.BatchCubingInputSide) input
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = (JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
                 .getBatchCubingInputSide(flatDesc);
 
         AbstractExecutable executable = new MockInputSide(flatDesc, inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -127,10 +127,10 @@ public class JdbcHiveMRInputTest extends TestBase {
         source.close();
     }
 
-    private static class MockInputSide extends JdbcHiveMRInput.BatchCubingInputSide {
-        JdbcHiveMRInput.BatchCubingInputSide input;
+    private static class MockInputSide extends JdbcHiveMRInput.JdbcMRBatchCubingInputSide {
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide input;
 
-        public MockInputSide(IJoinedFlatTableDesc flatDesc, JdbcHiveMRInput.BatchCubingInputSide input) {
+        public MockInputSide(IJoinedFlatTableDesc flatDesc, JdbcHiveMRInput.JdbcMRBatchCubingInputSide input) {
             super(flatDesc, input.getDataSource());
             this.input = input;
         }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
index cb2e14c..7620ab3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -20,18 +20,23 @@ package org.apache.kylin.source.kafka;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 
-import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
@@ -45,8 +50,78 @@ import org.apache.kylin.source.hive.GarbageCollectionStep;
 import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
 import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 public class KafkaInputBase {
 
+    public static class BaseBatchCubingInputSide implements IInput.IBatchCubingInputSide {
+
+        final JobEngineConfig conf;
+        final CubeSegment seg;
+        private CubeDesc cubeDesc;
+        private KylinConfig config;
+        protected IJoinedFlatTableDesc flatDesc;
+        protected String hiveTableDatabase;
+        final private List<String> intermediateTables = Lists.newArrayList();
+        final private List<String> intermediatePaths = Lists.newArrayList();
+        private String cubeName;
+
+        public BaseBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.config = seg.getConfig();
+            this.flatDesc = flatDesc;
+            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
+            this.seg = seg;
+            this.cubeDesc = seg.getCubeDesc();
+            this.cubeName = seg.getCubeInstance().getName();
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+
+            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+            final String baseLocation = getJobWorkingDir(jobFlow);
+            if (onlyOneTable) {
+                // The model has no lookup tables: point the save step directly at the flat table's location.
+                final String intermediateFactTable = flatDesc.getTableName();
+                final String tableLocation = baseLocation + "/" + intermediateFactTable;
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
+                intermediatePaths.add(tableLocation);
+            } else {
+                final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
+                        + cubeName.toLowerCase(Locale.ROOT) + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
+                jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
+                        flatDesc, intermediateTables, intermediatePaths));
+            }
+        }
+
+        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
+        }
+
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+
+        }
+    }
+
+    public static  class BaseBatchMergeInputSide implements IInput.IBatchMergeInputSide {
+
+        private CubeSegment cubeSegment;
+
+        BaseBatchMergeInputSide(CubeSegment cubeSegment) {
+            this.cubeSegment = cubeSegment;
+        }
+
+        @Override
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+        }
+    }
+
     protected static AbstractExecutable createMergeOffsetStep(String jobId, CubeSegment cubeSegment) {
 
         final MergeOffsetStep result = new MergeOffsetStep();
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 1c94f9c..d709572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -20,8 +20,6 @@ package org.apache.kylin.source.kafka;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -30,46 +28,36 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 
 public class KafkaMRInput extends KafkaInputBase implements IMRInput {
 
-    private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
     private CubeSegment cubeSegment;
 
     @Override
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment, flatDesc);
+        return new KafkaMRBatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
-    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
-
-        return new KafkaTableInputFormat(cubeSegment, null);
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new KafkaMRBatchMergeInputSide((CubeSegment)seg);
     }
 
     @Override
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new KafkaMRBatchMergeInputSide((CubeSegment) seg);
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+        return new KafkaTableInputFormat(cubeSegment, null);
     }
 
     public static class KafkaTableInputFormat implements IMRTableInputFormat {
@@ -110,56 +98,10 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
         }
     }
 
-    public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
-
-        final JobEngineConfig conf;
-        final CubeSegment seg;
-        private CubeDesc cubeDesc;
-        private KylinConfig config;
-        protected IJoinedFlatTableDesc flatDesc;
-        protected String hiveTableDatabase;
-        private List<String> intermediateTables = Lists.newArrayList();
-        private List<String> intermediatePaths = Lists.newArrayList();
-        private String cubeName;
-
-        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
-            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            this.config = seg.getConfig();
-            this.flatDesc = flatDesc;
-            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
-            this.seg = seg;
-            this.cubeDesc = seg.getCubeDesc();
-            this.cubeName = seg.getCubeInstance().getName();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-
-            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
-            final String baseLocation = getJobWorkingDir(jobFlow);
-            if (onlyOneTable) {
-                // directly use flat table location
-                final String intermediateFactTable = flatDesc.getTableName();
-                final String tableLocation = baseLocation + "/" + intermediateFactTable;
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
-                intermediatePaths.add(tableLocation);
-            } else {
-                final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
-                        + cubeName.toLowerCase(Locale.ROOT) + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
-                jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
-                        flatDesc, intermediateTables, intermediatePaths));
-            }
-        }
-
-        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
-            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+    public static class KafkaMRBatchCubingInputSide extends BaseBatchCubingInputSide implements IMRBatchCubingInputSide {
 
+        public KafkaMRBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+            super(seg, flatDesc);
         }
 
         @Override
@@ -168,18 +110,10 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
         }
     }
 
-    class KafkaMRBatchMergeInputSide implements IMRBatchMergeInputSide {
-
-        private CubeSegment cubeSegment;
+    public static class KafkaMRBatchMergeInputSide extends BaseBatchMergeInputSide implements IMRBatchMergeInputSide {
 
         KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) {
-            this.cubeSegment = cubeSegment;
-        }
-
-        @Override
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+            super(cubeSegment);
         }
     }
-
 }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
index 7db6c32..edbc002 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
@@ -17,105 +17,37 @@
 */
 package org.apache.kylin.source.kafka;
 
-import java.util.List;
-import java.util.Locale;
-
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.spark.ISparkInput;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 
 public class KafkaSparkInput extends KafkaInputBase implements ISparkInput {
 
-    private static final Logger logger = LoggerFactory.getLogger(KafkaSparkInput.class);
     private CubeSegment cubeSegment;
 
     @Override
-    public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment, flatDesc);
+        return new KafkaSparkBatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
-    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new KafkaSparkBatchMergeInputSide((CubeSegment) seg);
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new KafkaSparkBatchMergeInputSide((CubeSegment)seg);
     }
 
-    public static class BatchCubingInputSide implements ISparkBatchCubingInputSide {
-
-        final JobEngineConfig conf;
-        final CubeSegment seg;
-        private CubeDesc cubeDesc;
-        private KylinConfig config;
-        protected IJoinedFlatTableDesc flatDesc;
-        protected String hiveTableDatabase;
-        final private List<String> intermediateTables = Lists.newArrayList();
-        final private List<String> intermediatePaths = Lists.newArrayList();
-        private String cubeName;
-
-        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
-            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            this.config = seg.getConfig();
-            this.flatDesc = flatDesc;
-            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
-            this.seg = seg;
-            this.cubeDesc = seg.getCubeDesc();
-            this.cubeName = seg.getCubeInstance().getName();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-
-            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
-            final String baseLocation = getJobWorkingDir(jobFlow);
-            if (onlyOneTable) {
-                // directly use flat table location
-                final String intermediateFactTable = flatDesc.getTableName();
-                final String tableLocation = baseLocation + "/" + intermediateFactTable;
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
-                intermediatePaths.add(tableLocation);
-            } else {
-                final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
-                        + cubeName.toLowerCase(Locale.ROOT) + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
-                jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
-                        flatDesc, intermediateTables, intermediatePaths));
-            }
-        }
-
-        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
-            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+    public static class KafkaSparkBatchCubingInputSide extends BaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
 
+        public KafkaSparkBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+            super(seg, flatDesc);
         }
     }
 
-    class KafkaSparkBatchMergeInputSide implements ISparkBatchMergeInputSide {
-
-        private CubeSegment cubeSegment;
+    public static class KafkaSparkBatchMergeInputSide extends BaseBatchMergeInputSide implements ISparkBatchMergeInputSide {
 
         KafkaSparkBatchMergeInputSide(CubeSegment cubeSegment) {
-            this.cubeSegment = cubeSegment;
-        }
-
-        @Override
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+            super(cubeSegment);
         }
     }
-
 }