Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/11 08:19:03 UTC

[kylin] 01/02: KYLIN-3710 JDBC data source support Spark cubing

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 195640eb0499a449a02b412af7e247cdcde47cec
Author: chao long <wa...@qq.com>
AuthorDate: Mon Dec 10 13:04:59 2018 +0800

    KYLIN-3710 JDBC data source support Spark cubing
---
 .../java/org/apache/kylin/engine/mr/IInput.java    |  31 +--
 .../java/org/apache/kylin/engine/mr/IMRInput.java  |  24 +--
 .../java/org/apache/kylin/engine/mr/MRUtil.java    |   7 +-
 .../org/apache/kylin/engine/spark/ISparkInput.java |  24 +--
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  10 +-
 .../apache/kylin/source/hive/HiveInputBase.java    |  90 ++++++++-
 .../org/apache/kylin/source/hive/HiveMRInput.java  | 103 ++--------
 .../apache/kylin/source/hive/HiveSparkInput.java   |  84 +-------
 ...JdbcHiveMRInput.java => JdbcHiveInputBase.java} |  49 ++---
 .../apache/kylin/source/jdbc/JdbcHiveMRInput.java  | 225 +++------------------
 .../kylin/source/jdbc/JdbcHiveSparkInput.java      |  48 +++++
 .../org/apache/kylin/source/jdbc/JdbcSource.java   |   3 +
 ...JdbcHiveMRInput.java => JdbcHiveInputBase.java} |  18 +-
 .../source/jdbc/extensible/JdbcHiveMRInput.java    | 116 +++--------
 .../source/jdbc/extensible/JdbcHiveSparkInput.java |  55 +++++
 .../kylin/source/jdbc/extensible/JdbcSource.java   |   3 +
 .../jdbc/extensible/JdbcHiveMRInputTest.java       |  12 +-
 .../apache/kylin/source/kafka/KafkaInputBase.java  |  77 ++++++-
 .../apache/kylin/source/kafka/KafkaMRInput.java    |  88 +-------
 .../apache/kylin/source/kafka/KafkaSparkInput.java |  86 +-------
 20 files changed, 413 insertions(+), 740 deletions(-)

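Before the file-by-file diff, a condensed sketch of what the refactoring introduces: a shared, engine-neutral IInput contract that both the MapReduce and Spark build engines program against, with engine-specific interfaces adding only what each engine needs. The sketch restates the new IInput from the diff that follows (declarations only, Javadoc elided); the trailing comments summarize how the remaining files line up.

    // Condensed from engine-mr/.../IInput.java in the diff below.
    package org.apache.kylin.engine.mr;

    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
    import org.apache.kylin.metadata.model.ISegment;

    public interface IInput {

        /** Return a helper to participate in batch cubing job flow. */
        IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);

        /** Return a helper to participate in batch cubing merge job flow. */
        IBatchMergeInputSide getBatchMergeInputSide(ISegment seg);

        interface IBatchCubingInputSide {
            void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow); // Phase 1
            void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);         // Phase 4
        }

        interface IBatchMergeInputSide {
            void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
        }
    }

    // IMRInput extends IInput (adding getTableInputFormat); ISparkInput extends IInput.
    // HiveInputBase.BaseBatchCubingInputSide implements the shared job-flow steps once,
    // and HiveMRInput/HiveSparkInput, JdbcHiveMRInput/JdbcHiveSparkInput (new), and
    // KafkaMRInput/KafkaSparkInput reuse it for the respective engine.
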
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
similarity index 66%
copy from engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
index 5459c70..758b081 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -6,44 +6,31 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-package org.apache.kylin.engine.spark;
+package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 
-/**
- * Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface.
- */
-public interface ISparkInput {
+public interface IInput {
 
     /** Return a helper to participate in batch cubing job flow. */
-    public ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
 
     /** Return a helper to participate in batch cubing merge job flow. */
-    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
-
-    /**
-     * Participate the batch cubing flow as the input side. Responsible for creating
-     * intermediate flat table (Phase 1) and clean up any leftover (Phase 4).
-     * 
-     * - Phase 1: Create Flat Table
-     * - Phase 2: Build Dictionary (with FlatTableInputFormat)
-     * - Phase 3: Build Cube (with FlatTableInputFormat)
-     * - Phase 4: Update Metadata & Cleanup
-     */
-    public interface ISparkBatchCubingInputSide {
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
 
+    public interface IBatchCubingInputSide {
         /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
 
@@ -51,7 +38,7 @@ public interface ISparkInput {
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface ISparkBatchMergeInputSide {
+    public interface IBatchMergeInputSide {
 
         /** Add step that executes before merge dictionary and before merge cube. */
         public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index c259c4e..74153e0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -22,25 +22,16 @@ import java.util.Collection;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**
  * Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface.
  */
-public interface IMRInput {
-
-    /** Return a helper to participate in batch cubing job flow. */
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+public interface IMRInput extends IInput {
 
     /** Return an InputFormat that reads from specified table. */
     public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid);
 
-    /** Return a helper to participate in batch cubing merge job flow. */
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
-
     /**
      * Utility that configures mapper to read from a table.
      */
@@ -65,22 +56,13 @@ public interface IMRInput {
      * - Phase 3: Build Cube (with FlatTableInputFormat)
      * - Phase 4: Update Metadata & Cleanup
      */
-    public interface IMRBatchCubingInputSide {
+    public interface IMRBatchCubingInputSide extends IBatchCubingInputSide {
 
         /** Return an InputFormat that reads from the intermediate flat table */
         public IMRTableInputFormat getFlatTableInputFormat();
-
-        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
-
-        /** Add step that does necessary clean up, like delete the intermediate flat table */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface IMRBatchMergeInputSide {
-
-        /** Add step that executes before merge dictionary and before merge cube. */
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+    public interface IMRBatchMergeInputSide extends IBatchMergeInputSide {
 
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 3a0fb84..60d0445 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.dict.lookup.LookupProviderFactory;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchMergeInputSide;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
@@ -39,7 +40,7 @@ public class MRUtil {
 
     public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
+        return (IMRBatchCubingInputSide)SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
     }
 
     public static IMRTableInputFormat getTableInputFormat(String tableName, String prj, String uuid) {
@@ -63,8 +64,8 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
     }
 
-    public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
-        return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+    public static IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return (IMRBatchMergeInputSide)SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
     }
 
     public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
index 5459c70..4af616c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
@@ -18,20 +18,12 @@
 
 package org.apache.kylin.engine.spark;
 
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.engine.mr.IInput;
 
 /**
  * Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface.
  */
-public interface ISparkInput {
-
-    /** Return a helper to participate in batch cubing job flow. */
-    public ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
-
-    /** Return a helper to participate in batch cubing merge job flow. */
-    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+public interface ISparkInput extends IInput {
 
     /**
      * Participate the batch cubing flow as the input side. Responsible for creating
@@ -42,19 +34,11 @@ public interface ISparkInput {
      * - Phase 3: Build Cube (with FlatTableInputFormat)
      * - Phase 4: Update Metadata & Cleanup
      */
-    public interface ISparkBatchCubingInputSide {
+    public interface ISparkBatchCubingInputSide extends IBatchCubingInputSide {
 
-        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
-
-        /** Add step that does necessary clean up, like delete the intermediate flat table */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface ISparkBatchMergeInputSide {
-
-        /** Add step that executes before merge dictionary and before merge cube. */
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+    public interface ISparkBatchMergeInputSide extends IBatchMergeInputSide {
 
     }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 82a1a9b..1c4086d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -43,6 +43,8 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.kylin.engine.spark.ISparkInput.ISparkBatchCubingInputSide;
+import org.apache.kylin.engine.spark.ISparkInput.ISparkBatchMergeInputSide;
 
 import com.google.common.collect.Lists;
 import org.apache.spark.api.java.function.Function;
@@ -52,9 +54,9 @@ import org.apache.spark.sql.SparkSession;
 
 public class SparkUtil {
 
-    public static ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+    public static ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
+        return (ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
     }
 
     public static ISparkOutput.ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
@@ -65,8 +67,8 @@ public class SparkUtil {
         return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchMergeOutputSide(seg);
     }
 
-    public static ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
-        return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
+    public static ISparkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return (ISparkBatchMergeInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
     }
 
     public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
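Because getBatchCubingInputSide and getBatchMergeInputSide are now declared on IInput with the base return types, MRUtil and SparkUtil downcast the result to the engine-specific sub-interface. A minimal usage sketch of the Spark side, assuming a CubeSegment whose source adapts to ISparkInput; SparkCubingFlowSketch and addInputSteps are hypothetical names, while the helper calls are the ones defined in SparkUtil above.

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.spark.SparkUtil;
    import org.apache.kylin.engine.spark.ISparkInput.ISparkBatchCubingInputSide;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    public class SparkCubingFlowSketch {

        public static void addInputSteps(CubeSegment seg, DefaultChainedExecutable jobFlow) {
            // The downcast inside SparkUtil is safe because a source adapted to ISparkInput
            // returns an ISparkBatchCubingInputSide from getBatchCubingInputSide.
            ISparkBatchCubingInputSide inputSide = SparkUtil.getBatchCubingInputSide(seg);

            inputSide.addStepPhase1_CreateFlatTable(jobFlow); // Phase 1: create flat table
            // ... dictionary and cube-building steps are added by the engine itself ...
            inputSide.addStepPhase4_Cleanup(jobFlow);         // Phase 4: clean up intermediates
        }
    }
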
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index c55015b..2f25e50 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -19,16 +19,22 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
@@ -48,9 +54,82 @@ import com.google.common.collect.Sets;
 
 public class HiveInputBase {
 
-    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveInputBase.class);
 
+    public static class BaseBatchCubingInputSide implements IInput.IBatchCubingInputSide {
+
+        final protected IJoinedFlatTableDesc flatDesc;
+        final protected String flatTableDatabase;
+        final protected String hdfsWorkingDir;
+
+        List<String> hiveViewIntermediateTables = Lists.newArrayList();
+
+        public BaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            this.flatDesc = flatDesc;
+            this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
+            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+            final KylinConfig cubeConfig = cubeInstance.getConfig();
+
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+
+            // create flat table first
+            addStepPhase1_DoCreateFlatTable(jobFlow);
+
+            // then count and redistribute
+            if (cubeConfig.isHiveRedistributeEnabled()) {
+                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
+                        cubeInstance.getDescriptor()));
+            }
+
+            // special for hive
+            addStepPhase1_DoMaterializeLookupTable(jobFlow);
+        }
+
+        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+        }
+
+        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
+                    flatDesc, hiveViewIntermediateTables, jobFlow.getId());
+            if (task != null) {
+                jobFlow.addTask(task);
+            }
+        }
+
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+            org.apache.kylin.source.hive.GarbageCollectionStep step = new org.apache.kylin.source.hive.GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
+            step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
+            jobFlow.addTask(step);
+        }
+
+        protected String getIntermediateTableIdentity() {
+            return flatTableDatabase + "." + flatDesc.getTableName();
+        }
+    }
+
+    // ===== static methods ======
+
     protected static String getTableNameForHCat(TableDesc table, String uuid) {
         String tableName = (table.isView()) ? table.getMaterializedName(uuid) : table.getName();
         String database = (table.isView()) ? KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()
@@ -58,15 +137,6 @@ public class HiveInputBase {
         return String.format(Locale.ROOT, "%s.%s", database, tableName).toUpperCase(Locale.ROOT);
     }
 
-    protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow, String hdfsWorkingDir,
-            IJoinedFlatTableDesc flatTableDesc, String flatTableDatabase) {
-        final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-        final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-        final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-        jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatTableDesc));
-    }
-
     protected static AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
             String cubeName, IJoinedFlatTableDesc flatDesc) {
         //from hive to hive
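The new BaseBatchCubingInputSide acts as a template: addStepPhase1_CreateFlatTable drives the Phase 1 flow, while addStepPhase1_DoCreateFlatTable and addStepPhase1_DoMaterializeLookupTable are hooks a source can override. A hedged sketch of a source customizing those hooks, mirroring what JdbcHiveInputBase does further down; CustomBatchCubingInputSide and createCustomExtractStep are hypothetical.

    import org.apache.kylin.job.execution.AbstractExecutable;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
    import org.apache.kylin.source.hive.HiveInputBase;

    public class CustomBatchCubingInputSide extends HiveInputBase.BaseBatchCubingInputSide {

        public CustomBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
            super(flatDesc);
        }

        @Override
        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
            // Replace the default CREATE-TABLE-AS-SELECT step with source-specific steps;
            // JdbcHiveInputBase, for instance, adds a Sqoop import step followed by a Hive
            // step that builds the flat table over the imported files.
            jobFlow.addTask(createCustomExtractStep());
        }

        @Override
        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
            // Skip: materializing lookup views is only needed when the source is Hive.
        }

        private AbstractExecutable createCustomExtractStep() {
            // Placeholder for illustration; a real implementation would return a concrete
            // AbstractExecutable such as a CmdStep or HiveCmdStep.
            throw new UnsupportedOperationException("illustration only");
        }
    }
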
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index d6b85ed..df20b2c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -28,42 +28,23 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hive.hcatalog.mapreduce.HCatSplit;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 
 public class HiveMRInput extends HiveInputBase implements IMRInput {
 
-    private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class);
-
     @Override
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new HiveMRBatchCubingInputSide(flatDesc);
     }
 
     @Override
-    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
-        return new HiveTableInputFormat(getTableNameForHCat(table, uuid));
-    }
-
-    @Override
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
         return new IMRBatchMergeInputSide() {
             @Override
             public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
@@ -72,6 +53,11 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
         };
     }
 
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+        return new HiveTableInputFormat(getTableNameForHCat(table, uuid));
+    }
+
     public static class HiveTableInputFormat implements IMRTableInputFormat {
         final String dbName;
         final String tableName;
@@ -111,80 +97,15 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
         }
     }
 
-    public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
-
-        final protected IJoinedFlatTableDesc flatDesc;
-        final protected String flatTableDatabase;
-        final protected String hdfsWorkingDir;
-
-        List<String> hiveViewIntermediateTables = Lists.newArrayList();
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            this.flatDesc = flatDesc;
-            this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
-            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-            final KylinConfig cubeConfig = cubeInstance.getConfig();
-
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-
-            // create flat table first
-            addStepPhase1_DoCreateFlatTable(jobFlow);
-
-            // then count and redistribute
-            if (cubeConfig.isHiveRedistributeEnabled()) {
-                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
-                        cubeInstance.getDescriptor()));
-            }
+    public static class HiveMRBatchCubingInputSide extends BaseBatchCubingInputSide implements IMRBatchCubingInputSide {
 
-            // special for hive
-            addStepPhase1_DoMaterializeLookupTable(jobFlow);
-        }
-
-        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
-        }
-
-        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
-                    flatDesc, hiveViewIntermediateTables, jobFlow.getId());
-            if (task != null) {
-                jobFlow.addTask(task);
-            }
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            org.apache.kylin.source.hive.GarbageCollectionStep step = new org.apache.kylin.source.hive.GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
-            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
-            step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
-            jobFlow.addTask(step);
+        public HiveMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
         }
 
         @Override
         public IMRTableInputFormat getFlatTableInputFormat() {
-            return new HiveTableInputFormat(getIntermediateTableIdentity());
-        }
-
-        private String getIntermediateTableIdentity() {
-            return flatTableDatabase + "." + flatDesc.getTableName();
+            return new HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
         }
     }
 
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
index d710db7..0660a66 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -18,39 +18,26 @@
 
 package org.apache.kylin.source.hive;
 
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.ISparkInput;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 public class HiveSparkInput extends HiveInputBase implements ISparkInput {
 
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveSparkInput.class);
 
     @Override
-    public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new SparkBatchCubingInputSide(flatDesc);
     }
 
     @Override
-    public ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new ISparkInput.ISparkBatchMergeInputSide() {
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkBatchMergeInputSide() {
             @Override
             public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
                 // doing nothing
@@ -58,67 +45,10 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
         };
     }
 
-    public class BatchCubingInputSide implements ISparkBatchCubingInputSide {
-
-        final protected IJoinedFlatTableDesc flatDesc;
-        final protected String flatTableDatabase;
-        final protected String hdfsWorkingDir;
-
-        List<String> hiveViewIntermediateTables = Lists.newArrayList();
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            this.flatDesc = flatDesc;
-            this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
-            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-            final KylinConfig cubeConfig = cubeInstance.getConfig();
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-
-            // create flat table first
-            addStepPhase1_DoCreateFlatTable(jobFlow, hdfsWorkingDir, flatDesc, flatTableDatabase);
-
-            // then count and redistribute
-            if (cubeConfig.isHiveRedistributeEnabled()) {
-                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
-                        cubeInstance.getDescriptor()));
-            }
-
-            // special for hive
-            addStepPhase1_DoMaterializeLookupTable(jobFlow);
-        }
-
-        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
-                    flatDesc, hiveViewIntermediateTables, jobFlow.getId());
-            if (task != null) {
-                jobFlow.addTask(task);
-            }
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+    public static class SparkBatchCubingInputSide extends BaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
 
-            GarbageCollectionStep step = new GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
-            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
-            step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
-            jobFlow.addTask(step);
-        }
-
-        private String getIntermediateTableIdentity() {
-            return flatTableDatabase + "." + flatDesc.getTableName();
+        public SparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
         }
     }
-
 }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
similarity index 93%
copy from source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
copy to source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
index 3460dd2..3769473 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
@@ -6,21 +6,18 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.source.jdbc;
 
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.SourceConfigurationUtil;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -34,32 +31,28 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.HiveMRInput;
+import org.apache.kylin.source.hive.HiveInputBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
-public class JdbcHiveMRInput extends HiveMRInput {
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 
-    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
+public class JdbcHiveInputBase extends HiveInputBase {
+    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveInputBase.class);
     private static final String MR_OVERRIDE_QUEUE_KEY = "mapreduce.job.queuename";
     private static final String DEFAULT_QUEUE = "default";
 
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
-    }
-
-    public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide {
+    public static class JdbcBaseBatchCubingInputSide extends BaseBatchCubingInputSide {
 
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        public JdbcBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
             super(flatDesc);
         }
 
-        private KylinConfig getConfig() {
+        protected KylinConfig getConfig() {
             return flatDesc.getDataModel().getConfig();
         }
 
@@ -73,6 +66,11 @@ public class JdbcHiveMRInput extends HiveMRInput {
             jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
         }
 
+        @Override
+        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+            // skip
+        }
+
         private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
             final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
             String filedDelimiter = getConfig().getJdbcSourceFieldDelimiter();
@@ -112,7 +110,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
             long maxCardinality = 0;
             for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
                 TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
-                List<ColumnStats> columnStatses = tableExtDesc.getColumnStats();
+                List<TableExtDesc.ColumnStats> columnStatses = tableExtDesc.getColumnStats();
                 if (!columnStatses.isEmpty()) {
                     for (TblColRef colRef : tableRef.getColumns()) {
                         long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
@@ -175,14 +173,14 @@ public class JdbcHiveMRInput extends HiveMRInput {
             String filedDelimiter = config.getJdbcSourceFieldDelimiter();
             int mapperNum = config.getSqoopMapperNum();
 
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn,
+            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM %s.%s as %s", splitColumn,
                     splitColumn, splitDatabase, splitTable, splitTableAlias);
             if (partitionDesc.isPartitioned()) {
                 SegmentRange segRange = flatDesc.getSegRange();
                 if (segRange != null && !segRange.isInfinite()) {
                     if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
                             && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
-                                    .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+                            .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
                         String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
                                 partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
                                         flatDesc.getSegment(), segRange),
@@ -210,11 +208,6 @@ public class JdbcHiveMRInput extends HiveMRInput {
             return step;
         }
 
-        @Override
-        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            // skip
-        }
-
         protected String generateSqoopConfigArgString() {
             KylinConfig kylinConfig = getConfig();
             Map<String, String> config = Maps.newHashMap();
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 3460dd2..19f354c 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -17,222 +17,45 @@
 */
 package org.apache.kylin.source.jdbc;
 
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.SourceConfigurationUtil;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
-import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.hive.HiveMRInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-public class JdbcHiveMRInput extends HiveMRInput {
 
-    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
-    private static final String MR_OVERRIDE_QUEUE_KEY = "mapreduce.job.queuename";
-    private static final String DEFAULT_QUEUE = "default";
+public class JdbcHiveMRInput extends JdbcHiveInputBase implements IMRInput {
 
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
+    @Override
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new JdbcMRBatchCubingInputSide(flatDesc);
     }
 
-    public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide {
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-            super(flatDesc);
-        }
-
-        private KylinConfig getConfig() {
-            return flatDesc.getDataModel().getConfig();
-        }
-
-        @Override
-        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-            jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName));
-            jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
-        }
-
-        private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
-            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
-            String filedDelimiter = getConfig().getJdbcSourceFieldDelimiter();
-            // Sqoop does not support exporting SEQUENSEFILE to Hive now SQOOP-869
-            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir,
-                    "TEXTFILE", filedDelimiter);
-
-            HiveCmdStep step = new HiveCmdStep();
-            step.setCmd(hiveInitStatements + dropTableHql + createTableHql);
-            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-            return step;
-        }
-
-        /**
-         * Choose a better split-by column for sqoop. The strategy is:
-         * 1. Prefer ClusteredBy column
-         * 2. Prefer DistributedBy column
-         * 3. Prefer Partition date column
-         * 4. Prefer Higher cardinality column
-         * 5. Prefer numeric column
-         * 6. Pick a column at first glance
-         * @return A column reference <code>TblColRef</code>for sqoop split-by
-         */
-        protected TblColRef determineSplitColumn() {
-            if (null != flatDesc.getClusterBy()) {
-                return flatDesc.getClusterBy();
-            }
-            if (null != flatDesc.getDistributedBy()) {
-                return flatDesc.getDistributedBy();
-            }
-            PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
-            if (partitionDesc.isPartitioned()) {
-                return partitionDesc.getPartitionDateColumnRef();
-            }
-            TblColRef splitColumn = null;
-            TableMetadataManager tblManager = TableMetadataManager.getInstance(getConfig());
-            long maxCardinality = 0;
-            for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
-                TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
-                List<ColumnStats> columnStatses = tableExtDesc.getColumnStats();
-                if (!columnStatses.isEmpty()) {
-                    for (TblColRef colRef : tableRef.getColumns()) {
-                        long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
-                                .getCardinality();
-                        splitColumn = cardinality > maxCardinality ? colRef : splitColumn;
-                    }
-                }
-            }
-            if (null == splitColumn) {
-                for (TblColRef colRef : flatDesc.getAllColumns()) {
-                    if (colRef.getType().isIntegerFamily()) {
-                        return colRef;
-                    }
-                }
-                splitColumn = flatDesc.getAllColumns().get(0);
-            }
-
-            return splitColumn;
-        }
-
-        private String getSqoopJobQueueName(KylinConfig config) {
-            Map<String, String> mrConfigOverride = config.getMRConfigOverride();
-            if (mrConfigOverride.containsKey(MR_OVERRIDE_QUEUE_KEY)) {
-                return mrConfigOverride.get(MR_OVERRIDE_QUEUE_KEY);
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new IMRBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // doing nothing
             }
-            return DEFAULT_QUEUE;
-        }
-
-        protected AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
-            KylinConfig config = getConfig();
-            PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
-            String partCol = null;
-
-            if (partitionDesc.isPartitioned()) {
-                partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
-            }
-
-            String splitTable;
-            String splitTableAlias;
-            String splitColumn;
-            String splitDatabase;
-            TblColRef splitColRef = determineSplitColumn();
-            splitTable = splitColRef.getTableRef().getTableName();
-            splitTableAlias = splitColRef.getTableAlias();
-            splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
-            splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
-
-            //using sqoop to extract data from jdbc source and dump them to hive
-            String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
-            selectSql = escapeQuotationInSql(selectSql);
-
-
-
-            String hiveTable = flatDesc.getTableName();
-            String connectionUrl = config.getJdbcSourceConnectionUrl();
-            String driverClass = config.getJdbcSourceDriver();
-            String jdbcUser = config.getJdbcSourceUser();
-            String jdbcPass = config.getJdbcSourcePass();
-            String sqoopHome = config.getSqoopHome();
-            String filedDelimiter = config.getJdbcSourceFieldDelimiter();
-            int mapperNum = config.getSqoopMapperNum();
+        };
+    }
 
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn,
-                    splitColumn, splitDatabase, splitTable, splitTableAlias);
-            if (partitionDesc.isPartitioned()) {
-                SegmentRange segRange = flatDesc.getSegRange();
-                if (segRange != null && !segRange.isInfinite()) {
-                    if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
-                            && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
-                                    .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
-                                partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                                        flatDesc.getSegment(), segRange),
-                                "`");
-                        bquery += " WHERE " + quotedPartCond;
-                    }
-                }
-            }
-            bquery = escapeQuotationInSql(bquery);
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+        return new HiveMRInput.HiveTableInputFormat(getTableNameForHCat(table, uuid));
+    }
 
-            // escape ` in cmd
-            splitColumn = escapeQuotationInSql(splitColumn);
+    public static class JdbcMRBatchCubingInputSide extends JdbcBaseBatchCubingInputSide implements IMRBatchCubingInputSide {
 
-            String cmd = String.format(Locale.ROOT,
-                    "%s/bin/sqoop import" + generateSqoopConfigArgString()
-                            + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
-                            + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '' "
-                            + "--fields-terminated-by '%s' --num-mappers %d",
-                    sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
-                    splitColumn, bquery, filedDelimiter, mapperNum);
-            logger.debug(String.format(Locale.ROOT, "sqoop cmd:%s", cmd));
-            CmdStep step = new CmdStep();
-            step.setCmd(cmd);
-            step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
-            return step;
+        public JdbcMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
         }
 
         @Override
-        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            // skip
-        }
-
-        protected String generateSqoopConfigArgString() {
-            KylinConfig kylinConfig = getConfig();
-            Map<String, String> config = Maps.newHashMap();
-            config.put("mapreduce.job.queuename", getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
-            config.putAll(SourceConfigurationUtil.loadSqoopConfiguration());
-            config.putAll(kylinConfig.getSqoopConfigOverride());
-
-            StringBuilder args = new StringBuilder(" -Dorg.apache.sqoop.splitter.allow_text_splitter=true ");
-            for (Map.Entry<String, String> entry : config.entrySet()) {
-                args.append(" -D" + entry.getKey() + "=" + entry.getValue() + " ");
-            }
-            return args.toString();
+        public IMRTableInputFormat getFlatTableInputFormat() {
+            return new HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
         }
     }
 
-    protected static String escapeQuotationInSql(String sqlExpr) {
-        sqlExpr = sqlExpr.replaceAll("\"", "\\\\\"");
-        sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
-        return sqlExpr;
-    }
 }
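The Sqoop-related logic removed here (createSqoopToFlatHiveStep, determineSplitColumn, generateSqoopConfigArgString, escapeQuotationInSql) now lives in JdbcHiveInputBase, copied above with only minor formatting changes. For reference, the split-by heuristic that determineSplitColumn implements, condensed from the removed body; shown as a method fragment, with the enclosing JdbcBaseBatchCubingInputSide class and imports as in JdbcHiveInputBase.

    // Pick a split-by column for the Sqoop import, in priority order: cluster-by column,
    // distribute-by column, partition date column, a column chosen from table-stats
    // cardinality, the first integer-family column, and finally the first column.
    protected TblColRef determineSplitColumn() {
        if (null != flatDesc.getClusterBy()) {
            return flatDesc.getClusterBy();
        }
        if (null != flatDesc.getDistributedBy()) {
            return flatDesc.getDistributedBy();
        }
        PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
        if (partitionDesc.isPartitioned()) {
            return partitionDesc.getPartitionDateColumnRef();
        }
        TblColRef splitColumn = null;
        TableMetadataManager tblManager = TableMetadataManager.getInstance(getConfig());
        long maxCardinality = 0; // stays 0, so any column with cardinality > 0 replaces the candidate
        for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
            TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
            List<TableExtDesc.ColumnStats> columnStatses = tableExtDesc.getColumnStats();
            if (!columnStatses.isEmpty()) {
                for (TblColRef colRef : tableRef.getColumns()) {
                    long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
                            .getCardinality();
                    splitColumn = cardinality > maxCardinality ? colRef : splitColumn;
                }
            }
        }
        if (null == splitColumn) {
            for (TblColRef colRef : flatDesc.getAllColumns()) {
                if (colRef.getType().isIntegerFamily()) {
                    return colRef;
                }
            }
            splitColumn = flatDesc.getAllColumns().get(0);
        }
        return splitColumn;
    }
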
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
new file mode 100644
index 0000000..8a8471a
--- /dev/null
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc;
+
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+
+public class JdbcHiveSparkInput extends JdbcHiveInputBase implements ISparkInput {
+
+    @Override
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new JdbcSparkBatchCubingInputSide(flatDesc);
+    }
+
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // doing nothing
+            }
+        };
+    }
+
+    public static class JdbcSparkBatchCubingInputSide extends JdbcBaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+        public JdbcSparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
+        }
+    }
+}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index 3bf7498..1bda6c2 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
@@ -44,6 +45,8 @@ public class JdbcSource implements ISource {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == IMRInput.class) {
             return (I) new JdbcHiveMRInput();
+        } else if (engineInterface == ISparkInput.class) {
+            return (I) new JdbcHiveSparkInput();
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
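With JdbcSource now able to adapt to both build engines, the Spark cubing path for a JDBC-sourced cube resolves end to end. A minimal sketch of the dispatch: JdbcSparkAdaptSketch is a hypothetical name and the org.apache.kylin.source.SourceManager import path is assumed, while the createEngineAdapter call is the same one used by SparkUtil above.

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.spark.ISparkInput;
    import org.apache.kylin.source.SourceManager;

    public class JdbcSparkAdaptSketch {

        public static ISparkInput adapt(CubeSegment seg) {
            // SourceManager resolves the segment's ISource (JdbcSource for a JDBC table)
            // and calls adaptToBuildEngine(ISparkInput.class), which after this change
            // returns a JdbcHiveSparkInput instead of throwing "Cannot adapt to ...".
            return SourceManager.createEngineAdapter(seg, ISparkInput.class);
        }
    }
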
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
similarity index 90%
copy from source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
copy to source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
index 2e57a44..10eb31e 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kylin.source.jdbc.extensible;
 
 import org.apache.hadoop.util.StringUtils;
@@ -34,23 +35,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Locale;
 
-public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInput {
-    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
-
-    private final JdbcConnector dataSource;
+public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInputBase {
 
-    JdbcHiveMRInput(JdbcConnector dataSource) {
-        this.dataSource = dataSource;
-    }
-
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc, dataSource);
-    }
+    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveInputBase.class);
 
-    public static class BatchCubingInputSide extends org.apache.kylin.source.jdbc.JdbcHiveMRInput.BatchCubingInputSide {
+    public static class JDBCBaseBatchCubingInputSide extends JdbcBaseBatchCubingInputSide {
         private final JdbcConnector dataSource;
 
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
+        public JDBCBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
             super(flatDesc);
             this.dataSource = dataSource;
         }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
index 2e57a44..7df4ab5 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
@@ -17,25 +17,15 @@
  */
 package org.apache.kylin.source.jdbc.extensible;
 
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
-import org.apache.kylin.source.jdbc.sqoop.SqoopCmdStep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kylin.source.hive.HiveMRInput;
 
-import java.util.Locale;
-
-public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInput {
-    private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
+public class JdbcHiveMRInput extends JdbcHiveInputBase implements IMRInput {
 
     private final JdbcConnector dataSource;
 
@@ -44,86 +34,34 @@ public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInpu
     }
 
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc, dataSource);
+        return new JdbcMRBatchCubingInputSide(flatDesc, dataSource);
     }
 
-    public static class BatchCubingInputSide extends org.apache.kylin.source.jdbc.JdbcHiveMRInput.BatchCubingInputSide {
-        private final JdbcConnector dataSource;
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
-            super(flatDesc);
-            this.dataSource = dataSource;
-        }
-
-        protected JdbcConnector getDataSource() {
-            return dataSource;
-        }
-
-        @Override
-        protected AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
-            KylinConfig config = flatDesc.getDataModel().getConfig();
-            PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
-            String partCol = null;
-
-            if (partitionDesc.isPartitioned()) {
-                partCol = partitionDesc.getPartitionDateColumn(); //tablename.colname
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new IMRBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // nothing to do: the JDBC source has no dictionary or offset state to merge
             }
+        };
+    }
 
-            String splitTable;
-            String splitTableAlias;
-            String splitColumn;
-            String splitDatabase;
-            TblColRef splitColRef = determineSplitColumn();
-            splitTable = splitColRef.getTableRef().getTableName();
-            splitTable = splitColRef.getTableRef().getTableDesc().getName();
-            splitTableAlias = splitColRef.getTableAlias();
-            //to solve case sensitive if necessary
-            splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
-            splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase().toLowerCase(Locale.ROOT);
-
-            //using sqoop to extract data from jdbc source and dump them to hive
-            String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
-            selectSql = escapeQuotationInSql(dataSource.convertSql(selectSql));
-
-            String hiveTable = flatDesc.getTableName();
-            String sqoopHome = config.getSqoopHome();
-            String filedDelimiter = config.getJdbcSourceFieldDelimiter();
-            int mapperNum = config.getSqoopMapperNum();
-
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
-                    splitDatabase, splitTable, splitTableAlias);
-            bquery = dataSource.convertSql(bquery);
-            if (partitionDesc.isPartitioned()) {
-                SegmentRange segRange = flatDesc.getSegRange();
-                if (segRange != null && !segRange.isInfinite()) {
-                    if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
-                            && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
-                            .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
-                                partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                                        flatDesc.getSegment(), segRange),
-                                "`");
-                        bquery += " WHERE " + quotedPartCond;
-                    }
-                }
-            }
-            bquery = escapeQuotationInSql(bquery);
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+        return new HiveMRInput.HiveTableInputFormat(getTableNameForHCat(table, uuid));
+    }
 
-            splitColumn = escapeQuotationInSql(dataSource.convertColumn(splitColumn, FlatTableSqlQuoteUtils.QUOTE));
+    public static class JdbcMRBatchCubingInputSide extends JDBCBaseBatchCubingInputSide implements IMRInput.IMRBatchCubingInputSide {
 
-            String cmd = StringUtils.format(
-                    "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
-                            + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '' "
-                            + "--fields-terminated-by '%s' --num-mappers %d",
-                    dataSource.getJdbcUrl(), dataSource.getJdbcDriver(), dataSource.getJdbcUser(),
-                    dataSource.getJdbcPassword(), selectSql, jobWorkingDir, hiveTable, splitColumn, bquery,
-                    filedDelimiter, mapperNum);
-            logger.debug("sqoop cmd: {}", cmd);
+        public JdbcMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
+            super(flatDesc, dataSource);
+        }
 
-            SqoopCmdStep step = new SqoopCmdStep();
-            step.setCmd(cmd);
-            step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
-            return step;
+        @Override
+        public IMRInput.IMRTableInputFormat getFlatTableInputFormat() {
+            return new HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
         }
     }
+
 }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java
new file mode 100644
index 0000000..a5701ad
--- /dev/null
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.extensible;
+
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
+
+public class JdbcHiveSparkInput extends JdbcHiveInputBase implements ISparkInput {
+
+    private final JdbcConnector dataSource;
+
+    JdbcHiveSparkInput(JdbcConnector dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    @Override
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new JdbcSparkBatchCubingInputSide(flatDesc, dataSource);
+    }
+
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // nothing to do: the JDBC source has no dictionary or offset state to merge
+            }
+        };
+    }
+
+    public static class JdbcSparkBatchCubingInputSide extends JDBCBaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+        public JdbcSparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
+            super(flatDesc, dataSource);
+        }
+    }
+}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
index 3e8f0fd..da055e1 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
@@ -60,6 +61,8 @@ public class JdbcSource implements ISource {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == IMRInput.class) {
             return (I) new JdbcHiveMRInput(dataSource);
+        } else if (engineInterface == ISparkInput.class) {
+            return (I) new JdbcHiveSparkInput(dataSource);
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
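
To keep the JDBC hunks straight, these are the class relationships introduced so far (summarized from the diffs above; the non-extensible JdbcHiveMRInput, changed earlier in this commit, follows the same pattern on the MR side):

    org.apache.kylin.source.jdbc
        JdbcHiveInputBase
            JdbcBaseBatchCubingInputSide                  -- shared sqoop-to-flat-Hive-table steps
        JdbcHiveSparkInput  (implements ISparkInput)
            JdbcSparkBatchCubingInputSide   extends JdbcBaseBatchCubingInputSide

    org.apache.kylin.source.jdbc.extensible
        JdbcHiveInputBase   extends org.apache.kylin.source.jdbc.JdbcHiveInputBase
            JDBCBaseBatchCubingInputSide    extends JdbcBaseBatchCubingInputSide  -- carries the JdbcConnector
        JdbcHiveMRInput     (implements IMRInput)
            JdbcMRBatchCubingInputSide      extends JDBCBaseBatchCubingInputSide
        JdbcHiveSparkInput  (implements ISparkInput)
            JdbcSparkBatchCubingInputSide   extends JDBCBaseBatchCubingInputSide
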
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
index 956f86c..20c37ef 100644
--- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
+++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
@@ -60,7 +60,7 @@ public class JdbcHiveMRInputTest extends TestBase {
         CubeSegment seg = cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
                 new SegmentRange.TSRange(System.currentTimeMillis() - 100L, System.currentTimeMillis() + 100L));
         CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
-        JdbcHiveMRInput.BatchCubingInputSide inputSide = (JdbcHiveMRInput.BatchCubingInputSide) input
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = (JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
                 .getBatchCubingInputSide(flatDesc);
 
         AbstractExecutable executable = new MockInputSide(flatDesc, inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -86,7 +86,7 @@ public class JdbcHiveMRInputTest extends TestBase {
         CubeSegment seg = cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
                 new SegmentRange.TSRange(0L, Long.MAX_VALUE));
         CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
-        JdbcHiveMRInput.BatchCubingInputSide inputSide = (JdbcHiveMRInput.BatchCubingInputSide) input
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = (JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
                 .getBatchCubingInputSide(flatDesc);
 
         AbstractExecutable executable = new MockInputSide(flatDesc, inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -111,7 +111,7 @@ public class JdbcHiveMRInputTest extends TestBase {
         CubeSegment seg = cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
                 new SegmentRange.TSRange(System.currentTimeMillis() - 100L, System.currentTimeMillis() + 100L));
         CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
-        JdbcHiveMRInput.BatchCubingInputSide inputSide = (JdbcHiveMRInput.BatchCubingInputSide) input
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = (JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
                 .getBatchCubingInputSide(flatDesc);
 
         AbstractExecutable executable = new MockInputSide(flatDesc, inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -127,10 +127,10 @@ public class JdbcHiveMRInputTest extends TestBase {
         source.close();
     }
 
-    private static class MockInputSide extends JdbcHiveMRInput.BatchCubingInputSide {
-        JdbcHiveMRInput.BatchCubingInputSide input;
+    private static class MockInputSide extends JdbcHiveMRInput.JdbcMRBatchCubingInputSide {
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide input;
 
-        public MockInputSide(IJoinedFlatTableDesc flatDesc, JdbcHiveMRInput.BatchCubingInputSide input) {
+        public MockInputSide(IJoinedFlatTableDesc flatDesc, JdbcHiveMRInput.JdbcMRBatchCubingInputSide input) {
             super(flatDesc, input.getDataSource());
             this.input = input;
         }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
index cb2e14c..7620ab3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -20,18 +20,23 @@ package org.apache.kylin.source.kafka;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 
-import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
@@ -45,8 +50,78 @@ import org.apache.kylin.source.hive.GarbageCollectionStep;
 import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
 import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 public class KafkaInputBase {
 
+    public static class BaseBatchCubingInputSide implements IInput.IBatchCubingInputSide {
+
+        final JobEngineConfig conf;
+        final CubeSegment seg;
+        private CubeDesc cubeDesc;
+        private KylinConfig config;
+        protected IJoinedFlatTableDesc flatDesc;
+        protected String hiveTableDatabase;
+        private final List<String> intermediateTables = Lists.newArrayList();
+        private final List<String> intermediatePaths = Lists.newArrayList();
+        private String cubeName;
+
+        public BaseBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.config = seg.getConfig();
+            this.flatDesc = flatDesc;
+            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
+            this.seg = seg;
+            this.cubeDesc = seg.getCubeDesc();
+            this.cubeName = seg.getCubeInstance().getName();
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+
+            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+            final String baseLocation = getJobWorkingDir(jobFlow);
+            if (onlyOneTable) {
+                // directly use flat table location
+                final String intermediateFactTable = flatDesc.getTableName();
+                final String tableLocation = baseLocation + "/" + intermediateFactTable;
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
+                intermediatePaths.add(tableLocation);
+            } else {
+                final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
+                        + cubeName.toLowerCase(Locale.ROOT) + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
+                jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
+                        flatDesc, intermediateTables, intermediatePaths));
+            }
+        }
+
+        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
+        }
+
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+
+        }
+    }
+
+    public static class BaseBatchMergeInputSide implements IInput.IBatchMergeInputSide {
+
+        private CubeSegment cubeSegment;
+
+        BaseBatchMergeInputSide(CubeSegment cubeSegment) {
+            this.cubeSegment = cubeSegment;
+        }
+
+        @Override
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+        }
+    }
+
     protected static AbstractExecutable createMergeOffsetStep(String jobId, CubeSegment cubeSegment) {
 
         final MergeOffsetStep result = new MergeOffsetStep();
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 1c94f9c..d709572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -20,8 +20,6 @@ package org.apache.kylin.source.kafka;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -30,46 +28,36 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 
 public class KafkaMRInput extends KafkaInputBase implements IMRInput {
 
-    private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
     private CubeSegment cubeSegment;
 
     @Override
-    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment, flatDesc);
+        return new KafkaMRBatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
-    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
-
-        return new KafkaTableInputFormat(cubeSegment, null);
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new KafkaMRBatchMergeInputSide((CubeSegment) seg);
     }
 
     @Override
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new KafkaMRBatchMergeInputSide((CubeSegment) seg);
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+        return new KafkaTableInputFormat(cubeSegment, null);
     }
 
     public static class KafkaTableInputFormat implements IMRTableInputFormat {
@@ -110,56 +98,10 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
         }
     }
 
-    public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
-
-        final JobEngineConfig conf;
-        final CubeSegment seg;
-        private CubeDesc cubeDesc;
-        private KylinConfig config;
-        protected IJoinedFlatTableDesc flatDesc;
-        protected String hiveTableDatabase;
-        private List<String> intermediateTables = Lists.newArrayList();
-        private List<String> intermediatePaths = Lists.newArrayList();
-        private String cubeName;
-
-        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
-            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            this.config = seg.getConfig();
-            this.flatDesc = flatDesc;
-            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
-            this.seg = seg;
-            this.cubeDesc = seg.getCubeDesc();
-            this.cubeName = seg.getCubeInstance().getName();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-
-            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
-            final String baseLocation = getJobWorkingDir(jobFlow);
-            if (onlyOneTable) {
-                // directly use flat table location
-                final String intermediateFactTable = flatDesc.getTableName();
-                final String tableLocation = baseLocation + "/" + intermediateFactTable;
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
-                intermediatePaths.add(tableLocation);
-            } else {
-                final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
-                        + cubeName.toLowerCase(Locale.ROOT) + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
-                jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
-                        flatDesc, intermediateTables, intermediatePaths));
-            }
-        }
-
-        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
-            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+    public static class KafkaMRBatchCubingInputSide extends BaseBatchCubingInputSide implements IMRBatchCubingInputSide {
 
+        public KafkaMRBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+            super(seg, flatDesc);
         }
 
         @Override
@@ -168,18 +110,10 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
         }
     }
 
-    class KafkaMRBatchMergeInputSide implements IMRBatchMergeInputSide {
-
-        private CubeSegment cubeSegment;
+    public static class KafkaMRBatchMergeInputSide extends BaseBatchMergeInputSide implements IMRBatchMergeInputSide {
 
         KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) {
-            this.cubeSegment = cubeSegment;
-        }
-
-        @Override
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+            super(cubeSegment);
         }
     }
-
 }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
index 7db6c32..edbc002 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
@@ -17,105 +17,37 @@
 */
 package org.apache.kylin.source.kafka;
 
-import java.util.List;
-import java.util.Locale;
-
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.spark.ISparkInput;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 
 public class KafkaSparkInput extends KafkaInputBase implements ISparkInput {
 
-    private static final Logger logger = LoggerFactory.getLogger(KafkaSparkInput.class);
     private CubeSegment cubeSegment;
 
     @Override
-    public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment, flatDesc);
+        return new KafkaSparkBatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
-    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new KafkaSparkBatchMergeInputSide((CubeSegment) seg);
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new KafkaSparkBatchMergeInputSide((CubeSegment) seg);
     }
 
-    public static class BatchCubingInputSide implements ISparkBatchCubingInputSide {
-
-        final JobEngineConfig conf;
-        final CubeSegment seg;
-        private CubeDesc cubeDesc;
-        private KylinConfig config;
-        protected IJoinedFlatTableDesc flatDesc;
-        protected String hiveTableDatabase;
-        final private List<String> intermediateTables = Lists.newArrayList();
-        final private List<String> intermediatePaths = Lists.newArrayList();
-        private String cubeName;
-
-        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
-            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            this.config = seg.getConfig();
-            this.flatDesc = flatDesc;
-            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
-            this.seg = seg;
-            this.cubeDesc = seg.getCubeDesc();
-            this.cubeName = seg.getCubeInstance().getName();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-
-            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
-            final String baseLocation = getJobWorkingDir(jobFlow);
-            if (onlyOneTable) {
-                // directly use flat table location
-                final String intermediateFactTable = flatDesc.getTableName();
-                final String tableLocation = baseLocation + "/" + intermediateFactTable;
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
-                intermediatePaths.add(tableLocation);
-            } else {
-                final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
-                        + cubeName.toLowerCase(Locale.ROOT) + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
-                jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
-                        flatDesc, intermediateTables, intermediatePaths));
-            }
-        }
-
-        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
-            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+    public static class KafkaSparkBatchCubingInputSide extends BaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
 
+        public KafkaSparkBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+            super(seg, flatDesc);
         }
     }
 
-    class KafkaSparkBatchMergeInputSide implements ISparkBatchMergeInputSide {
-
-        private CubeSegment cubeSegment;
+    public static class KafkaSparkBatchMergeInputSide extends BaseBatchMergeInputSide implements ISparkBatchMergeInputSide {
 
         KafkaSparkBatchMergeInputSide(CubeSegment cubeSegment) {
-            this.cubeSegment = cubeSegment;
-        }
-
-        @Override
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+            super(cubeSegment);
         }
     }
-
 }
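
The Kafka source follows the same split: the shared logic now lives in KafkaInputBase, and the MR/Spark classes are thin adapters. A minimal sketch of how the Spark engine would drive both input sides, assuming the flat-table descriptor and the merged segment belong to CubeSegments and the job flows come from the engine (the helper names in this sketch are illustrative only):

    import org.apache.kylin.engine.spark.ISparkInput;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
    import org.apache.kylin.metadata.model.ISegment;
    import org.apache.kylin.source.kafka.KafkaSparkInput;

    public class KafkaSparkInputSketch {
        // Sketch only: arguments are supplied by the Spark build engine; 'mergedSeg' must be a CubeSegment.
        static void sketch(IJoinedFlatTableDesc flatDesc, ISegment mergedSeg,
                DefaultChainedExecutable buildFlow, DefaultChainedExecutable mergeFlow) {
            KafkaSparkInput input = new KafkaSparkInput();

            // Build: save Kafka data, materialize a flat Hive table only when lookup tables exist, then clean up.
            ISparkInput.ISparkBatchCubingInputSide cubingSide =
                    (ISparkInput.ISparkBatchCubingInputSide) input.getBatchCubingInputSide(flatDesc);
            cubingSide.addStepPhase1_CreateFlatTable(buildFlow);
            cubingSide.addStepPhase4_Cleanup(buildFlow);

            // Merge: unlike the JDBC source, Kafka contributes a MergeOffsetStep for the merged segment.
            ISparkInput.ISparkBatchMergeInputSide mergeSide =
                    (ISparkInput.ISparkBatchMergeInputSide) input.getBatchMergeInputSide(mergedSeg);
            mergeSide.addStepPhase1_MergeDictionary(mergeFlow);
        }
    }
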