You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/06/06 15:05:33 UTC

[kylin] 01/02: KYLIN-3378 Support Kafka table join with Hive tables

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f85d561293cd2cfd36a1674978c719c86cbbee9b
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Jun 5 17:22:23 2018 +0800

    KYLIN-3378 Support Kafka table join with Hive tables
---
 .../org/apache/kylin/common/util/BasicTest.java    |   7 +-
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  22 ++-
 .../cube/model/CubeJoinedFlatTableEnrich.java      |  10 ++
 .../model/validation/rule/StreamingCubeRule.java   |  11 --
 .../java/org/apache/kylin/job/JoinedFlatTable.java |  35 ++---
 .../kylin/metadata/model/IJoinedFlatTableDesc.java |   4 +
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   1 -
 .../kylin/engine/mr/common/BatchConstants.java     |   3 +
 .../kylin/engine/spark/SparkCubingByLayer.java     |   7 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |  68 +++++----
 source-kafka/pom.xml                               |   4 +
 .../apache/kylin/source/kafka/KafkaMRInput.java    | 156 +++++++++++++++++----
 .../org/apache/kylin/source/kafka/KafkaSource.java |   5 +-
 .../kylin/source/kafka/config/KafkaConfig.java     |  12 ++
 .../source/kafka/hadoop/KafkaFlatTableJob.java     |   7 +-
 .../source/kafka/hadoop/KafkaFlatTableMapper.java  |  24 ++--
 .../source/kafka/hadoop/KafkaInputFormat.java      |  28 +++-
 17 files changed, 296 insertions(+), 108 deletions(-)

diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 1c1e389..4b81daf 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -231,10 +231,15 @@ public class BasicTest {
 
         String[] origin = new String[] {"ab,c", "cd|e"};
 
-        String delimiter = "\u001F"; // "\t";
+        // test with sequence file default delimiter
+        String delimiter = "\01"; //"\u001F"; "\t";
         String concated = StringUtils.join(Arrays.asList(origin), delimiter);
+        System.out.println(concated);
+
         String[] newValues = concated.split(delimiter);
+        Assert.assertEquals(origin, newValues);
 
+        newValues = concated.split("\\" + delimiter);
         Assert.assertEquals(origin, newValues);
     }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index d50a5af..2ab7aac 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -42,7 +42,7 @@ import com.google.common.collect.Maps;
 @SuppressWarnings("serial")
 public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializable {
 
-    protected final String tableName;
+    protected String tableName;
     protected final CubeDesc cubeDesc;
     protected final CubeSegment cubeSegment;
     protected final boolean includingDerived;
@@ -135,6 +135,18 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
         }
     }
 
+    @Override
+    public List<TblColRef> getFactColumns() {
+        final List<TblColRef> factColumns = Lists.newArrayList();
+        for (TblColRef col : this.getAllColumns()) {
+            if (col.getTableRef().equals(getDataModel().getRootFactTable())) {
+                // only fetch the columns from fact table
+                factColumns.add(col);
+            }
+        }
+        return factColumns;
+    }
+
     // sanity check the input record (in bytes) matches what's expected
     public void sanityCheck(BytesSplitter bytesSplitter) {
         if (columnCount != bytesSplitter.getBufferSize()) {
@@ -171,6 +183,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
 
     @Override
     public SegmentRange getSegRange() {
+        if (cubeSegment.isOffsetCube()) {
+            return null;
+        }
         return cubeSegment.getSegRange();
     }
 
@@ -185,6 +200,11 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
     }
 
     @Override
+    public boolean useAlias() {
+        return true;
+    }
+
+    @Override
     public TblColRef getClusterBy() {
         return cubeDesc.getClusteredByColumn();
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 73da802..f09478e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -105,6 +105,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
     }
 
     @Override
+    public List<TblColRef> getFactColumns() {
+        return flatDesc.getFactColumns();
+    }
+
+    @Override
     public DataModelDesc getDataModel() {
         return flatDesc.getDataModel();
     }
@@ -130,6 +135,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
     }
 
     @Override
+    public boolean useAlias() {
+        return flatDesc.useAlias();
+    }
+
+    @Override
     public TblColRef getClusterBy() {
         return flatDesc.getClusterBy();
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
index 4438706..647f4c1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
@@ -24,7 +24,6 @@ import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ResultLevel;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.ISourceAware;
 
 import org.apache.kylin.metadata.model.TblColRef;
@@ -49,16 +48,6 @@ public class StreamingCubeRule implements IValidatorRule<CubeDesc> {
             return;
         }
 
-        if (model.getLookupTables().size() > 0) {
-            context.addResult(ResultLevel.ERROR, "Streaming Cube doesn't support star-schema so far; only one fact table is allowed.");
-            return;
-        }
-
-        if (cube.getEngineType() == IEngineAware.ID_SPARK) {
-            context.addResult(ResultLevel.ERROR, "Spark engine doesn't support streaming source, select MapReduce engine instead.");
-            return;
-        }
-
         if (model.getPartitionDesc() == null || model.getPartitionDesc().getPartitionDateColumn() == null) {
             context.addResult(ResultLevel.ERROR, "Must define a partition column.");
             return;
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 528bcf0..57e1c40 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -44,6 +44,9 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 public class JoinedFlatTable {
 
+    public static final String TEXTFILE = "TEXTFILE";
+    public static final String SEQUENCEFILE = "SEQUENCEFILE";
+
     public static String getTableDir(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
         return storageDfsDir + "/" + flatDesc.getTableName();
     }
@@ -78,10 +81,10 @@ public class JoinedFlatTable {
             if (i > 0) {
                 ddl.append(",");
             }
-            ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + "\n");
+            ddl.append(colName(col, flatDesc.useAlias()) + " " + getHiveDataType(col.getDatatype()) + "\n");
         }
         ddl.append(")" + "\n");
-        if ("TEXTFILE".equals(storageFormat)) {
+        if (TEXTFILE.equals(storageFormat)) {
             ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\" + fieldDelimiter + "'\n");
         }
         ddl.append("STORED AS " + storageFormat + "\n");
@@ -120,11 +123,6 @@ public class JoinedFlatTable {
                 + ";\n";
     }
 
-    public static String generateInsertPartialDataStatement(IJoinedFlatTableDesc flatDesc) {
-        return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc)
-                + ";\n";
-    }
-
     public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
         return generateSelectDataStatement(flatDesc, false, null);
     }
@@ -146,7 +144,7 @@ public class JoinedFlatTable {
             if (skipAsList.contains(colTotalName)) {
                 sql.append(col.getExpressionInSourceDB() + sep);
             } else {
-                sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep);
+                sql.append(col.getExpressionInSourceDB() + " as " + colName(col, true) + sep);
             }
         }
         appendJoinStatement(flatDesc, sql, singleLine);
@@ -154,16 +152,6 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
-    public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
-        final StringBuilder sql = new StringBuilder();
-        final TableRef rootTbl = flatDesc.getDataModel().getRootFactTable();
-        sql.append("dfs -mkdir -p " + outputDir + ";\n");
-        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + rootTbl.getTableIdentity()
-                + " " + rootTbl.getAlias() + "\n");
-        appendWhereStatement(flatDesc, sql);
-        return sql.toString();
-    }
-
     public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
         final String sep = singleLine ? " " : "\n";
         Set<TableRef> dimTableCache = new HashSet<>();
@@ -175,7 +163,6 @@ public class JoinedFlatTable {
         for (JoinTableDesc lookupDesc : model.getJoinTables()) {
             JoinDesc join = lookupDesc.getJoin();
             if (join != null && join.getType().equals("") == false) {
-                String joinType = join.getType().toUpperCase();
                 TableRef dimTable = lookupDesc.getTableRef();
                 if (!dimTableCache.contains(dimTable)) {
                     TblColRef[] pk = join.getPrimaryKeyColumns();
@@ -183,6 +170,8 @@ public class JoinedFlatTable {
                     if (pk.length != fk.length) {
                         throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
                     }
+                    String joinType = join.getType().toUpperCase();
+
                     sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
                     sql.append("ON ");
                     for (int i = 0; i < pk.length; i++) {
@@ -201,7 +190,7 @@ public class JoinedFlatTable {
 
     private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) {
         if (redistCol != null) {
-            sql.append(" DISTRIBUTE BY ").append(colName(redistCol)).append(";\n");
+            sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n");
         } else {
             sql.append(" DISTRIBUTE BY RAND()").append(";\n");
         }
@@ -244,7 +233,11 @@ public class JoinedFlatTable {
     }
 
     private static String colName(TblColRef col) {
-        return col.getTableAlias() + "_" + col.getName();
+        return colName(col, true);
+    }
+
+    private static String colName(TblColRef col, boolean useAlias) {
+        return useAlias ? col.getTableAlias() + "_" + col.getName() : col.getName();
     }
 
     private static String getHiveDataType(String javaDataType) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 0589829..8f86a52 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -30,6 +30,8 @@ public interface IJoinedFlatTableDesc {
 
     List<TblColRef> getAllColumns();
 
+    List<TblColRef> getFactColumns();
+
     int getColumnIndex(TblColRef colRef);
 
     SegmentRange getSegRange();
@@ -41,4 +43,6 @@ public interface IJoinedFlatTableDesc {
     // optionally present
     ISegment getSegment();
 
+    boolean useAlias();
+
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 7cc7779..40f1ac5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -47,7 +47,6 @@ public class BaseCuboidBuilder implements java.io.Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
     public static final String HIVE_NULL = "\\N";
-    public static final String SEQUENCEFILE_DELIMITER = "\\01";
     protected String cubeName;
     protected Cuboid baseCuboid;
     protected CubeDesc cubeDesc;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 18ac4ac..d38f7a4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -25,6 +25,9 @@ public interface BatchConstants {
      */
     char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
 
+
+    String SEQUENCE_FILE_DEFAULT_DELIMITER = "\01";
+
     /**
      * ConFiGuration entry names for MR jobs
      */
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 711c9ac..80a23ac 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,8 +17,6 @@
 */
 package org.apache.kylin.engine.spark;
 
-import static org.apache.kylin.engine.mr.common.BaseCuboidBuilder.SEQUENCEFILE_DELIMITER;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -64,6 +62,7 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
@@ -175,7 +174,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
 
-        boolean isSequenceFile = "SEQUENCEFILE".equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
 
@@ -185,7 +184,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                         @Override
                         public String[] call(Text text) throws Exception {
                             String s = Bytes.toString(text.getBytes(), 0, text.getLength());
-                            return s.split(SEQUENCEFILE_DELIMITER);
+                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
                         }
                     }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
         } else {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index a96f4d5..0e791eb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -284,8 +286,8 @@ public class HiveMRInput implements IMRInput {
 
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTableIdentity(getIntermediateTableIdentity());
-            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir));
+            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
             step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
             jobFlow.addTask(step);
         }
@@ -435,42 +437,58 @@ public class HiveMRInput implements IMRInput {
 
         private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
             StringBuffer output = new StringBuffer();
-            final String hiveTable = this.getIntermediateTableIdentity();
-            if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
-                final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-                hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
-                hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
-                config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
-                output.append("Hive table " + hiveTable + " is dropped. \n");
-                rmdirOnHDFS(getExternalDataPath());
-                output.append(
-                        "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            final List<String> hiveTables = this.getIntermediateTables();
+            for (String hiveTable : hiveTables) {
+                if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
+                    hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
+
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                }
             }
+            config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+            rmdirOnHDFS(getExternalDataPaths());
+            output.append(
+                    "Path " + getExternalDataPaths() + " is deleted. \n");
+
             return output.toString();
         }
 
-        private void rmdirOnHDFS(String path) throws IOException {
-            Path externalDataPath = new Path(path);
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            if (fs.exists(externalDataPath)) {
-                fs.delete(externalDataPath, true);
+        private void rmdirOnHDFS(List<String> paths) throws IOException {
+            for (String path : paths) {
+                Path externalDataPath = new Path(path);
+                FileSystem fs = HadoopUtil.getWorkingFileSystem();
+                if (fs.exists(externalDataPath)) {
+                    fs.delete(externalDataPath, true);
+                }
             }
         }
 
-        public void setIntermediateTableIdentity(String tableIdentity) {
-            setParam("oldHiveTable", tableIdentity);
+        public void setIntermediateTables(List<String> tableIdentity) {
+            setParam("oldHiveTables", StringUtil.join(tableIdentity, ","));
         }
 
-        private String getIntermediateTableIdentity() {
-            return getParam("oldHiveTable");
+        private List<String> getIntermediateTables() {
+            List<String> intermediateTables = Lists.newArrayList();
+            String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ",");
+            for (String t : tables) {
+                intermediateTables.add(t);
+            }
+            return intermediateTables;
         }
 
-        public void setExternalDataPath(String externalDataPath) {
-            setParam("externalDataPath", externalDataPath);
+        public void setExternalDataPaths(List<String> externalDataPaths) {
+            setParam("externalDataPaths", StringUtil.join(externalDataPaths, ","));
         }
 
-        private String getExternalDataPath() {
-            return getParam("externalDataPath");
+        private List<String> getExternalDataPaths() {
+            String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ",");
+            List<String> result = Lists.newArrayList();
+            for (String s : paths) {
+                result.add(s);
+            }
+            return result;
         }
 
         public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 2ef4cdf..55df7f0 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -66,5 +66,9 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-source-hive</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 223e303..bc2426d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -32,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -46,11 +48,15 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.HiveMRInput;
 import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
 import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 import org.slf4j.Logger;
@@ -58,18 +64,19 @@ import org.slf4j.LoggerFactory;
 
 public class KafkaMRInput implements IMRInput {
 
-    CubeSegment cubeSegment;
+    private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
+    private CubeSegment cubeSegment;
 
     @Override
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment);
+        return new BatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
     public IMRTableInputFormat getTableInputFormat(TableDesc table) {
 
-        return new KafkaTableInputFormat(cubeSegment, null, null, null);
+        return new KafkaTableInputFormat(cubeSegment, null);
     }
 
     @Override
@@ -80,12 +87,11 @@ public class KafkaMRInput implements IMRInput {
     public static class KafkaTableInputFormat implements IMRTableInputFormat {
         private final CubeSegment cubeSegment;
         private final JobEngineConfig conf;
-        private final String delimiter;
+        private String delimiter = BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER;
 
-        public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
+        public KafkaTableInputFormat(CubeSegment cubeSegment, JobEngineConfig conf) {
             this.cubeSegment = cubeSegment;
             this.conf = conf;
-            this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
         }
 
         @Override
@@ -114,30 +120,132 @@ public class KafkaMRInput implements IMRInput {
 
         final JobEngineConfig conf;
         final CubeSegment seg;
-        private String outputPath;
-
-        public BatchCubingInputSide(CubeSegment seg) {
+        private CubeDesc cubeDesc ;
+        private KylinConfig config;
+        protected IJoinedFlatTableDesc flatDesc;
+        protected String hiveTableDatabase;
+        private List<String> intermediateTables = Lists.newArrayList();
+        private List<String> intermediatePaths = Lists.newArrayList();
+        private String cubeName;
+
+        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
             this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.config = seg.getConfig();
+            this.flatDesc = flatDesc;
+            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
             this.seg = seg;
+            this.cubeDesc = seg.getCubeDesc();
+            this.cubeName = seg.getCubeInstance().getName();
         }
 
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
+
+            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+            final String baseLocation = getJobWorkingDir(jobFlow);
+            if (onlyOneTable) {
+                // directly use flat table location
+                final String intermediateFactTable = flatDesc.getTableName();
+                final String tableLocation = baseLocation + "/" + intermediateFactTable;
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation));
+                intermediatePaths.add(tableLocation);
+            } else {
+                final String mockFactTableName =  MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase() + "_"
+                        + seg.getUuid().replaceAll("-", "_") + "_fact";
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName));
+                jobFlow.addTask(createFlatTable(mockFactTableName, baseLocation));
+            }
+        }
+        private AbstractExecutable createFlatTable(final String mockFactTableName, String baseLocation) {
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+
+            final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() {
+
+                @Override
+                public String getTableName() {
+                    return mockFactTableName;
+                }
+
+                @Override
+                public DataModelDesc getDataModel() {
+                    return cubeDesc.getModel();
+                }
+
+                @Override
+                public List<TblColRef> getAllColumns() {
+                    return flatDesc.getFactColumns();
+                }
+
+                @Override
+                public List<TblColRef> getFactColumns() {
+                    return null;
+                }
+
+                @Override
+                public int getColumnIndex(TblColRef colRef) {
+                    return 0;
+                }
+
+                @Override
+                public SegmentRange getSegRange() {
+                    return null;
+                }
+
+                @Override
+                public TblColRef getDistributedBy() {
+                    return null;
+                }
+
+                @Override
+                public TblColRef getClusterBy() {
+                    return null;
+                }
+
+                @Override
+                public ISegment getSegment() {
+                    return null;
+                }
+
+                @Override
+                public boolean useAlias() {
+                    return false;
+                }
+            };
+            final String dropFactTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc);
+            // the table inputformat is sequence file
+            final String createFactTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation, JoinedFlatTable.SEQUENCEFILE);
+
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation);
+            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+            insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ", mockFactTableName + " ");
+
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setInitStatement(hiveInitStatements);
+            step.setCreateTableStatement(dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls);
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+            intermediateTables.add(flatDesc.getTableName());
+            intermediateTables.add(mockFactTableName);
+            intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
+            intermediatePaths.add(baseLocation + "/" + mockFactTableName);
+            return step;
         }
 
-        private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
-            MapReduceExecutable result = new MapReduceExecutable();
+        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
+        }
 
-            IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
-            outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+        private MapReduceExecutable createSaveKafkaDataStep(String jobId, String location) {
+            MapReduceExecutable result = new MapReduceExecutable();
             result.setName("Save data from Kafka");
             result.setMapReduceJobClass(KafkaFlatTableJob.class);
             JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
             StringBuilder cmd = new StringBuilder();
             jobBuilderSupport.appendMapReduceParameters(cmd);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
 
@@ -147,20 +255,17 @@ public class KafkaMRInput implements IMRInput {
 
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            GarbageCollectionStep step = new GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP);
-            step.setDataPath(outputPath);
+            HiveMRInput.GarbageCollectionStep step = new HiveMRInput.GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+            step.setIntermediateTables(intermediateTables);
+            step.setExternalDataPaths(intermediatePaths);
             jobFlow.addTask(step);
 
         }
 
         @Override
         public IMRTableInputFormat getFlatTableInputFormat() {
-            KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
-            KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getCubeInstance().getRootFactTable());
-            List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns();
-
-            return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf);
+            return new KafkaTableInputFormat(seg, conf);
         }
     }
 
@@ -178,13 +283,14 @@ public class KafkaMRInput implements IMRInput {
             final MergeOffsetStep result = new MergeOffsetStep();
             result.setName("Merge offset step");
 
-            CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams());
+            CubingExecutableUtil.setCubeName(cubeSegment.getCubeInstance().getName(), result.getParams());
             CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
             CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
             jobFlow.addTask(result);
         }
     }
 
+    @Deprecated
     public static class GarbageCollectionStep extends AbstractExecutable {
         private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
 
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 0ab83c6..1d65b96 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
@@ -224,7 +225,9 @@ public class KafkaSource implements ISource {
             public List<String> getRelatedKylinResources(TableDesc table) {
                 List<String> dependentResources = Lists.newArrayList();
                 dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
-                dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+                if (table.getSourceType() == ISourceAware.ID_STREAMING) {
+                    dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+                }
                 return dependentResources;
             }
 
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 696c20c..c31132b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -67,6 +67,9 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("margin")
     private long margin;
 
+    @JsonProperty("splitRows")
+    private int splitRows=1000000;
+
     //"configA=1;configB=2"
     @JsonProperty("parserProperties")
     private String parserProperties;
@@ -157,6 +160,15 @@ public class KafkaConfig extends RootPersistentEntity {
         return sb.toString();
     }
 
+
+    public int getSplitRows() {
+        return splitRows;
+    }
+
+    public void setSplitRows(int splitRows) {
+        this.splitRows = splitRows;
+    }
+
     @Override
     public KafkaConfig clone() {
         try {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index b71ca84..e106a0a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -64,6 +64,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
     public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
     public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
 
+    public static final String CONFIG_KAFKA_SPLIT_ROWS = "kafka.split.rows";
+
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
@@ -111,6 +113,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
             job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
             job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
+            job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
             job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
             setupMapper(cube.getSegmentById(segmentId));
             job.setNumReduceTasks(0);
@@ -152,7 +155,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
 
         job.setMapperClass(KafkaFlatTableMapper.class);
         job.setInputFormatClass(KafkaInputFormat.class);
-        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputKeyClass(BytesWritable.class);
         job.setOutputValueClass(Text.class);
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
         job.setNumReduceTasks(0);
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
index 9fe29ca..7f3b5d9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -38,14 +37,18 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.StreamingParser;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, NullWritable, Text> {
+public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, BytesWritable, Text> {
 
-    private NullWritable outKey = NullWritable.get();
+    private BytesWritable outKey = new BytesWritable();
+    private static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableMapper.class);
     private Text outValue = new Text();
     private KylinConfig config;
     private CubeSegment cubeSegment;
@@ -60,15 +63,18 @@ public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritabl
 
         config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        final CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         this.cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
-        this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
-        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
-        KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
-        List<TblColRef> columns = new CubeJoinedFlatTableDesc(cubeSegment).getAllColumns();
+        this.delimiter = BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER; //sequence file default delimiter
+        logger.info("Use delimiter: " + delimiter);
+        final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
+        final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
+
+        final IJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+        final List<TblColRef> allColumns = flatTableDesc.getFactColumns();
 
         try {
-            streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns);
+            streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), allColumns);
         } catch (ReflectiveOperationException e) {
             throw new IllegalArgumentException(e);
         }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index c996c5f..d7fed24 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -55,6 +55,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
         final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
         final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
         final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+        final Integer spiltRows = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_SPLIT_ROWS));
 
         final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
         final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
@@ -69,9 +70,11 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
 
         Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
         final List<InputSplit> splits = new ArrayList<InputSplit>();
-        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
+        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup,
+                kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
-            Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
+            Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(),
+                    "partition number mismatch with server side");
             for (int i = 0; i < partitionInfos.size(); i++) {
                 final PartitionInfo partition = partitionInfos.get(i);
                 int partitionId = partition.partition();
@@ -79,9 +82,19 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
                     throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
                 }
 
-                if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
-                    InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
-                    splits.add(split);
+                long new_start = startOffsetMap.get(partitionId);
+                long end = endOffsetMap.get(partitionId);
+                while (end > new_start) {
+                    if ((end - new_start) <= spiltRows && (end > new_start)) {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end);
+                        splits.add(split);
+                        break;
+                    } else {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start,
+                                new_start + spiltRows);
+                        splits.add(split);
+                        new_start += spiltRows;
+                    }
                 }
             }
         }
@@ -89,8 +102,9 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
     }
 
     @Override
-    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+            throws IOException, InterruptedException {
         return new KafkaInputRecordReader();
     }
 
-}
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.