Posted to commits@kylin.apache.org by sh...@apache.org on 2018/06/01 01:12:32 UTC

[kylin] branch KYLIN-3378 created (now d37c236)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch KYLIN-3378
in repository https://gitbox.apache.org/repos/asf/kylin.git.


      at d37c236  KYLIN-3378 Kafka join with hive

This branch includes the following new commits:

     new d37c236  KYLIN-3378 Kafka join with hive

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.

[kylin] 01/01: KYLIN-3378 Kafka join with hive

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3378
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d37c236ee9865846fed4909eec983f7644f6fd35
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Fri May 25 18:25:48 2018 +0800

    KYLIN-3378 Kafka join with hive
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  39 ++++-
 .../org/apache/kylin/common/util/BasicTest.java    |   3 +-
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  24 ++-
 .../cube/model/CubeJoinedFlatTableEnrich.java      |  10 ++
 .../model/validation/rule/StreamingCubeRule.java   |  11 --
 .../java/org/apache/kylin/job/JoinedFlatTable.java |  44 ++---
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/metadata/model/IJoinedFlatTableDesc.java |   4 +
 .../org/apache/kylin/source/hive/HiveMRInput.java  |  68 +++++---
 source-kafka/pom.xml                               |   4 +
 .../apache/kylin/source/kafka/KafkaMRInput.java    | 194 ++++++++++++++-------
 .../org/apache/kylin/source/kafka/KafkaSource.java |   5 +-
 .../kylin/source/kafka/config/KafkaConfig.java     |  12 ++
 .../source/kafka/hadoop/KafkaFlatTableJob.java     |   7 +-
 .../source/kafka/hadoop/KafkaFlatTableMapper.java  |  24 ++-
 .../source/kafka/hadoop/KafkaInputFormat.java      |  18 +-
 .../apache/kylin/source/kafka/SpiltNumTest.java    |  93 ++++++++++
 17 files changed, 420 insertions(+), 141 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 689d08f..cdb3755 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -225,7 +225,11 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.env", "DEV");
     }
 
-    private String cachedHdfsWorkingDirectory;
+    public static String cachedHdfsWorkingDirectory;
+
+    public void setHdfsWorkingDirectory(String cachedHdfsWorkingDirectory) {
+        this.cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory;
+    }
 
     public String getHdfsWorkingDirectory() {
         if (cachedHdfsWorkingDirectory != null)
@@ -260,6 +264,39 @@ abstract public class KylinConfigBase implements Serializable {
         return cachedHdfsWorkingDirectory;
     }
 
+    public String getHdfsWorkingDirectory(String cachedHdfsWorkingDirectory) {
+        if (cachedHdfsWorkingDirectory != null)
+            return cachedHdfsWorkingDirectory;
+
+        String root = getOptional("kylin.env.hdfs-working-dir", "/kylin");
+
+        Path path = new Path(root);
+        if (!path.isAbsolute())
+            throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root);
+
+        // make sure path is qualified
+        try {
+            FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+            path = fs.makeQualified(path);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        // append metadata-url prefix
+        root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
+
+        if (!root.endsWith("/"))
+            root += "/";
+
+        cachedHdfsWorkingDirectory = root;
+        if (cachedHdfsWorkingDirectory.startsWith("file:")) {
+            cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("file:", "file://");
+        } else if (cachedHdfsWorkingDirectory.startsWith("maprfs:")) {
+            cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("maprfs:", "maprfs://");
+        }
+        return cachedHdfsWorkingDirectory;
+    }
+
     public String getZookeeperBasePath() {
         return getOptional("kylin.env.zookeeper-base-path", "/kylin");
     }
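
The working-directory handling above boils down to: take the configured root
("kylin.env.hdfs-working-dir"), qualify it against the working file system,
append the metadata URL prefix with ':' replaced by '-', and guarantee a
trailing slash. A minimal sketch of just the string steps, with made-up
inputs (the Hadoop qualification is assumed already done):

    public class WorkingDirSketch {
        // mirrors the tail of getHdfsWorkingDirectory(); inputs are hypothetical
        static String workingDir(String qualifiedRoot, String metadataUrlPrefix) {
            String root = qualifiedRoot + "/" + metadataUrlPrefix.replace(':', '-');
            if (!root.endsWith("/"))
                root += "/";
            return root;
        }

        public static void main(String[] args) {
            // prints: hdfs://ns1/kylin/kylin_metadata@hbase/
            System.out.println(workingDir("hdfs://ns1/kylin", "kylin_metadata@hbase"));
        }
    }
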
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 1c1e389..6ae238b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -231,7 +231,8 @@ public class BasicTest {
 
         String[] origin = new String[] {"ab,c", "cd|e"};
 
-        String delimiter = "\u001F"; // "\t";
+        // test with the sequence file default delimiter
+        String delimiter = "\01"; // previously "\u001F", "\t"
         String concated = StringUtils.join(Arrays.asList(origin), delimiter);
         String[] newValues = concated.split(delimiter);
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index d50a5af..70ad13e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -42,8 +42,8 @@ import com.google.common.collect.Maps;
 @SuppressWarnings("serial")
 public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializable {
 
-    protected final String tableName;
-    protected final CubeDesc cubeDesc;
+    protected String tableName;
+    protected final CubeDesc cubeDesc;
     protected final CubeSegment cubeSegment;
     protected final boolean includingDerived;
 
@@ -135,6 +135,18 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
         }
     }
 
+    @Override
+    public List<TblColRef> getFactColumns() {
+        final List<TblColRef> factColumns = Lists.newArrayList();
+        for (TblColRef col : this.getAllColumns()) {
+            if (col.getTableRef().equals(getDataModel().getRootFactTable())) {
+                // only fetch the columns from fact table
+                factColumns.add(col);
+            }
+        }
+        return factColumns;
+    }
+
     // sanity check the input record (in bytes) matches what's expected
     public void sanityCheck(BytesSplitter bytesSplitter) {
         if (columnCount != bytesSplitter.getBufferSize()) {
@@ -171,6 +183,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
 
     @Override
     public SegmentRange getSegRange() {
+        if (cubeSegment.isOffsetCube()) {
+            return null;
+        }
         return cubeSegment.getSegRange();
     }
 
@@ -185,6 +200,11 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
     }
 
     @Override
+    public boolean useAlias() {
+        return true;
+    }
+
+    @Override
     public TblColRef getClusterBy() {
         return cubeDesc.getClusteredByColumn();
     }
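
The new getFactColumns() keeps only the columns owned by the model's root
fact table; for a Kafka source these are the only columns that can be parsed
out of the stream before the Hive join runs. A simplified, self-contained
sketch of the filter, with "TABLE.COLUMN" strings standing in for TblColRef:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FactColumnsSketch {
        static List<String> factColumns(List<String> allColumns, String rootFactTable) {
            List<String> result = new ArrayList<String>();
            for (String col : allColumns) {
                // only fetch the columns from the fact table
                if (col.startsWith(rootFactTable + ".")) {
                    result.add(col);
                }
            }
            return result;
        }

        public static void main(String[] args) {
            List<String> all = Arrays.asList("FACT.ORDER_ID", "FACT.ORDER_TIME", "DIM_USER.NAME");
            System.out.println(factColumns(all, "FACT")); // [FACT.ORDER_ID, FACT.ORDER_TIME]
        }
    }
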
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 73da802..f09478e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -105,6 +105,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
     }
 
     @Override
+    public List<TblColRef> getFactColumns() {
+        return flatDesc.getFactColumns();
+    }
+
+    @Override
     public DataModelDesc getDataModel() {
         return flatDesc.getDataModel();
     }
@@ -130,6 +135,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
     }
 
     @Override
+    public boolean useAlias() {
+        return flatDesc.useAlias();
+    }
+
+    @Override
     public TblColRef getClusterBy() {
         return flatDesc.getClusterBy();
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
index 4438706..647f4c1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
@@ -24,7 +24,6 @@ import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ResultLevel;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.ISourceAware;
 
 import org.apache.kylin.metadata.model.TblColRef;
@@ -49,16 +48,6 @@ public class StreamingCubeRule implements IValidatorRule<CubeDesc> {
             return;
         }
 
-        if (model.getLookupTables().size() > 0) {
-            context.addResult(ResultLevel.ERROR, "Streaming Cube doesn't support star-schema so far; only one fact table is allowed.");
-            return;
-        }
-
-        if (cube.getEngineType() == IEngineAware.ID_SPARK) {
-            context.addResult(ResultLevel.ERROR, "Spark engine doesn't support streaming source, select MapReduce engine instead.");
-            return;
-        }
-
         if (model.getPartitionDesc() == null || model.getPartitionDesc().getPartitionDateColumn() == null) {
             context.addResult(ResultLevel.ERROR, "Must define a partition column.");
             return;
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 528bcf0..0769dcf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -44,6 +44,8 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 public class JoinedFlatTable {
 
+    public static final String TEXTFILE = "TEXTFILE";
+
     public static String getTableDir(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
         return storageDfsDir + "/" + flatDesc.getTableName();
     }
@@ -61,13 +63,13 @@ public class JoinedFlatTable {
     }
 
     public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
-            String storageFormat) {
+                                                      String storageFormat) {
         String fieldDelimiter = flatDesc.getDataModel().getConfig().getFlatTableFieldDelimiter();
         return generateCreateTableStatement(flatDesc, storageDfsDir, storageFormat, fieldDelimiter);
     }
 
     public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
-            String storageFormat, String fieldDelimiter) {
+                                                      String storageFormat, String fieldDelimiter) {
         StringBuilder ddl = new StringBuilder();
 
         ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n");
@@ -78,10 +80,10 @@ public class JoinedFlatTable {
             if (i > 0) {
                 ddl.append(",");
             }
-            ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + "\n");
+            ddl.append(colName(col, flatDesc.useAlias()) + " " + getHiveDataType(col.getDatatype()) + "\n");
         }
         ddl.append(")" + "\n");
-        if ("TEXTFILE".equals(storageFormat)) {
+        if (TEXTFILE.equals(storageFormat)) {
             ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\" + fieldDelimiter + "'\n");
         }
         ddl.append("STORED AS " + storageFormat + "\n");
@@ -96,6 +98,12 @@ public class JoinedFlatTable {
         return ddl.toString();
     }
 
+    public static String generateDropTableStatement1(IJoinedFlatTableDesc flatDesc) {
+        StringBuilder ddl = new StringBuilder();
+        ddl.append("DROP TABLE IF EXISTS " + flatDesc.getDataModel().getRootFactTableName() + ";").append("\n");
+        return ddl.toString();
+    }
+
     public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc) {
         CubeSegment segment = ((CubeSegment) flatDesc.getSegment());
         KylinConfig kylinConfig;
@@ -120,11 +128,6 @@ public class JoinedFlatTable {
                 + ";\n";
     }
 
-    public static String generateInsertPartialDataStatement(IJoinedFlatTableDesc flatDesc) {
-        return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc)
-                + ";\n";
-    }
-
     public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
         return generateSelectDataStatement(flatDesc, false, null);
     }
@@ -146,7 +149,7 @@ public class JoinedFlatTable {
             if (skipAsList.contains(colTotalName)) {
                 sql.append(col.getExpressionInSourceDB() + sep);
             } else {
-                sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep);
+                sql.append(col.getExpressionInSourceDB() + " as " + colName(col, true) + sep);
             }
         }
         appendJoinStatement(flatDesc, sql, singleLine);
@@ -154,15 +157,6 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
-    public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
-        final StringBuilder sql = new StringBuilder();
-        final TableRef rootTbl = flatDesc.getDataModel().getRootFactTable();
-        sql.append("dfs -mkdir -p " + outputDir + ";\n");
-        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + rootTbl.getTableIdentity()
-                + " " + rootTbl.getAlias() + "\n");
-        appendWhereStatement(flatDesc, sql);
-        return sql.toString();
-    }
 
     public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
         final String sep = singleLine ? " " : "\n";
@@ -175,7 +169,6 @@ public class JoinedFlatTable {
         for (JoinTableDesc lookupDesc : model.getJoinTables()) {
             JoinDesc join = lookupDesc.getJoin();
             if (join != null && join.getType().equals("") == false) {
-                String joinType = join.getType().toUpperCase();
                 TableRef dimTable = lookupDesc.getTableRef();
                 if (!dimTableCache.contains(dimTable)) {
                     TblColRef[] pk = join.getPrimaryKeyColumns();
@@ -183,6 +176,8 @@ public class JoinedFlatTable {
                     if (pk.length != fk.length) {
                         throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
                     }
+                    String joinType = join.getType().toUpperCase();
+
                     sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
                     sql.append("ON ");
                     for (int i = 0; i < pk.length; i++) {
@@ -201,7 +196,7 @@ public class JoinedFlatTable {
 
     private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) {
         if (redistCol != null) {
-            sql.append(" DISTRIBUTE BY ").append(colName(redistCol)).append(";\n");
+            sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n");
         } else {
             sql.append(" DISTRIBUTE BY RAND()").append(";\n");
         }
@@ -243,8 +238,13 @@ public class JoinedFlatTable {
         sql.append(whereBuilder.toString());
     }
 
+
     private static String colName(TblColRef col) {
-        return col.getTableAlias() + "_" + col.getName();
+        return colName(col, true);
+    }
+
+    private static String colName(TblColRef col, boolean useAlias) {
+        return useAlias ? col.getTableAlias() + "_" + col.getName() : col.getName();
     }
 
     private static String getHiveDataType(String javaDataType) {
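
The colName() overload above is the switch that lets one DDL generator serve
both tables: the joined flat table names its columns "<alias>_<column>",
while the mock Kafka fact table (useAlias == false) keeps the raw column
names so the parsed stream fields map onto it directly. A tiny sketch:

    public class ColNameSketch {
        static String colName(String tableAlias, String column, boolean useAlias) {
            return useAlias ? tableAlias + "_" + column : column;
        }

        public static void main(String[] args) {
            System.out.println(colName("F", "ORDER_ID", true));  // F_ORDER_ID
            System.out.println(colName("F", "ORDER_ID", false)); // ORDER_ID
        }
    }
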
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index b9a3651..42f0dbf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -34,6 +34,7 @@ public final class ExecutableConstants {
 
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
     public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
+    public static final String STEP_NAME_CREATE_HIVE_TABLE = "Create Hive Table";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
     public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 0589829..8f86a52 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -30,6 +30,8 @@ public interface IJoinedFlatTableDesc {
 
     List<TblColRef> getAllColumns();
 
+    List<TblColRef> getFactColumns();
+
     int getColumnIndex(TblColRef colRef);
 
     SegmentRange getSegRange();
@@ -41,4 +43,6 @@ public interface IJoinedFlatTableDesc {
     // optionally present
     ISegment getSegment();
 
+    boolean useAlias();
+
 }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index a96f4d5..0e791eb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -284,8 +286,8 @@ public class HiveMRInput implements IMRInput {
 
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTableIdentity(getIntermediateTableIdentity());
-            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir));
+            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
             step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
             jobFlow.addTask(step);
         }
@@ -435,42 +437,58 @@ public class HiveMRInput implements IMRInput {
 
         private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
             StringBuffer output = new StringBuffer();
-            final String hiveTable = this.getIntermediateTableIdentity();
-            if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
-                final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-                hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
-                hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
-                config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
-                output.append("Hive table " + hiveTable + " is dropped. \n");
-                rmdirOnHDFS(getExternalDataPath());
-                output.append(
-                        "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            final List<String> hiveTables = this.getIntermediateTables();
+            for (String hiveTable : hiveTables) {
+                if (!config.isHiveKeepFlatTable() && StringUtils.isNotEmpty(hiveTable)) {
+                    hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
+
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                }
             }
+            config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+            rmdirOnHDFS(getExternalDataPaths());
+            output.append(
+                    "Path " + getExternalDataPaths() + " is deleted. \n");
+
             return output.toString();
         }
 
-        private void rmdirOnHDFS(String path) throws IOException {
-            Path externalDataPath = new Path(path);
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            if (fs.exists(externalDataPath)) {
-                fs.delete(externalDataPath, true);
+        private void rmdirOnHDFS(List<String> paths) throws IOException {
+            for (String path : paths) {
+                Path externalDataPath = new Path(path);
+                FileSystem fs = HadoopUtil.getWorkingFileSystem();
+                if (fs.exists(externalDataPath)) {
+                    fs.delete(externalDataPath, true);
+                }
             }
         }
 
-        public void setIntermediateTableIdentity(String tableIdentity) {
-            setParam("oldHiveTable", tableIdentity);
+        public void setIntermediateTables(List<String> tableIdentity) {
+            setParam("oldHiveTables", StringUtil.join(tableIdentity, ","));
         }
 
-        private String getIntermediateTableIdentity() {
-            return getParam("oldHiveTable");
+        private List<String> getIntermediateTables() {
+            List<String> intermediateTables = Lists.newArrayList();
+            String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ",");
+            for (String t : tables) {
+                intermediateTables.add(t);
+            }
+            return intermediateTables;
         }
 
-        public void setExternalDataPath(String externalDataPath) {
-            setParam("externalDataPath", externalDataPath);
+        public void setExternalDataPaths(List<String> externalDataPaths) {
+            setParam("externalDataPaths", StringUtil.join(externalDataPaths, ","));
         }
 
-        private String getExternalDataPath() {
-            return getParam("externalDataPath");
+        private List<String> getExternalDataPaths() {
+            String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ",");
+            List<String> result = Lists.newArrayList();
+            for (String s : paths) {
+                result.add(s);
+            }
+            return result;
         }
 
         public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
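
The cleanup step's parameters change from a single value to comma-separated
lists, so one garbage-collection step can drop several intermediate tables
and paths. A sketch of the round trip, with String.join/split standing in
for Kylin's StringUtil.join and StringUtil.splitAndTrim:

    import java.util.Arrays;
    import java.util.List;

    public class ParamListSketch {
        public static void main(String[] args) {
            List<String> tables = Arrays.asList("kylin_intermediate_flat", "kylin_intermediate_fact");
            String param = String.join(",", tables);  // what setIntermediateTables() stores
            String[] back = param.split("\\s*,\\s*"); // what getIntermediateTables() reads back
            System.out.println(Arrays.toString(back));
        }
    }
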
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 2ef4cdf..55df7f0 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -66,5 +66,9 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-source-hive</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 223e303..f37bf50 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
@@ -22,7 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.fs.FileSystem;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -30,8 +30,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -41,16 +41,17 @@ import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.HiveMRInput;
 import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
 import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 import org.slf4j.Logger;
@@ -58,18 +59,19 @@ import org.slf4j.LoggerFactory;
 
 public class KafkaMRInput implements IMRInput {
 
-    CubeSegment cubeSegment;
+    private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
+    private CubeSegment cubeSegment;
 
     @Override
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment);
+        return new BatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
     public IMRTableInputFormat getTableInputFormat(TableDesc table) {
 
-        return new KafkaTableInputFormat(cubeSegment, null, null, null);
+        return new KafkaTableInputFormat(cubeSegment, null);
     }
 
     @Override
@@ -80,12 +82,11 @@ public class KafkaMRInput implements IMRInput {
     public static class KafkaTableInputFormat implements IMRTableInputFormat {
         private final CubeSegment cubeSegment;
         private final JobEngineConfig conf;
-        private final String delimiter;
+        private String delimiter = "\01";
 
-        public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
+        public KafkaTableInputFormat(CubeSegment cubeSegment, JobEngineConfig conf) {
             this.cubeSegment = cubeSegment;
             this.conf = conf;
-            this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
         }
 
         @Override
@@ -114,30 +115,132 @@ public class KafkaMRInput implements IMRInput {
 
         final JobEngineConfig conf;
         final CubeSegment seg;
-        private String outputPath;
-
-        public BatchCubingInputSide(CubeSegment seg) {
+        private CubeDesc cubeDesc;
+        private KylinConfig config;
+        protected IJoinedFlatTableDesc flatDesc;
+        protected String hiveTableDatabase;
+        private List<String> intermediateTables = Lists.newArrayList();
+        private List<String> intermediatePaths = Lists.newArrayList();
+        private String cubeName;
+
+        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
             this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.config = seg.getConfig();
+            this.flatDesc = flatDesc;
+            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
             this.seg = seg;
+            this.cubeDesc = seg.getCubeDesc();
+            this.cubeName = seg.getCubeInstance().getName();
         }
 
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
+
+            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+            final String baseLocation = getJobWorkingDir(jobFlow);
+            if (onlyOneTable) {
+                // directly use flat table location
+                final String intermediateFactTable = flatDesc.getTableName();
+                final String tableLocation = baseLocation + "/" + intermediateFactTable;
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation));
+                intermediatePaths.add(tableLocation);
+            } else {
+            final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase() + "_"
+                        + seg.getUuid().replaceAll("-", "_") + "_fact";
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName));
+                jobFlow.addTask(createFlatTable(mockFactTableName, baseLocation));
+            }
+        }
+
+        private AbstractExecutable createFlatTable(final String mockFactTableName, String baseLocation) {
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+
+            final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() {
+
+                @Override
+                public String getTableName() {
+                    return mockFactTableName;
+                }
+
+                @Override
+                public DataModelDesc getDataModel() {
+                    return cubeDesc.getModel();
+                }
+
+                @Override
+                public List<TblColRef> getAllColumns() {
+                    return flatDesc.getFactColumns();
+                }
+
+                @Override
+                public List<TblColRef> getFactColumns() {
+                    return null;
+                }
+
+                @Override
+                public int getColumnIndex(TblColRef colRef) {
+                    return 0;
+                }
+
+                @Override
+                public SegmentRange getSegRange() {
+                    return null;
+                }
+
+                @Override
+                public TblColRef getDistributedBy() {
+                    return null;
+                }
+
+                @Override
+                public TblColRef getClusterBy() {
+                    return null;
+                }
+
+                @Override
+                public ISegment getSegment() {
+                    return null;
+                }
+
+                @Override
+                public boolean useAlias() {
+                    return false;
+                }
+            };
+            final String dropFactTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc);
+            final String createFactTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation);
+
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation);
+            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+            insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ", mockFactTableName + " ");
+
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setInitStatement(hiveInitStatements);
+            step.setCreateTableStatement(dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls);
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+            intermediateTables.add(flatDesc.getTableName());
+            intermediateTables.add(mockFactTableName);
+            intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
+            intermediatePaths.add(baseLocation + "/" + mockFactTableName);
+            return step;
         }
 
-        private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
-            MapReduceExecutable result = new MapReduceExecutable();
+        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
+        }
 
-            IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
-            outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+        private MapReduceExecutable createSaveKafkaDataStep(String jobId, String location) {
+            MapReduceExecutable result = new MapReduceExecutable();
             result.setName("Save data from Kafka");
             result.setMapReduceJobClass(KafkaFlatTableJob.class);
             JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
             StringBuilder cmd = new StringBuilder();
             jobBuilderSupport.appendMapReduceParameters(cmd);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
 
@@ -147,20 +250,17 @@ public class KafkaMRInput implements IMRInput {
 
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            GarbageCollectionStep step = new GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP);
-            step.setDataPath(outputPath);
+            HiveMRInput.GarbageCollectionStep step = new HiveMRInput.GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+            step.setIntermediateTables(intermediateTables);
+            step.setExternalDataPaths(intermediatePaths);
             jobFlow.addTask(step);
 
         }
 
         @Override
         public IMRTableInputFormat getFlatTableInputFormat() {
-            KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
-            KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getCubeInstance().getRootFactTable());
-            List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns();
-
-            return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf);
+            return new KafkaTableInputFormat(seg, conf);
         }
     }
 
@@ -178,43 +278,11 @@ public class KafkaMRInput implements IMRInput {
             final MergeOffsetStep result = new MergeOffsetStep();
             result.setName("Merge offset step");
 
-            CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams());
+            CubingExecutableUtil.setCubeName(cubeSegment.getCubeInstance().getName(), result.getParams());
             CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
             CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
             jobFlow.addTask(result);
         }
     }
 
-    public static class GarbageCollectionStep extends AbstractExecutable {
-        private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
-        @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-            try {
-                rmdirOnHDFS(getDataPath());
-            } catch (IOException e) {
-                logger.error("job:" + getId() + " execute finished with exception", e);
-                return ExecuteResult.createError(e);
-            }
-
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n");
-        }
-
-        private void rmdirOnHDFS(String path) throws IOException {
-            Path externalDataPath = new Path(path);
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            if (fs.exists(externalDataPath)) {
-                fs.delete(externalDataPath, true);
-            }
-        }
-
-        public void setDataPath(String externalDataPath) {
-            setParam("dataPath", externalDataPath);
-        }
-
-        private String getDataPath() {
-            return getParam("dataPath");
-        }
-
-    }
 }
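
Taken together, createFlatTable() above is the heart of the change: when the
model has lookup tables, the Kafka dump directory is wrapped in a "mock" fact
table, the regular flat-table HQL is generated, and a plain string replace
redirects the INSERT from the model's root fact table to the mock one. A
self-contained sketch of the assembled statement sequence (all names and the
abridged DDL are hypothetical):

    public class KafkaJoinFlowSketch {
        public static void main(String[] args) {
            String rootFact = "DEFAULT.STREAMING_SALES";                // model's root fact table
            String mockFact = "kylin_intermediate_demo_cube_seg1_fact"; // external table over the Kafka dump
            String flat = "kylin_intermediate_demo_cube_seg1";          // joined flat table

            String insert = "INSERT OVERWRITE TABLE " + flat + " SELECT ... FROM " + rootFact + " ... JOIN ...;\n";
            // same trick as the patch: point the INSERT at the mock fact table
            insert = insert.replace(rootFact + " ", mockFact + " ");

            String hql = "DROP TABLE IF EXISTS " + mockFact + ";\n"
                    + "CREATE EXTERNAL TABLE " + mockFact + " (...) LOCATION '<job dir>/" + mockFact + "';\n"
                    + "DROP TABLE IF EXISTS " + flat + ";\n"
                    + "CREATE EXTERNAL TABLE " + flat + " (...);\n"
                    + insert;
            System.out.println(hql);
        }
    }
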
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 0ab83c6..1d65b96 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
@@ -224,7 +225,9 @@ public class KafkaSource implements ISource {
             public List<String> getRelatedKylinResources(TableDesc table) {
                 List<String> dependentResources = Lists.newArrayList();
                 dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
-                dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+                if (table.getSourceType() == ISourceAware.ID_STREAMING) {
+                    dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+                }
                 return dependentResources;
             }
 
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 696c20c..c31132b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -67,6 +67,9 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("margin")
     private long margin;
 
+    @JsonProperty("splitRows")
+    private int splitRows = 1000000;
+
     //"configA=1;configB=2"
     @JsonProperty("parserProperties")
     private String parserProperties;
@@ -157,6 +160,15 @@ public class KafkaConfig extends RootPersistentEntity {
         return sb.toString();
     }
 
+
+    public int getSplitRows() {
+        return splitRows;
+    }
+
+    public void setSplitRows(int splitRows) {
+        this.splitRows = splitRows;
+    }
+
     @Override
     public KafkaConfig clone() {
         try {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index b71ca84..e106a0a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -64,6 +64,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
     public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
     public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
 
+    public static final String CONFIG_KAFKA_SPLIT_ROWS = "kafka.split.rows";
+
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
@@ -111,6 +113,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
             job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
             job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
+            job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
             job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
             setupMapper(cube.getSegmentById(segmentId));
             job.setNumReduceTasks(0);
@@ -152,7 +155,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
 
         job.setMapperClass(KafkaFlatTableMapper.class);
         job.setInputFormatClass(KafkaInputFormat.class);
-        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputKeyClass(BytesWritable.class);
         job.setOutputValueClass(Text.class);
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
         job.setNumReduceTasks(0);
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
index 9fe29ca..b452b12 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -38,14 +37,18 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.StreamingParser;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, NullWritable, Text> {
+public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, BytesWritable, Text> {
 
-    private NullWritable outKey = NullWritable.get();
+    private BytesWritable outKey = new BytesWritable();
+    private static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableMapper.class);
     private Text outValue = new Text();
     private KylinConfig config;
     private CubeSegment cubeSegment;
@@ -60,15 +63,18 @@ public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritabl
 
         config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        final CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         this.cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
-        this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
-        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
-        KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
-        List<TblColRef> columns = new CubeJoinedFlatTableDesc(cubeSegment).getAllColumns();
+        this.delimiter = "\01";//sequence file default delimiter
+        logger.info("Use delimiter: " + delimiter);
+        final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
+        final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
+
+        final IJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+        final List<TblColRef> allColumns = flatTableDesc.getFactColumns();
 
         try {
-            streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns);
+            streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), allColumns);
         } catch (ReflectiveOperationException e) {
             throw new IllegalArgumentException(e);
         }
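
The mapper now writes fields with a hard-coded '\01' separator (ASCII 0x01,
Hive's default field terminator) instead of the configurable flat-table
delimiter, matching what the downstream Hive external table expects. A quick
join/split round trip with that delimiter:

    import java.util.Arrays;

    public class DelimiterSketch {
        public static void main(String[] args) {
            String delimiter = "\01"; // ASCII SOH, Hive's default field terminator
            String row = String.join(delimiter, "ab,c", "cd|e");
            System.out.println(Arrays.toString(row.split(delimiter))); // [ab,c, cd|e]
        }
    }
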
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index c996c5f..c3ed47f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -55,6 +55,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
         final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
         final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
         final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+        final Integer splitRows = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_SPLIT_ROWS));
 
         final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
         final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
@@ -79,9 +80,18 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
                     throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
                 }
 
-                if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
-                    InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
-                    splits.add(split);
+                long newStart = startOffsetMap.get(partitionId);
+                long end = endOffsetMap.get(partitionId);
+                while (end > newStart) {
+                    if ((end - newStart) <= splitRows) {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, newStart, end);
+                        splits.add(split);
+                        break;
+                    } else {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, newStart, newStart + splitRows);
+                        splits.add(split);
+                        newStart += splitRows;
+                    }
                 }
             }
         }
@@ -93,4 +103,4 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
         return new KafkaInputRecordReader();
     }
 
-}
+}
\ No newline at end of file
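
The new loop above chunks each partition's [start, end) offset range into
splits of at most splitRows messages, instead of one split per partition. A
self-contained sketch of the same arithmetic:

    import java.util.ArrayList;
    import java.util.List;

    public class OffsetChunkSketch {
        static List<long[]> chunk(long start, long end, long splitRows) {
            List<long[]> splits = new ArrayList<long[]>();
            while (end > start) {
                long stop = Math.min(start + splitRows, end);
                splits.add(new long[] { start, stop });
                start = stop;
            }
            return splits;
        }

        public static void main(String[] args) {
            // offsets [4, 26) with splitRows = 10 -> [4,14) [14,24) [24,26)
            for (long[] s : chunk(4, 26, 10)) {
                System.out.println(s[0] + " - " + s[1]);
            }
        }
    }
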
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java
new file mode 100644
index 0000000..9dfb641
--- /dev/null
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.kylin.source.kafka.hadoop.KafkaInputSplit;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class SpiltNumTest {
+
+    public static List<InputSplit> getSplits() {
+
+        final String brokers = "brokers";
+        final String inputTopic = "topic";
+        final int splitRows = 10;
+
+        final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
+        final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
+        startOffsetMap.put(0, 0L);
+        endOffsetMap.put(0, 15L);
+        startOffsetMap.put(1, 4L);
+        endOffsetMap.put(1, 26L);
+        startOffsetMap.put(2, 15L);
+        endOffsetMap.put(2, 47L);
+        startOffsetMap.put(3, 39L);
+        endOffsetMap.put(3, 41L);
+
+        final List<InputSplit> splits = new ArrayList<InputSplit>();
+        for (int partitionId = 0; partitionId < 4; partitionId++) {
+            long newStart = startOffsetMap.get(partitionId);
+            long end = endOffsetMap.get(partitionId);
+            while (end > newStart) {
+                if ((end - newStart) <= splitRows) {
+                    splits.add(new KafkaInputSplit(brokers, inputTopic, partitionId, newStart, end));
+                    break;
+                } else {
+                    splits.add(new KafkaInputSplit(brokers, inputTopic, partitionId, newStart, newStart + splitRows));
+                    newStart += splitRows;
+                }
+            }
+        }
+        return splits;
+    }
+
+    private static boolean containsSplit(List<InputSplit> splits, String name) {
+        for (InputSplit split : splits) {
+            if (split.toString().contains(name)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Test
+    public void testSpiltNum() {
+        List<InputSplit> splits = getSplits();
+        Assert.assertEquals(10, splits.size());
+    }
+
+    @Test
+    public void testSpilt() {
+        List<InputSplit> splits = getSplits();
+        String[] expected = { "brokers-topic-0-0-10", "brokers-topic-0-10-15", "brokers-topic-1-4-14",
+                "brokers-topic-1-14-24", "brokers-topic-1-24-26", "brokers-topic-2-15-25", "brokers-topic-2-25-35",
+                "brokers-topic-2-35-45", "brokers-topic-2-45-47", "brokers-topic-3-39-41" };
+        for (String split : expected) {
+            Assert.assertTrue(split, containsSplit(splits, split));
+        }
+        Assert.assertFalse(containsSplit(splits, "brokers-topic-0-4-47"));
+    }
+}
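
The expected count of 10 follows from summing ceil(range / splitRows) over
the four partitions: 15/10 -> 2 splits, 22/10 -> 3, 32/10 -> 4, 2/10 -> 1.
A one-screen check of that arithmetic:

    public class SplitCountSketch {
        public static void main(String[] args) {
            long[][] ranges = { { 0, 15 }, { 4, 26 }, { 15, 47 }, { 39, 41 } };
            int splitRows = 10;
            int total = 0;
            for (long[] r : ranges) {
                total += (r[1] - r[0] + splitRows - 1) / splitRows; // ceil
            }
            System.out.println(total); // prints 10
        }
    }
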
