You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/06/06 15:05:32 UTC

[kylin] branch master updated (e5accc5 -> 995a719)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from e5accc5  minor,update query chart config
     new f85d561  KYLIN-3378 Support Kafka table join with Hive tables
     new 995a719  KYLIN-3137 allowing config Spark storage level

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../org/apache/kylin/common/util/BasicTest.java    |   7 +-
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  22 ++-
 .../cube/model/CubeJoinedFlatTableEnrich.java      |  10 ++
 .../model/validation/rule/StreamingCubeRule.java   |  11 --
 .../java/org/apache/kylin/job/JoinedFlatTable.java |  35 ++---
 .../kylin/metadata/model/IJoinedFlatTableDesc.java |   4 +
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   1 -
 .../kylin/engine/mr/common/BatchConstants.java     |   3 +
 .../kylin/engine/spark/SparkCubingByLayer.java     |   9 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |  68 +++++----
 source-kafka/pom.xml                               |   4 +
 .../apache/kylin/source/kafka/KafkaMRInput.java    | 156 +++++++++++++++++----
 .../org/apache/kylin/source/kafka/KafkaSource.java |   5 +-
 .../kylin/source/kafka/config/KafkaConfig.java     |  12 ++
 .../source/kafka/hadoop/KafkaFlatTableJob.java     |   7 +-
 .../source/kafka/hadoop/KafkaFlatTableMapper.java  |  24 ++--
 .../source/kafka/hadoop/KafkaInputFormat.java      |  28 +++-
 18 files changed, 301 insertions(+), 109 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.

[kylin] 01/02: KYLIN-3378 Support Kafka table join with Hive tables

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f85d561293cd2cfd36a1674978c719c86cbbee9b
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Jun 5 17:22:23 2018 +0800

    KYLIN-3378 Support Kafka table join with Hive tables
---
 .../org/apache/kylin/common/util/BasicTest.java    |   7 +-
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  22 ++-
 .../cube/model/CubeJoinedFlatTableEnrich.java      |  10 ++
 .../model/validation/rule/StreamingCubeRule.java   |  11 --
 .../java/org/apache/kylin/job/JoinedFlatTable.java |  35 ++---
 .../kylin/metadata/model/IJoinedFlatTableDesc.java |   4 +
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   1 -
 .../kylin/engine/mr/common/BatchConstants.java     |   3 +
 .../kylin/engine/spark/SparkCubingByLayer.java     |   7 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |  68 +++++----
 source-kafka/pom.xml                               |   4 +
 .../apache/kylin/source/kafka/KafkaMRInput.java    | 156 +++++++++++++++++----
 .../org/apache/kylin/source/kafka/KafkaSource.java |   5 +-
 .../kylin/source/kafka/config/KafkaConfig.java     |  12 ++
 .../source/kafka/hadoop/KafkaFlatTableJob.java     |   7 +-
 .../source/kafka/hadoop/KafkaFlatTableMapper.java  |  24 ++--
 .../source/kafka/hadoop/KafkaInputFormat.java      |  28 +++-
 17 files changed, 296 insertions(+), 108 deletions(-)

diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 1c1e389..4b81daf 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -231,10 +231,15 @@ public class BasicTest {
 
         String[] origin = new String[] {"ab,c", "cd|e"};
 
-        String delimiter = "\u001F"; // "\t";
+        // test with sequence file default delimiter
+        String delimiter = "\01"; //"\u001F"; "\t";
         String concated = StringUtils.join(Arrays.asList(origin), delimiter);
+        System.out.println(concated);
+
         String[] newValues = concated.split(delimiter);
+        Assert.assertEquals(origin, newValues);
 
+        newValues = concated.split("\\" + delimiter);
         Assert.assertEquals(origin, newValues);
     }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index d50a5af..2ab7aac 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -42,7 +42,7 @@ import com.google.common.collect.Maps;
 @SuppressWarnings("serial")
 public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializable {
 
-    protected final String tableName;
+    protected String tableName;
     protected final CubeDesc cubeDesc;
     protected final CubeSegment cubeSegment;
     protected final boolean includingDerived;
@@ -135,6 +135,18 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
         }
     }
 
+    @Override
+    public List<TblColRef> getFactColumns() {
+        final List<TblColRef> factColumns = Lists.newArrayList();
+        for (TblColRef col : this.getAllColumns()) {
+            if (col.getTableRef().equals(getDataModel().getRootFactTable())) {
+                // only fetch the columns from fact table
+                factColumns.add(col);
+            }
+        }
+        return factColumns;
+    }
+
     // sanity check the input record (in bytes) matches what's expected
     public void sanityCheck(BytesSplitter bytesSplitter) {
         if (columnCount != bytesSplitter.getBufferSize()) {
@@ -171,6 +183,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
 
     @Override
     public SegmentRange getSegRange() {
+        if (cubeSegment.isOffsetCube()) {
+            return null;
+        }
         return cubeSegment.getSegRange();
     }
 
@@ -185,6 +200,11 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
     }
 
     @Override
+    public boolean useAlias() {
+        return true;
+    }
+
+    @Override
     public TblColRef getClusterBy() {
         return cubeDesc.getClusteredByColumn();
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 73da802..f09478e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -105,6 +105,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
     }
 
     @Override
+    public List<TblColRef> getFactColumns() {
+        return flatDesc.getFactColumns();
+    }
+
+    @Override
     public DataModelDesc getDataModel() {
         return flatDesc.getDataModel();
     }
@@ -130,6 +135,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
     }
 
     @Override
+    public boolean useAlias() {
+        return flatDesc.useAlias();
+    }
+
+    @Override
     public TblColRef getClusterBy() {
         return flatDesc.getClusterBy();
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
index 4438706..647f4c1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
@@ -24,7 +24,6 @@ import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ResultLevel;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.ISourceAware;
 
 import org.apache.kylin.metadata.model.TblColRef;
@@ -49,16 +48,6 @@ public class StreamingCubeRule implements IValidatorRule<CubeDesc> {
             return;
         }
 
-        if (model.getLookupTables().size() > 0) {
-            context.addResult(ResultLevel.ERROR, "Streaming Cube doesn't support star-schema so far; only one fact table is allowed.");
-            return;
-        }
-
-        if (cube.getEngineType() == IEngineAware.ID_SPARK) {
-            context.addResult(ResultLevel.ERROR, "Spark engine doesn't support streaming source, select MapReduce engine instead.");
-            return;
-        }
-
         if (model.getPartitionDesc() == null || model.getPartitionDesc().getPartitionDateColumn() == null) {
             context.addResult(ResultLevel.ERROR, "Must define a partition column.");
             return;
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 528bcf0..57e1c40 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -44,6 +44,9 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 public class JoinedFlatTable {
 
+    public static final String TEXTFILE = "TEXTFILE";
+    public static final String SEQUENCEFILE = "SEQUENCEFILE";
+
     public static String getTableDir(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
         return storageDfsDir + "/" + flatDesc.getTableName();
     }
@@ -78,10 +81,10 @@ public class JoinedFlatTable {
             if (i > 0) {
                 ddl.append(",");
             }
-            ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + "\n");
+            ddl.append(colName(col, flatDesc.useAlias()) + " " + getHiveDataType(col.getDatatype()) + "\n");
         }
         ddl.append(")" + "\n");
-        if ("TEXTFILE".equals(storageFormat)) {
+        if (TEXTFILE.equals(storageFormat)) {
             ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\" + fieldDelimiter + "'\n");
         }
         ddl.append("STORED AS " + storageFormat + "\n");
@@ -120,11 +123,6 @@ public class JoinedFlatTable {
                 + ";\n";
     }
 
-    public static String generateInsertPartialDataStatement(IJoinedFlatTableDesc flatDesc) {
-        return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc)
-                + ";\n";
-    }
-
     public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
         return generateSelectDataStatement(flatDesc, false, null);
     }
@@ -146,7 +144,7 @@ public class JoinedFlatTable {
             if (skipAsList.contains(colTotalName)) {
                 sql.append(col.getExpressionInSourceDB() + sep);
             } else {
-                sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep);
+                sql.append(col.getExpressionInSourceDB() + " as " + colName(col, true) + sep);
             }
         }
         appendJoinStatement(flatDesc, sql, singleLine);
@@ -154,16 +152,6 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
-    public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
-        final StringBuilder sql = new StringBuilder();
-        final TableRef rootTbl = flatDesc.getDataModel().getRootFactTable();
-        sql.append("dfs -mkdir -p " + outputDir + ";\n");
-        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + rootTbl.getTableIdentity()
-                + " " + rootTbl.getAlias() + "\n");
-        appendWhereStatement(flatDesc, sql);
-        return sql.toString();
-    }
-
     public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
         final String sep = singleLine ? " " : "\n";
         Set<TableRef> dimTableCache = new HashSet<>();
@@ -175,7 +163,6 @@ public class JoinedFlatTable {
         for (JoinTableDesc lookupDesc : model.getJoinTables()) {
             JoinDesc join = lookupDesc.getJoin();
             if (join != null && join.getType().equals("") == false) {
-                String joinType = join.getType().toUpperCase();
                 TableRef dimTable = lookupDesc.getTableRef();
                 if (!dimTableCache.contains(dimTable)) {
                     TblColRef[] pk = join.getPrimaryKeyColumns();
@@ -183,6 +170,8 @@ public class JoinedFlatTable {
                     if (pk.length != fk.length) {
                         throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
                     }
+                    String joinType = join.getType().toUpperCase();
+
                     sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
                     sql.append("ON ");
                     for (int i = 0; i < pk.length; i++) {
@@ -201,7 +190,7 @@ public class JoinedFlatTable {
 
     private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) {
         if (redistCol != null) {
-            sql.append(" DISTRIBUTE BY ").append(colName(redistCol)).append(";\n");
+            sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n");
         } else {
             sql.append(" DISTRIBUTE BY RAND()").append(";\n");
         }
@@ -244,7 +233,11 @@ public class JoinedFlatTable {
     }
 
     private static String colName(TblColRef col) {
-        return col.getTableAlias() + "_" + col.getName();
+        return colName(col, true);
+    }
+
+    private static String colName(TblColRef col, boolean useAlias) {
+        return useAlias ? col.getTableAlias() + "_" + col.getName() : col.getName();
     }
 
     private static String getHiveDataType(String javaDataType) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 0589829..8f86a52 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -30,6 +30,8 @@ public interface IJoinedFlatTableDesc {
 
     List<TblColRef> getAllColumns();
 
+    List<TblColRef> getFactColumns();
+
     int getColumnIndex(TblColRef colRef);
 
     SegmentRange getSegRange();
@@ -41,4 +43,6 @@ public interface IJoinedFlatTableDesc {
     // optionally present
     ISegment getSegment();
 
+    boolean useAlias();
+
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 7cc7779..40f1ac5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -47,7 +47,6 @@ public class BaseCuboidBuilder implements java.io.Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
     public static final String HIVE_NULL = "\\N";
-    public static final String SEQUENCEFILE_DELIMITER = "\\01";
     protected String cubeName;
     protected Cuboid baseCuboid;
     protected CubeDesc cubeDesc;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 18ac4ac..d38f7a4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -25,6 +25,9 @@ public interface BatchConstants {
      */
     char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
 
+
+    String SEQUENCE_FILE_DEFAULT_DELIMITER = "\01";
+
     /**
      * ConFiGuration entry names for MR jobs
      */
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 711c9ac..80a23ac 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,8 +17,6 @@
 */
 package org.apache.kylin.engine.spark;
 
-import static org.apache.kylin.engine.mr.common.BaseCuboidBuilder.SEQUENCEFILE_DELIMITER;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -64,6 +62,7 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
@@ -175,7 +174,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
 
-        boolean isSequenceFile = "SEQUENCEFILE".equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
 
@@ -185,7 +184,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                         @Override
                         public String[] call(Text text) throws Exception {
                             String s = Bytes.toString(text.getBytes(), 0, text.getLength());
-                            return s.split(SEQUENCEFILE_DELIMITER);
+                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
                         }
                     }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
         } else {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index a96f4d5..0e791eb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -284,8 +286,8 @@ public class HiveMRInput implements IMRInput {
 
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTableIdentity(getIntermediateTableIdentity());
-            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir));
+            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
             step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
             jobFlow.addTask(step);
         }
@@ -435,42 +437,58 @@ public class HiveMRInput implements IMRInput {
 
         private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
             StringBuffer output = new StringBuffer();
-            final String hiveTable = this.getIntermediateTableIdentity();
-            if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
-                final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-                hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
-                hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
-                config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
-                output.append("Hive table " + hiveTable + " is dropped. \n");
-                rmdirOnHDFS(getExternalDataPath());
-                output.append(
-                        "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            final List<String> hiveTables = this.getIntermediateTables();
+            for (String hiveTable : hiveTables) {
+                if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
+                    hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
+
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                }
             }
+            config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+            rmdirOnHDFS(getExternalDataPaths());
+            output.append(
+                    "Path " + getExternalDataPaths() + " is deleted. \n");
+
             return output.toString();
         }
 
-        private void rmdirOnHDFS(String path) throws IOException {
-            Path externalDataPath = new Path(path);
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            if (fs.exists(externalDataPath)) {
-                fs.delete(externalDataPath, true);
+        private void rmdirOnHDFS(List<String> paths) throws IOException {
+            for (String path : paths) {
+                Path externalDataPath = new Path(path);
+                FileSystem fs = HadoopUtil.getWorkingFileSystem();
+                if (fs.exists(externalDataPath)) {
+                    fs.delete(externalDataPath, true);
+                }
             }
         }
 
-        public void setIntermediateTableIdentity(String tableIdentity) {
-            setParam("oldHiveTable", tableIdentity);
+        public void setIntermediateTables(List<String> tableIdentity) {
+            setParam("oldHiveTables", StringUtil.join(tableIdentity, ","));
         }
 
-        private String getIntermediateTableIdentity() {
-            return getParam("oldHiveTable");
+        private List<String> getIntermediateTables() {
+            List<String> intermediateTables = Lists.newArrayList();
+            String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ",");
+            for (String t : tables) {
+                intermediateTables.add(t);
+            }
+            return intermediateTables;
         }
 
-        public void setExternalDataPath(String externalDataPath) {
-            setParam("externalDataPath", externalDataPath);
+        public void setExternalDataPaths(List<String> externalDataPaths) {
+            setParam("externalDataPaths", StringUtil.join(externalDataPaths, ","));
         }
 
-        private String getExternalDataPath() {
-            return getParam("externalDataPath");
+        private List<String> getExternalDataPaths() {
+            String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ",");
+            List<String> result = Lists.newArrayList();
+            for (String s : paths) {
+                result.add(s);
+            }
+            return result;
         }
 
         public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 2ef4cdf..55df7f0 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -66,5 +66,9 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-source-hive</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 223e303..bc2426d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -32,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -46,11 +48,15 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.HiveMRInput;
 import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
 import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 import org.slf4j.Logger;
@@ -58,18 +64,19 @@ import org.slf4j.LoggerFactory;
 
 public class KafkaMRInput implements IMRInput {
 
-    CubeSegment cubeSegment;
+    private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
+    private CubeSegment cubeSegment;
 
     @Override
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment);
+        return new BatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
     public IMRTableInputFormat getTableInputFormat(TableDesc table) {
 
-        return new KafkaTableInputFormat(cubeSegment, null, null, null);
+        return new KafkaTableInputFormat(cubeSegment, null);
     }
 
     @Override
@@ -80,12 +87,11 @@ public class KafkaMRInput implements IMRInput {
     public static class KafkaTableInputFormat implements IMRTableInputFormat {
         private final CubeSegment cubeSegment;
         private final JobEngineConfig conf;
-        private final String delimiter;
+        private String delimiter = BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER;
 
-        public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
+        public KafkaTableInputFormat(CubeSegment cubeSegment, JobEngineConfig conf) {
             this.cubeSegment = cubeSegment;
             this.conf = conf;
-            this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
         }
 
         @Override
@@ -114,30 +120,132 @@ public class KafkaMRInput implements IMRInput {
 
         final JobEngineConfig conf;
         final CubeSegment seg;
-        private String outputPath;
-
-        public BatchCubingInputSide(CubeSegment seg) {
+        private CubeDesc cubeDesc ;
+        private KylinConfig config;
+        protected IJoinedFlatTableDesc flatDesc;
+        protected String hiveTableDatabase;
+        private List<String> intermediateTables = Lists.newArrayList();
+        private List<String> intermediatePaths = Lists.newArrayList();
+        private String cubeName;
+
+        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
             this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.config = seg.getConfig();
+            this.flatDesc = flatDesc;
+            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
             this.seg = seg;
+            this.cubeDesc = seg.getCubeDesc();
+            this.cubeName = seg.getCubeInstance().getName();
         }
 
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
+
+            boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+            final String baseLocation = getJobWorkingDir(jobFlow);
+            if (onlyOneTable) {
+                // directly use flat table location
+                final String intermediateFactTable = flatDesc.getTableName();
+                final String tableLocation = baseLocation + "/" + intermediateFactTable;
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation));
+                intermediatePaths.add(tableLocation);
+            } else {
+                final String mockFactTableName =  MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase() + "_"
+                        + seg.getUuid().replaceAll("-", "_") + "_fact";
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName));
+                jobFlow.addTask(createFlatTable(mockFactTableName, baseLocation));
+            }
+        }
+        private AbstractExecutable createFlatTable(final String mockFactTableName, String baseLocation) {
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+
+            final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() {
+
+                @Override
+                public String getTableName() {
+                    return mockFactTableName;
+                }
+
+                @Override
+                public DataModelDesc getDataModel() {
+                    return cubeDesc.getModel();
+                }
+
+                @Override
+                public List<TblColRef> getAllColumns() {
+                    return flatDesc.getFactColumns();
+                }
+
+                @Override
+                public List<TblColRef> getFactColumns() {
+                    return null;
+                }
+
+                @Override
+                public int getColumnIndex(TblColRef colRef) {
+                    return 0;
+                }
+
+                @Override
+                public SegmentRange getSegRange() {
+                    return null;
+                }
+
+                @Override
+                public TblColRef getDistributedBy() {
+                    return null;
+                }
+
+                @Override
+                public TblColRef getClusterBy() {
+                    return null;
+                }
+
+                @Override
+                public ISegment getSegment() {
+                    return null;
+                }
+
+                @Override
+                public boolean useAlias() {
+                    return false;
+                }
+            };
+            final String dropFactTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc);
+            // the table inputformat is sequence file
+            final String createFactTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation, JoinedFlatTable.SEQUENCEFILE);
+
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation);
+            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+            insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ", mockFactTableName + " ");
+
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setInitStatement(hiveInitStatements);
+            step.setCreateTableStatement(dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls);
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+            intermediateTables.add(flatDesc.getTableName());
+            intermediateTables.add(mockFactTableName);
+            intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
+            intermediatePaths.add(baseLocation + "/" + mockFactTableName);
+            return step;
         }
 
-        private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
-            MapReduceExecutable result = new MapReduceExecutable();
+        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
+        }
 
-            IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
-            outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+        private MapReduceExecutable createSaveKafkaDataStep(String jobId, String location) {
+            MapReduceExecutable result = new MapReduceExecutable();
             result.setName("Save data from Kafka");
             result.setMapReduceJobClass(KafkaFlatTableJob.class);
             JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
             StringBuilder cmd = new StringBuilder();
             jobBuilderSupport.appendMapReduceParameters(cmd);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
 
@@ -147,20 +255,17 @@ public class KafkaMRInput implements IMRInput {
 
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            GarbageCollectionStep step = new GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP);
-            step.setDataPath(outputPath);
+            HiveMRInput.GarbageCollectionStep step = new HiveMRInput.GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+            step.setIntermediateTables(intermediateTables);
+            step.setExternalDataPaths(intermediatePaths);
             jobFlow.addTask(step);
 
         }
 
         @Override
         public IMRTableInputFormat getFlatTableInputFormat() {
-            KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
-            KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getCubeInstance().getRootFactTable());
-            List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns();
-
-            return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf);
+            return new KafkaTableInputFormat(seg, conf);
         }
     }
 
@@ -178,13 +283,14 @@ public class KafkaMRInput implements IMRInput {
             final MergeOffsetStep result = new MergeOffsetStep();
             result.setName("Merge offset step");
 
-            CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams());
+            CubingExecutableUtil.setCubeName(cubeSegment.getCubeInstance().getName(), result.getParams());
             CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
             CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
             jobFlow.addTask(result);
         }
     }
 
+    @Deprecated
     public static class GarbageCollectionStep extends AbstractExecutable {
         private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
 
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 0ab83c6..1d65b96 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
@@ -224,7 +225,9 @@ public class KafkaSource implements ISource {
             public List<String> getRelatedKylinResources(TableDesc table) {
                 List<String> dependentResources = Lists.newArrayList();
                 dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
-                dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+                if (table.getSourceType() == ISourceAware.ID_STREAMING) {
+                    dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+                }
                 return dependentResources;
             }
 
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 696c20c..c31132b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -67,6 +67,9 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("margin")
     private long margin;
 
+    @JsonProperty("splitRows")
+    private int splitRows=1000000;
+
     //"configA=1;configB=2"
     @JsonProperty("parserProperties")
     private String parserProperties;
@@ -157,6 +160,15 @@ public class KafkaConfig extends RootPersistentEntity {
         return sb.toString();
     }
 
+
+    public int getSplitRows() {
+        return splitRows;
+    }
+
+    public void setSplitRows(int splitRows) {
+        this.splitRows = splitRows;
+    }
+
     @Override
     public KafkaConfig clone() {
         try {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index b71ca84..e106a0a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -64,6 +64,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
     public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
     public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
 
+    public static final String CONFIG_KAFKA_SPLIT_ROWS = "kafka.split.rows";
+
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
@@ -111,6 +113,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
             job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
             job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
+            job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
             job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
             setupMapper(cube.getSegmentById(segmentId));
             job.setNumReduceTasks(0);
@@ -152,7 +155,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
 
         job.setMapperClass(KafkaFlatTableMapper.class);
         job.setInputFormatClass(KafkaInputFormat.class);
-        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputKeyClass(BytesWritable.class);
         job.setOutputValueClass(Text.class);
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
         job.setNumReduceTasks(0);
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
index 9fe29ca..7f3b5d9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -38,14 +37,18 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.StreamingParser;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, NullWritable, Text> {
+public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, BytesWritable, Text> {
 
-    private NullWritable outKey = NullWritable.get();
+    private BytesWritable outKey = new BytesWritable();
+    private static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableMapper.class);
     private Text outValue = new Text();
     private KylinConfig config;
     private CubeSegment cubeSegment;
@@ -60,15 +63,18 @@ public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritabl
 
         config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        final CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         this.cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
-        this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
-        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
-        KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
-        List<TblColRef> columns = new CubeJoinedFlatTableDesc(cubeSegment).getAllColumns();
+        this.delimiter = BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER; //sequence file default delimiter
+        logger.info("Use delimiter: " + delimiter);
+        final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
+        final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
+
+        final IJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+        final List<TblColRef> allColumns = flatTableDesc.getFactColumns();
 
         try {
-            streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns);
+            streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), allColumns);
         } catch (ReflectiveOperationException e) {
             throw new IllegalArgumentException(e);
         }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index c996c5f..d7fed24 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -55,6 +55,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
         final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
         final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
         final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+        final Integer spiltRows = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_SPLIT_ROWS));
 
         final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
         final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
@@ -69,9 +70,11 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
 
         Properties kafkaProperties = KafkaConsumerProperties.extractKafkaConfigToProperties(conf);
         final List<InputSplit> splits = new ArrayList<InputSplit>();
-        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, kafkaProperties)) {
+        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup,
+                kafkaProperties)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
-            Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
+            Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(),
+                    "partition number mismatch with server side");
             for (int i = 0; i < partitionInfos.size(); i++) {
                 final PartitionInfo partition = partitionInfos.get(i);
                 int partitionId = partition.partition();
@@ -79,9 +82,19 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
                     throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
                 }
 
-                if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
-                    InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
-                    splits.add(split);
+                long new_start = startOffsetMap.get(partitionId);
+                long end = endOffsetMap.get(partitionId);
+                while (end > new_start) {
+                    if ((end - new_start) <= spiltRows && (end > new_start)) {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end);
+                        splits.add(split);
+                        break;
+                    } else {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start,
+                                new_start + spiltRows);
+                        splits.add(split);
+                        new_start += spiltRows;
+                    }
                 }
             }
         }
@@ -89,8 +102,9 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
     }
 
     @Override
-    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+            throws IOException, InterruptedException {
         return new KafkaInputRecordReader();
     }
 
-}
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.

[kylin] 02/02: KYLIN-3137 allowing config Spark storage level

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 995a7198e6d78f0b21bac108a368306ca8cc48fa
Author: shaofengshi <sh...@apache.org>
AuthorDate: Wed Jun 6 09:59:47 2018 +0800

    KYLIN-3137 allowing config Spark storage level
---
 .../src/main/java/org/apache/kylin/common/KylinConfigBase.java        | 4 ++++
 .../main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java   | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b289891..b2d8e99 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1200,6 +1200,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.valueOf(getOptional("kylin.engine.spark.max-partition", "5000"));
     }
 
+    public String getSparkStorageLevel() {
+        return getOptional("kylin.engine.spark.storage-level", "MEMORY_AND_DISK_SER");
+    }
+
     public boolean isSparkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", "false"));
     }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 80a23ac..d609edb 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -172,7 +172,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             allNormalMeasure = allNormalMeasure && needAggr[i];
         }
         logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
-        StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
+        StorageLevel storageLevel = StorageLevel.fromString(envConfig.getSparkStorageLevel());
 
         boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.