Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/30 09:36:25 UTC

[kylin] 01/02: KYLIN-3369 Kafka join with hive

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3369
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6d623664ad513bd5a2ad7e9694cdd70f5ef7e6f6
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Fri May 25 18:25:48 2018 +0800

    KYLIN-3369 Kafka join with hive
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  43 +++-
 .../org/apache/kylin/common/util/BasicTest.java    |  12 +
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  18 +-
 .../cube/model/CubeJoinedFlatTableEnrich.java      |  11 +
 .../model/validation/rule/StreamingCubeRule.java   |   5 -
 .../java/org/apache/kylin/job/JoinedFlatTable.java | 167 ++++++++++++-
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/metadata/model/IJoinedFlatTableDesc.java |   9 +
 .../org/apache/kylin/metadata/model/TblColRef.java |  15 +-
 examples/test_case_data/sandbox/kylin.properties   |   2 +-
 source-kafka/pom.xml                               |   4 +
 .../apache/kylin/source/kafka/KafkaMRInput.java    | 268 ++++++++++++++++++++-
 .../org/apache/kylin/source/kafka/KafkaSource.java |   5 +-
 .../source/kafka/hadoop/KafkaFlatTableMapper.java  |   5 +
 .../source/kafka/hadoop/KafkaInputFormat.java      |  18 +-
 .../apache/kylin/source/kafka/SpiltNumTest.java    | 163 +++++++++++++
 16 files changed, 721 insertions(+), 25 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 689d08f..5d543f5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -225,7 +225,11 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.env", "DEV");
     }
 
-    private String cachedHdfsWorkingDirectory;
+    public static String cachedHdfsWorkingDirectory;
+
+    public void setHdfsWorkingDirectory(String cachedHdfsWorkingDirectory) {
+        this.cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory;
+    }
 
     public String getHdfsWorkingDirectory() {
         if (cachedHdfsWorkingDirectory != null)
@@ -260,6 +264,39 @@ abstract public class KylinConfigBase implements Serializable {
         return cachedHdfsWorkingDirectory;
     }
 
+    public String getHdfsWorkingDirectory(String cachedHdfsWorkingDirectory) {
+        if (cachedHdfsWorkingDirectory != null)
+            return cachedHdfsWorkingDirectory;
+
+        String root = getOptional("kylin.env.hdfs-working-dir", "/kylin");
+
+        Path path = new Path(root);
+        if (!path.isAbsolute())
+            throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root);
+
+        // make sure path is qualified
+        try {
+            FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+            path = fs.makeQualified(path);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        // append metadata-url prefix
+        root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
+
+        if (!root.endsWith("/"))
+            root += "/";
+
+        cachedHdfsWorkingDirectory = root;
+        if (cachedHdfsWorkingDirectory.startsWith("file:")) {
+            cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("file:", "file://");
+        } else if (cachedHdfsWorkingDirectory.startsWith("maprfs:")) {
+            cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("maprfs:", "maprfs://");
+        }
+        return cachedHdfsWorkingDirectory;
+    }
+
     public String getZookeeperBasePath() {
         return getOptional("kylin.env.zookeeper-base-path", "/kylin");
     }
@@ -723,6 +760,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.source.hive.keep-flat-table", "false"));
     }
 
+    public String getHiveDatabaseForStreamingTable() {
+        return this.getOptional("kylin.source.hive.database-for-streaming-table", "default");
+    }
+
     public String getHiveDatabaseForIntermediateTable() {
         return this.getOptional("kylin.source.hive.database-for-flat-table", "default");
     }
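
Note: the new getHiveDatabaseForStreamingTable() mirrors the existing
getHiveDatabaseForIntermediateTable(). A minimal usage sketch; the property
value shown is hypothetical:

    // kylin.properties: kylin.source.hive.database-for-streaming-table=streaming_db
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    String streamingDb = config.getHiveDatabaseForStreamingTable(); // "streaming_db", or "default" if unset
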
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 1c1e389..fcf302d 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -244,4 +244,16 @@ public class BasicTest {
         cal.setTimeInMillis(t);
         return dateFormat.format(cal.getTime());
     }
+
+    @Test
+    public void testStringSplit() {
+        String[] strings = new String[] {"abc", "bcd"};
+
+        String delimiter = ",";
+        String concatenated = StringUtils.join(Arrays.asList(strings), delimiter);
+
+        String[] newStrings = concatenated.split("\\" + delimiter);
+
+        // assertArrayEquals compares arrays element-wise
+        Assert.assertArrayEquals(strings, newStrings);
+    }
 }
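
Note: "\\" + delimiter only escapes a single-character delimiter; for arbitrary
(possibly multi-character) delimiters, Pattern.quote() is the robust form. A
sketch, not part of this patch:

    import java.util.regex.Pattern;

    String delimiter = ",";
    String[] parts = "abc,bcd".split(Pattern.quote(delimiter)); // {"abc", "bcd"}
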
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index d50a5af..63df4aa 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -31,6 +31,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
@@ -42,10 +44,12 @@ import com.google.common.collect.Maps;
 @SuppressWarnings("serial")
 public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializable {
 
-    protected final String tableName;
-    protected final CubeDesc cubeDesc;
+    protected String tableName;
+    protected final CubeDesc cubeDesc;
     protected final CubeSegment cubeSegment;
     protected final boolean includingDerived;
+    TableRef tableRef;
+    DataModelDesc dataModelDesc;
 
     private int columnCount = 0;
     private List<TblColRef> columnList = Lists.newArrayList();
@@ -185,6 +189,16 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
     }
 
     @Override
+    public TableRef getTable() {
+        return tableRef;
+    }
+
+    @Override
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    @Override
     public TblColRef getClusterBy() {
         return cubeDesc.getClusteredByColumn();
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 73da802..b314cc2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -28,6 +28,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 
 /**
@@ -130,6 +131,16 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
     }
 
     @Override
+    public TableRef getTable() {
+        return null;
+    }
+
+    @Override
+    public void setTableName(String tableName) {
+        // no-op: the enrich wrapper does not own the table name
+    }
+
+    @Override
     public TblColRef getClusterBy() {
         return flatDesc.getClusterBy();
     }
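
Note: getTable() above returns null rather than delegating. If this path is
ever exercised through the enrich wrapper, forwarding to the wrapped desc would
be the natural implementation; a sketch, not part of this patch:

    @Override
    public TableRef getTable() {
        return flatDesc.getTable();
    }
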
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
index 4438706..dab8fa4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
@@ -49,11 +49,6 @@ public class StreamingCubeRule implements IValidatorRule<CubeDesc> {
             return;
         }
 
-        if (model.getLookupTables().size() > 0) {
-            context.addResult(ResultLevel.ERROR, "Streaming Cube doesn't support star-schema so far; only one fact table is allowed.");
-            return;
-        }
-
         if (cube.getEngineType() == IEngineAware.ID_SPARK) {
             context.addResult(ResultLevel.ERROR, "Spark engine doesn't support streaming source, select MapReduce engine instead.");
             return;
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 528bcf0..ac38730 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -61,13 +61,13 @@ public class JoinedFlatTable {
     }
 
     public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
-            String storageFormat) {
+                                                      String storageFormat) {
         String fieldDelimiter = flatDesc.getDataModel().getConfig().getFlatTableFieldDelimiter();
         return generateCreateTableStatement(flatDesc, storageDfsDir, storageFormat, fieldDelimiter);
     }
 
     public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
-            String storageFormat, String fieldDelimiter) {
+                                                      String storageFormat, String fieldDelimiter) {
         StringBuilder ddl = new StringBuilder();
 
         ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n");
@@ -90,12 +90,53 @@ public class JoinedFlatTable {
         return ddl.toString();
     }
 
+    public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
+        String storageFormat = flatDesc.getDataModel().getConfig().getFlatTableStorageFormat();
+        return generateCreateTableStatement1(flatDesc, storageDfsDir, storageFormat);
+    }
+
+    public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
+            String storageFormat) {
+        String fieldDelimiter = flatDesc.getDataModel().getConfig().getFlatTableFieldDelimiter();
+        return generateCreateTableStatement1(flatDesc, storageDfsDir, storageFormat, fieldDelimiter);
+    }
+
+    public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
+            String storageFormat, String fieldDelimiter) {
+        StringBuilder ddl = new StringBuilder();
+
+        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getDataModel().getRootFactTableName() + "\n");////flatDesc.getTableName()
+
+        ddl.append("(" + "\n");
+        for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
+            TblColRef col = flatDesc.getAllColumns().get(i);
+            if (i > 0) {
+                ddl.append(",");
+            }
+            ddl.append(col.getName() + " " + getHiveDataType(col.getDatatype()) + "\n");
+        }
+        ddl.append(")" + "\n");
+        if ("TEXTFILE".equals(storageFormat)) {
+            ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + fieldDelimiter + "'\n");
+        }
+        ddl.append("STORED AS " + storageFormat + "\n");
+        ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n");
+        ddl.append("ALTER TABLE " + flatDesc.getDataModel().getRootFactTableName() + " SET TBLPROPERTIES('auto.purge'='true');\n");
+        return ddl.toString();
+    }
+
     public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
         ddl.append("DROP TABLE IF EXISTS " + flatDesc.getTableName() + ";").append("\n");
         return ddl.toString();
     }
 
+    public static String generateDropTableStatement1(IJoinedFlatTableDesc flatDesc) {
+        StringBuilder ddl = new StringBuilder();
+        ddl.append("DROP TABLE IF EXISTS " + flatDesc.getDataModel().getRootFactTableName() + ";").append("\n");
+        return ddl.toString();
+    }
+
     public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc) {
         CubeSegment segment = ((CubeSegment) flatDesc.getSegment());
         KylinConfig kylinConfig;
@@ -120,6 +161,30 @@ public class JoinedFlatTable {
                 + ";\n";
     }
 
+    public static String generateInsertDataStatement1(IJoinedFlatTableDesc flatDesc) {
+        CubeSegment segment = ((CubeSegment) flatDesc.getSegment());
+        KylinConfig kylinConfig;
+        if (null == segment) {
+            kylinConfig = KylinConfig.getInstanceFromEnv();
+        } else {
+            kylinConfig = (flatDesc.getSegment()).getConfig();
+        }
+
+        if (kylinConfig.isAdvancedFlatTableUsed()) {
+            try {
+                Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
+                // look up the single-argument variant; invoke() below passes only flatDesc
+                Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class);
+                return (String) method.invoke(null, flatDesc);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement1(flatDesc)
+                + ";\n";
+    }
+
     public static String generateInsertPartialDataStatement(IJoinedFlatTableDesc flatDesc) {
         return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc)
                 + ";\n";
@@ -129,6 +194,10 @@ public class JoinedFlatTable {
         return generateSelectDataStatement(flatDesc, false, null);
     }
 
+    public static String generateSelectDataStatement1(IJoinedFlatTableDesc flatDesc) {
+        return generateSelectDataStatement1(flatDesc, false, null);
+    }
+
     public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine,
             String[] skipAs) {
         final String sep = singleLine ? " " : "\n";
@@ -154,6 +223,31 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
+    public static String generateSelectDataStatement1(IJoinedFlatTableDesc flatDesc, boolean singleLine,
+                                                     String[] skipAs) {
+        final String sep = singleLine ? " " : "\n";
+        final List<String> skipAsList = (skipAs == null) ? new ArrayList<String>() : Arrays.asList(skipAs);
+
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT" + sep);
+
+        for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
+            TblColRef col = flatDesc.getAllColumns().get(i);
+            if (i > 0) {
+                sql.append(",");
+            }
+            String colTotalName = String.format("%s.%s", col.getTableRef().getTableName(), col.getName());
+            if (skipAsList.contains(colTotalName)) {
+                sql.append(col.getExpressionInSourceDB() + sep);
+            } else {
+                sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep);
+            }
+        }
+        appendJoinStatement1(flatDesc, sql, singleLine);
+        appendWhereStatement1(flatDesc, sql, singleLine);
+        return sql.toString();
+    }
+
     public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
         final StringBuilder sql = new StringBuilder();
         final TableRef rootTbl = flatDesc.getDataModel().getRootFactTable();
@@ -170,26 +264,61 @@ public class JoinedFlatTable {
 
         DataModelDesc model = flatDesc.getDataModel();
         TableRef rootTable = model.getRootFactTable();
-        sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);
+        sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);//这flatDesc.getTableName()    rootTable.getTableIdentity()
 
         for (JoinTableDesc lookupDesc : model.getJoinTables()) {
             JoinDesc join = lookupDesc.getJoin();
             if (join != null && join.getType().equals("") == false) {
-                String joinType = join.getType().toUpperCase();
                 TableRef dimTable = lookupDesc.getTableRef();
                 if (!dimTableCache.contains(dimTable)) {
                     TblColRef[] pk = join.getPrimaryKeyColumns();
                     TblColRef[] fk = join.getForeignKeyColumns();
                     if (pk.length != fk.length) {
                         throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
+                    }
+                    String joinType = join.getType().toUpperCase();
+
+                    sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
+                    sql.append("ON ");
+                    for (int i = 0; i < pk.length; i++) {
+                        if (i > 0) {
+                            sql.append(" AND ");
+                        }
+                        sql.append(fk[i].getExpressionInSourceDB1() + " = " + pk[i].getExpressionInSourceDB());
                     }
+                    sql.append(sep);
+
+                    dimTableCache.add(dimTable);
+                }
+            }
+        }
+    }
+
+    public static void appendJoinStatement1(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
+        final String sep = singleLine ? " " : "\n";
+        Set<TableRef> dimTableCache = new HashSet<>();
+
+        DataModelDesc model = flatDesc.getDataModel();
+        TableRef rootTable = model.getRootFactTable();
+        sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);//这flatDesc.getTableName()    rootTable.getTableIdentity()
+
+        for (JoinTableDesc lookupDesc : model.getJoinTables()) {
+            JoinDesc join = lookupDesc.getJoin();
+            if (join != null && join.getType().equals("") == false) {
+                TableRef dimTable = lookupDesc.getTableRef();
+                if (!dimTableCache.contains(dimTable)) {
+                    TblColRef[] pk = join.getPrimaryKeyColumns();
+                    TblColRef[] fk = join.getForeignKeyColumns();
+                    if (pk.length != fk.length) {
+                        throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
+                    }
+                    String joinType = join.getType().toUpperCase();
+
                     sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
                     sql.append("ON ");
                     for (int i = 0; i < pk.length; i++) {
                         if (i > 0) {
                             sql.append(" AND ");
                         }
-                        sql.append(fk[i].getExpressionInSourceDB() + " = " + pk[i].getExpressionInSourceDB());
+                        sql.append(fk[i].getExpressionInSourceDB1() + " = " + pk[i].getExpressionInSourceDB());
                     }
                     sql.append(sep);
 
@@ -243,6 +372,34 @@ public class JoinedFlatTable {
         sql.append(whereBuilder.toString());
     }
 
+    private static void appendWhereStatement1(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
+        final String sep = singleLine ? " " : "\n";
+
+        StringBuilder whereBuilder = new StringBuilder();
+        whereBuilder.append("WHERE 1=1");
+
+        DataModelDesc model = flatDesc.getDataModel();
+        if (StringUtils.isNotEmpty(model.getFilterCondition())) {
+            whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") ");
+        }
+
+        // unlike appendWhereStatement, the segment/partition-range filter is left disabled here
+        /*if (flatDesc.getSegment() != null) {
+            PartitionDesc partDesc = model.getPartitionDesc();
+            if (partDesc != null && partDesc.getPartitionDateColumn() != null) {
+                SegmentRange segRange = flatDesc.getSegRange();
+
+                if (segRange != null && !segRange.isInfinite()) {
+                    whereBuilder.append(" AND (");
+                    whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc,
+                            flatDesc.getSegment(), segRange));
+                    whereBuilder.append(")" + sep);
+                }
+            }
+        }*/
+
+        sql.append(whereBuilder.toString());
+    }
+
     private static String colName(TblColRef col) {
         return col.getTableAlias() + "_" + col.getName();
     }
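
Note: generateCreateTableStatement1 targets the model's root fact table rather
than the intermediate flat table. Assuming a hypothetical fact table
DEFAULT.STREAMING_SALES with two columns and SEQUENCEFILE storage, the emitted
HiveQL would look roughly like:

    CREATE EXTERNAL TABLE IF NOT EXISTS DEFAULT.STREAMING_SALES
    (
    PART_DT date
    ,ITEM_ID bigint
    )
    STORED AS SEQUENCEFILE
    LOCATION 'hdfs:///kylin/kylin_metadata/kylin-<job-id>/DEFAULT.STREAMING_SALES';
    ALTER TABLE DEFAULT.STREAMING_SALES SET TBLPROPERTIES('auto.purge'='true');
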
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index b9a3651..42f0dbf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -34,6 +34,7 @@ public final class ExecutableConstants {
 
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
     public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
+    public static final String STEP_NAME_CREATE_HIVE_TABLE = "Create Hive Table";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
     public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 0589829..46e21f3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -30,6 +30,8 @@ public interface IJoinedFlatTableDesc {
 
     List<TblColRef> getAllColumns();
 
+    void setAllColumns(List<TblColRef> tblColRefList);
+
     int getColumnIndex(TblColRef colRef);
 
     SegmentRange getSegRange();
@@ -41,4 +43,11 @@ public interface IJoinedFlatTableDesc {
     // optionally present
     ISegment getSegment();
 
+    // additions for joining a streaming (Kafka) fact table with Hive lookup tables
+    TableRef getTable();
+
+    void setDataModel(DataModelDesc dataModelDesc);
+
+    void setTableName(String tableName);
+
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index ee33e8a..928d3d2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -31,6 +31,7 @@ import org.apache.kylin.metadata.datatype.DataType;
 public class TblColRef implements Serializable {
 
     private static final String INNER_TABLE_NAME = "_kylin_table";
+    private String name;
 
     // used by projection rewrite, see OLAPProjectRel
     public enum InnerDataTypeEnum {
@@ -119,7 +120,7 @@ public class TblColRef implements Serializable {
     private String identity;
     private String parserDescription;
 
-    TblColRef(ColumnDesc column) {
+    public TblColRef(ColumnDesc column) {
         this.column = column;
     }
 
@@ -148,6 +149,10 @@ public class TblColRef implements Serializable {
         return column.getName();
     }
 
+    public void setName(String name) {
+        this.name = name;
+    }
+
     public TableRef getTableRef() {
         return table;
     }
@@ -168,6 +173,14 @@ public class TblColRef implements Serializable {
         }
     }
 
+    // returns the raw identity, which setExpressionInSourceDB below can override
+    public String getExpressionInSourceDB1() {
+        return identity;
+    }
+
+    public void setExpressionInSourceDB(String identity) {
+        this.identity = identity;
+    }
+
     public String getTable() {
         if (column.getTable() == null) {
             return null;
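
Note: the getExpressionInSourceDB1()/setExpressionInSourceDB() pair lets the
Kafka input side rewrite a join key to the flat table's ALIAS_COLUMN naming
before SQL generation. A self-contained sketch (names are hypothetical; the
constructor was made public in this patch):

    ColumnDesc desc = new ColumnDesc();
    desc.setName("ITEM_ID");
    TblColRef fkCol = new TblColRef(desc);
    fkCol.setExpressionInSourceDB("FACT.FACT_ITEM_ID");
    String expr = fkCol.getExpressionInSourceDB1(); // "FACT.FACT_ITEM_ID"
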
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index ae9dad2..d428d44 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -70,7 +70,7 @@ kylin.job.retry=0
 # you will have to specify kylin.job.remote-cli-hostname, kylin.job.remote-cli-username and kylin.job.remote-cli-password
 # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine
 # (Thus kylin instance has to ssh to another real hadoop client machine to execute hbase,hive,hadoop commands)
-kylin.job.use-remote-cli=false
+kylin.job.use-remote-cli=true
 
 # Only necessary when kylin.job.use-remote-cli=true
 kylin.job.remote-cli-hostname=sandbox
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index 2ef4cdf..55df7f0 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -66,5 +66,9 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-source-hive</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 223e303..4ca60c5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -14,14 +14,20 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -31,7 +37,11 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -39,6 +49,7 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -46,24 +57,35 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.HiveMRInput;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
 import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide.materializeViewHql;
+
 public class KafkaMRInput implements IMRInput {
 
+    private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
     CubeSegment cubeSegment;
 
     @Override
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment);
+        return new BatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
@@ -80,12 +102,14 @@ public class KafkaMRInput implements IMRInput {
     public static class KafkaTableInputFormat implements IMRTableInputFormat {
         private final CubeSegment cubeSegment;
         private final JobEngineConfig conf;
-        private final String delimiter;
+        private String delimiter;
+        private static final Logger logger = LoggerFactory.getLogger(KafkaTableInputFormat.class);
 
         public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
             this.cubeSegment = cubeSegment;
             this.conf = conf;
             this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
+            //delimiter="|"; //fixme
         }
 
         @Override
@@ -114,16 +138,250 @@ public class KafkaMRInput implements IMRInput {
 
         final JobEngineConfig conf;
         final CubeSegment seg;
+        CubeDesc cubeDesc;
+        KylinConfig config;
         private String outputPath;
-
-        public BatchCubingInputSide(CubeSegment seg) {
+        protected IJoinedFlatTableDesc flatDesc;
+        protected final String hiveTableDatabase;
+        protected final String hiveIntermediateTableDatabase1;
+        protected final String hdfsWorkingDir;
+
+        String hiveViewIntermediateTables = "";
+
+        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+            config = KylinConfig.getInstanceFromEnv();
+            this.flatDesc = flatDesc;
+            this.hiveTableDatabase = config.getHiveDatabaseForStreamingTable();
+            this.hiveIntermediateTableDatabase1 = config.getHiveDatabaseForIntermediateTable();
+            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
             this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
             this.seg = seg;
         }
 
+        // the seven methods below are new in this change
+        protected void addStepPhase1_DoCreateHiveTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+            outputPath = getJobWorkingDir(jobFlow);
+
+            jobFlow.addTask(createHiveTable(hiveInitStatements, cubeName));
+        }
+
+        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveIntermediateTableDatabase1);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow);
+
+            // change the HDFS path
+            //outputPath = hdfsWorkingDir + "kylin-" + UUID.randomUUID().toString();
+            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName));
+        }
+
+        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId());
+            if (KylinConfig.getInstanceFromEnv().getHiveTableDirCreateFirst()) {
+                checkAndCreateWorkDir(jobWorkingDir);
+            }
+            return jobWorkingDir;
+        }
+
+        private void checkAndCreateWorkDir(String jobWorkingDir) {
+            try {
+                Path path = new Path(jobWorkingDir);
+                FileSystem fileSystem = HadoopUtil.getFileSystem(path);
+                if (!fileSystem.exists(path)) {
+                    logger.info("Create jobWorkDir : " + jobWorkingDir);
+                    fileSystem.mkdirs(path);
+                }
+            } catch (IOException e) {
+                logger.error("Could not create lookUp table dir : " + jobWorkingDir);
+            }
+        }
+
+        private AbstractExecutable createHiveTable(String hiveInitStatements,
+                                                   String cubeName) {
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement1(flatDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement1(flatDesc, outputPath);
+
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            step.setInitStatement(hiveInitStatements);
+            step.setCreateTableStatement(dropTableHql + createTableHql);
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_HIVE_TABLE);
+            return step;
+        }
+
+        private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
+                                                           String cubeName) {
+
+            //TableRef tableRef = flatDesc.getDataModel().getRootFactTable();
+            //change table name and columns
+            //ColumnDesc columnDesc = new ColumnDesc();
+            //TableDesc tableDesc = tableRef.getTableDesc();
+            //tableDesc.setName(flatDesc.getDataModel().getRootFactTableName());/////////
+            //tableRef.setTableIdentity(tableDesc.getIdentity());
+
+            /*TblColRef tblColRef = null;
+            List<TblColRef> tblColRefList = Lists.newArrayList();
+            ColumnDesc[] columnDescs = tableDesc.getColumns();
+            for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
+                TblColRef col = flatDesc.getAllColumns().get(i);
+                col.setName(colName(col));
+                columnDescs[i].setName(colName(col));
+                columnDesc.setName(columnDescs[i].getName());
+                tblColRef = new TblColRef(columnDesc);
+                tblColRef.setExpressionInSourceDB(tableRef.getAlias() + "." + tableRef.getAlias() + "_" + columnDesc.getName());
+                tblColRefList.add(tblColRef);
+            }
+            CubeJoinedFlatTableDesc cubeJoinedFlatTableDesc = new CubeJoinedFlatTableDesc(seg);
+            cubeJoinedFlatTableDesc.setAllColumns(tblColRefList);*/
+            //tblColRef.setName(tableRef.getAlias() + "_" + columnDesc.getName());
+
+            //tableDesc.setColumns(columnDescs);
+            //tableRef.setTableDesc(tableDesc);
+            //tableRef.setColumn(tblColRef);
+            /*DataModelDesc dataModelDesc = flatDesc.getDataModel();
+            dataModelDesc.setRootFactTable(tableRef);
+            dataModelDesc.setRootFactTableName(flatDesc.getTableName());*/
+            cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cubeName);
+            // a random digit (0-10) is appended so the intermediate table name varies per build
+            flatDesc.setTableName(MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeDesc.getName().toLowerCase() + "_"
+                    + seg.getUuid().replaceAll("-", "_") + Math.round(Math.random() * 10));
+            //flatDesc.setDataModel(dataModelDesc);
+
+            //from hive to hive
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
+            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement1(flatDesc);
+            outputPath = jobWorkingDir + "/" + flatDesc.getTableName();
+
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            step.setInitStatement(hiveInitStatements);
+            step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls);
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+            return step;
+        }
+
+        private AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName) {
+            HiveMRInput.RedistributeFlatHiveTableStep step = new HiveMRInput.RedistributeFlatHiveTableStep();
+            step.setInitStatement(hiveInitStatements);
+            step.setIntermediateTable(flatDesc.getTableName());
+            step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+            return step;
+        }
+
+        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow);
+
+            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir);
+            if (task != null) {
+                jobFlow.addTask(task);
+            }
+        }
+
+        private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
+                                                                        String jobWorkingDir) {
+            ShellExecutable step = new ShellExecutable();
+            step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
+
+            KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
+            TableMetadataManager metadataManager = TableMetadataManager.getInstance(kylinConfig);
+            final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
+
+            String prj = flatDesc.getDataModel().getProject();
+            for (JoinTableDesc lookupDesc : flatDesc.getDataModel().getJoinTables()) {
+                TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable(), prj);
+                if (lookupDesc.getKind() == DataModelDesc.TableKind.LOOKUP && tableDesc.isView()) {
+                    lookupViewsTables.add(tableDesc);
+                }
+            }
+
+            if (lookupViewsTables.size() == 0) {
+                return null;
+            }
+
+            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride());
+            hiveCmdBuilder.addStatement(hiveInitStatements);
+            for (TableDesc lookUpTableDesc : lookupViewsTables) {
+                String identity = lookUpTableDesc.getIdentity();
+                String intermediate = lookUpTableDesc.getMaterializedName();
+                if (lookUpTableDesc.isView()) {
+                    String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir);
+                    hiveCmdBuilder.addStatement(materializeViewHql);
+                    hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";";
+                }
+            }
+
+            hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0,
+                    hiveViewIntermediateTables.length() - 1);
+
+            step.setCmd(hiveCmdBuilder.build());
+            return step;
+        }
+
+        private static String colName(TblColRef col) {
+            return col.getTableAlias() + "_" + col.getName();
+        }
+
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
             jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
+
+            // everything below is new in this change
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
+                    .getConfig();
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+            //String jobWorkingDir = getJobWorkingDir(jobFlow);
+
+            //judge the number of tables
+            Set<TableRef> dimTableCache = new HashSet<>();
+            DataModelDesc model = flatDesc.getDataModel();
+            for (JoinTableDesc lookupDesc : model.getJoinTables()) {
+                JoinDesc join = lookupDesc.getJoin();
+                if (join != null && join.getType().equals("") == false) {
+                    TableRef dimTable = lookupDesc.getTableRef();
+                    if (!dimTableCache.contains(dimTable)) {
+                        TblColRef[] pk = join.getPrimaryKeyColumns();
+                        TblColRef[] fk = join.getForeignKeyColumns();
+                        if (pk.length != fk.length) {
+                            throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
+                        }
+                        dimTableCache.add(dimTable);
+                    }
+                }
+            }
+            if (dimTableCache.size() == 0) {
+                logger.info("Only the fact table is present; no Hive join steps needed.");
+            } else {
+                // create hive table first
+                addStepPhase1_DoCreateHiveTable(jobFlow);
+
+                // next create flat table
+                addStepPhase1_DoCreateFlatTable(jobFlow);
+
+                // then count and redistribute
+                if (cubeConfig.isHiveRedistributeEnabled()) {
+                    jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName));
+                }
+
+                // special for hive
+                addStepPhase1_DoMaterializeLookupTable(jobFlow);
+            }
         }
 
         private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
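
Note: with these changes, addStepPhase1_CreateFlatTable builds the following
chain when the model has lookup tables (step 1 existed before; steps 2-5 are
new):

    1. Save data from Kafka                    (createSaveKafkaDataStep)
    2. Create Hive Table                       (addStepPhase1_DoCreateHiveTable)
    3. Create Intermediate Flat Hive Table     (addStepPhase1_DoCreateFlatTable, joins with Hive lookups)
    4. Redistribute Flat Hive Table            (only if isHiveRedistributeEnabled)
    5. Materialize Hive View in Lookup Tables  (only when some lookup table is a view)

When the model has no lookup tables, only step 1 is added.
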
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 0ab83c6..1d65b96 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
@@ -224,7 +225,9 @@ public class KafkaSource implements ISource {
             public List<String> getRelatedKylinResources(TableDesc table) {
                 List<String> dependentResources = Lists.newArrayList();
                 dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity()));
-                dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+                if (table.getSourceType() == ISourceAware.ID_STREAMING) {
+                    dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity()));
+                }
                 return dependentResources;
             }
 
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
index 9fe29ca..a3c5e62 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
@@ -42,10 +42,13 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.StreamingParser;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, NullWritable, Text> {
 
     private NullWritable outKey = NullWritable.get();
+    private static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableMapper.class);
     private Text outValue = new Text();
     private KylinConfig config;
     private CubeSegment cubeSegment;
@@ -63,6 +66,8 @@ public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritabl
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         this.cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
         this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
+        //delimiter="|"; //fixme
+        logger.info("Use delimiter: " + delimiter);
         KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
         KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
         List<TblColRef> columns = new CubeJoinedFlatTableDesc(cubeSegment).getAllColumns();
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index c996c5f..3681a98 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -55,6 +55,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
         final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
         final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
         final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+        final Integer spiltsSetnum = 1000; // this value should eventually come from the UI; a fixed value is assumed for now
 
         final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
         final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
@@ -79,9 +80,18 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
                     throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
                 }
 
-                if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
-                    InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
-                    splits.add(split);
+                // cut the offset range [start, end) into splits of at most spiltsSetnum messages
+                long new_start = startOffsetMap.get(partitionId);
+                long end = endOffsetMap.get(partitionId);
+                while (end > new_start) {
+                    if (end - new_start <= spiltsSetnum) {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end);
+                        splits.add(split);
+                        break;
+                    } else {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltsSetnum);
+                        splits.add(split);
+                        new_start += spiltsSetnum;
+                    }
                 }
             }
         }
@@ -93,4 +103,4 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
         return new KafkaInputRecordReader();
     }
 
-}
+}
\ No newline at end of file
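
Note: the loop above replaces the old one-split-per-partition scheme, capping
each split at spiltsSetnum messages. A self-contained sketch of the same
chunking rule (values are hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    public class SplitSketch {
        // cut the offset range [start, end) into pieces of at most splitSize
        static List<long[]> chunk(long start, long end, long splitSize) {
            List<long[]> ranges = new ArrayList<>();
            while (end > start) {
                long stop = Math.min(start + splitSize, end);
                ranges.add(new long[] { start, stop });
                start = stop;
            }
            return ranges;
        }

        public static void main(String[] args) {
            for (long[] r : chunk(4, 26, 10)) {
                System.out.println(r[0] + "-" + r[1]); // 4-14, 14-24, 24-26
            }
        }
    }
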
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java
new file mode 100644
index 0000000..9dfb641
--- /dev/null
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka;
+
+import org.apache.kylin.source.kafka.hadoop.KafkaInputSplit;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import com.google.common.collect.Maps;
+
+public class SpiltNumTest {
+
+    public static List<InputSplit> getSplits() {
+
+        final String brokers = "brokers";
+        final String inputTopic = "topic";
+        final Integer spiltsSetnum = 10;
+
+        final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
+        final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
+        startOffsetMap.put(0, Long.valueOf(0));
+        endOffsetMap.put(0, Long.valueOf(15));
+        startOffsetMap.put(1, Long.valueOf(4));
+        endOffsetMap.put(1, Long.valueOf(26));
+        startOffsetMap.put(2, Long.valueOf(15));
+        endOffsetMap.put(2, Long.valueOf(47));
+        startOffsetMap.put(3, Long.valueOf(39));
+        endOffsetMap.put(3, Long.valueOf(41));
+
+        final List<InputSplit> splits = new ArrayList<InputSplit>();
+            for (int i = 0; i < 4; i++) {
+                int partitionId = i;
+                long new_start = startOffsetMap.get(partitionId);
+                long end = endOffsetMap.get(partitionId);
+                while (end > new_start) {
+                    if ((end - new_start) <= spiltsSetnum && (end > new_start)) {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end);
+                        splits.add(split);
+                        break;
+                    } else {
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltsSetnum);
+                        splits.add(split);
+                        new_start += spiltsSetnum;
+                    }
+                }
+            }
+        return splits;
+    }
+
+    @Test
+    public void testSpiltNum() {
+        List<InputSplit> splits = getSplits();
+        // 2 + 3 + 4 + 1 splits across the four partitions; expected value comes first
+        Assert.assertEquals(10, splits.size());
+    }
+
+    private static boolean containsSplit(List<InputSplit> splits, String expected) {
+        for (InputSplit split : splits) {
+            if (split.toString().contains(expected)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Test
+    public void testSpilt() {
+        List<InputSplit> splits = getSplits();
+        String[] expected = { "brokers-topic-0-0-10", "brokers-topic-0-10-15", "brokers-topic-1-4-14",
+                "brokers-topic-1-14-24", "brokers-topic-1-24-26", "brokers-topic-2-15-25", "brokers-topic-2-25-35",
+                "brokers-topic-2-35-45", "brokers-topic-2-45-47", "brokers-topic-3-39-41" };
+        for (String s : expected) {
+            Assert.assertTrue("missing split: " + s, containsSplit(splits, s));
+        }
+        // a split spanning another partition's offsets must not exist
+        Assert.assertFalse(containsSplit(splits, "brokers-topic-0-4-47"));
+    }
+}
