You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/30 09:36:26 UTC

[kylin] 02/02: KYLIN-3378 Support Kafka table join with Hive tables

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3369
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ddf9432884f14b745c62ad6e74a331f299397733
Author: shaofengshi <sh...@apache.org>
AuthorDate: Wed May 30 17:35:38 2018 +0800

    KYLIN-3378 Support Kafka table join with Hive tables
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 -
 .../org/apache/kylin/common/util/BasicTest.java    |  12 -
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  28 +-
 .../cube/model/CubeJoinedFlatTableEnrich.java      |  15 +-
 .../model/validation/rule/StreamingCubeRule.java   |   6 -
 .../java/org/apache/kylin/job/JoinedFlatTable.java | 183 +---------
 .../kylin/metadata/model/IJoinedFlatTableDesc.java |   9 +-
 .../org/apache/kylin/metadata/model/TblColRef.java |  15 +-
 examples/test_case_data/sandbox/kylin.properties   |   2 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |  68 ++--
 .../apache/kylin/source/kafka/KafkaMRInput.java    | 375 ++++++---------------
 .../kylin/source/kafka/config/KafkaConfig.java     |  12 +
 .../source/kafka/hadoop/KafkaFlatTableJob.java     |   7 +-
 .../source/kafka/hadoop/KafkaFlatTableMapper.java  |  14 +-
 .../source/kafka/hadoop/KafkaInputFormat.java      |   8 +-
 15 files changed, 211 insertions(+), 547 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5d543f5..cdb3755 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -760,10 +760,6 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.source.hive.keep-flat-table", "false"));
     }
 
-    public String getHiveDatabaseForStreamingTable() {
-        return this.getOptional("kylin.source.hive.database-for-streaming-table", "default");
-    }
-
     public String getHiveDatabaseForIntermediateTable() {
         return this.getOptional("kylin.source.hive.database-for-flat-table", "default");
     }
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index fcf302d..1c1e389 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -244,16 +244,4 @@ public class BasicTest {
         cal.setTimeInMillis(t);
         return dateFormat.format(cal.getTime());
     }
-
-    @Test
-    public void testStringSplit() {
-        String[] strings = new String[] {"abc", "bcd"};
-
-        String delimeter = ",";
-        String concated = StringUtils.join(Arrays.asList(strings), delimeter);
-
-        String[] newStrings = concated.split("\\" + delimeter);
-
-        Assert.assertEquals(strings, newStrings);
-    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 63df4aa..70ad13e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -31,8 +31,6 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
@@ -48,8 +46,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
     protected final CubeDesc cubeDesc;///
     protected final CubeSegment cubeSegment;
     protected final boolean includingDerived;
-    TableRef tableRef;
-    DataModelDesc dataModelDesc;///
 
     private int columnCount = 0;
     private List<TblColRef> columnList = Lists.newArrayList();
@@ -139,6 +135,18 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
         }
     }
 
+    @Override
+    public List<TblColRef> getFactColumns() {
+        final List<TblColRef> factColumns = Lists.newArrayList();
+        for (TblColRef col : this.getAllColumns()) {
+            if (col.getTableRef().equals(getDataModel().getRootFactTable())) {
+                // keep only the columns that belong to the root fact table
+                factColumns.add(col);
+            }
+        }
+        return factColumns;
+    }
+
     // sanity check the input record (in bytes) matches what's expected
     public void sanityCheck(BytesSplitter bytesSplitter) {
         if (columnCount != bytesSplitter.getBufferSize()) {
@@ -175,6 +183,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
 
     @Override
     public SegmentRange getSegRange() {
+        if (cubeSegment.isOffsetCube()) {
+            return null;
+        }
         return cubeSegment.getSegRange();
     }
 
@@ -189,13 +200,8 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
     }
 
     @Override
-    public TableRef getTable() {/////
-        return tableRef;
-    }
-
-    @Override
-    public void setTableName(String tableName) {
-        this.tableName = tableName;
+    public boolean useAlias() {
+        return true;
     }
 
     @Override
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index b314cc2..f09478e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -28,7 +28,6 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 
 /**
@@ -106,6 +105,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
     }
 
     @Override
+    public List<TblColRef> getFactColumns() {
+        return flatDesc.getFactColumns();
+    }
+
+    @Override
     public DataModelDesc getDataModel() {
         return flatDesc.getDataModel();
     }
@@ -131,13 +135,8 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ
     }
 
     @Override
-    public TableRef getTable() {///
-        return null;
-    }
-
-    @Override
-    public void setTableName(String tableName) {
-
+    public boolean useAlias() {
+        return flatDesc.useAlias();
     }
 
     @Override
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
index dab8fa4..647f4c1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java
@@ -24,7 +24,6 @@ import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ResultLevel;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.ISourceAware;
 
 import org.apache.kylin.metadata.model.TblColRef;
@@ -49,11 +48,6 @@ public class StreamingCubeRule implements IValidatorRule<CubeDesc> {
             return;
         }
 
-        if (cube.getEngineType() == IEngineAware.ID_SPARK) {
-            context.addResult(ResultLevel.ERROR, "Spark engine doesn't support streaming source, select MapReduce engine instead.");
-            return;
-        }
-
         if (model.getPartitionDesc() == null || model.getPartitionDesc().getPartitionDateColumn() == null) {
             context.addResult(ResultLevel.ERROR, "Must define a partition column.");
             return;
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index ac38730..0769dcf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -44,6 +44,8 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 public class JoinedFlatTable {
 
+    public static final String TEXTFILE = "TEXTFILE";
+
     public static String getTableDir(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
         return storageDfsDir + "/" + flatDesc.getTableName();
     }
@@ -78,10 +80,10 @@ public class JoinedFlatTable {
             if (i > 0) {
                 ddl.append(",");
             }
-            ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + "\n");
+            ddl.append(colName(col, flatDesc.useAlias()) + " " + getHiveDataType(col.getDatatype()) + "\n");
         }
         ddl.append(")" + "\n");
-        if ("TEXTFILE".equals(storageFormat)) {
+        if (TEXTFILE.equals(storageFormat)) {
             ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\" + fieldDelimiter + "'\n");
         }
         ddl.append("STORED AS " + storageFormat + "\n");
@@ -90,41 +92,6 @@ public class JoinedFlatTable {
         return ddl.toString();
     }
 
-    public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
-        String storageFormat = flatDesc.getDataModel().getConfig().getFlatTableStorageFormat();
-        return generateCreateTableStatement1(flatDesc, storageDfsDir, storageFormat);
-    }
-
-    public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
-            String storageFormat) {
-        String fieldDelimiter = flatDesc.getDataModel().getConfig().getFlatTableFieldDelimiter();
-        return generateCreateTableStatement1(flatDesc, storageDfsDir, storageFormat, fieldDelimiter);
-    }
-
-    public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
-            String storageFormat, String fieldDelimiter) {
-        StringBuilder ddl = new StringBuilder();
-
-        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getDataModel().getRootFactTableName() + "\n");////flatDesc.getTableName()
-
-        ddl.append("(" + "\n");
-        for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
-            TblColRef col = flatDesc.getAllColumns().get(i);
-            if (i > 0) {
-                ddl.append(",");
-            }
-            ddl.append(col.getName() + " " + getHiveDataType(col.getDatatype()) + "\n");
-        }
-        ddl.append(")" + "\n");
-        if ("TEXTFILE".equals(storageFormat)) {
-            ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + fieldDelimiter + "'\n");
-        }
-        ddl.append("STORED AS " + storageFormat + "\n");
-        ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n");
-        ddl.append("ALTER TABLE " + flatDesc.getDataModel().getRootFactTableName() + " SET TBLPROPERTIES('auto.purge'='true');\n");
-        return ddl.toString();
-    }
-
     public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
         ddl.append("DROP TABLE IF EXISTS " + flatDesc.getTableName() + ";").append("\n");
@@ -161,43 +128,10 @@ public class JoinedFlatTable {
                 + ";\n";
     }
 
-    public static String generateInsertDataStatement1(IJoinedFlatTableDesc flatDesc) {
-        CubeSegment segment = ((CubeSegment) flatDesc.getSegment());
-        KylinConfig kylinConfig;
-        if (null == segment) {
-            kylinConfig = KylinConfig.getInstanceFromEnv();
-        } else {
-            kylinConfig = (flatDesc.getSegment()).getConfig();
-        }
-
-        if (kylinConfig.isAdvancedFlatTableUsed()) {
-            try {
-                Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
-                Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class,
-                        JobEngineConfig.class);
-                return (String) method.invoke(null, flatDesc);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement1(flatDesc)
-                + ";\n";
-    }
-
-    public static String generateInsertPartialDataStatement(IJoinedFlatTableDesc flatDesc) {
-        return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc)
-                + ";\n";
-    }
-
     public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
         return generateSelectDataStatement(flatDesc, false, null);
     }
 
-    public static String generateSelectDataStatement1(IJoinedFlatTableDesc flatDesc) {
-        return generateSelectDataStatement1(flatDesc, false, null);
-    }
-
     public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine,
             String[] skipAs) {
         final String sep = singleLine ? " " : "\n";
@@ -215,7 +149,7 @@ public class JoinedFlatTable {
             if (skipAsList.contains(colTotalName)) {
                 sql.append(col.getExpressionInSourceDB() + sep);
             } else {
-                sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep);
+                sql.append(col.getExpressionInSourceDB() + " as " + colName(col, true) + sep);
             }
         }
         appendJoinStatement(flatDesc, sql, singleLine);
@@ -223,40 +157,6 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
-    public static String generateSelectDataStatement1(IJoinedFlatTableDesc flatDesc, boolean singleLine,
-                                                     String[] skipAs) {
-        final String sep = singleLine ? " " : "\n";
-        final List<String> skipAsList = (skipAs == null) ? new ArrayList<String>() : Arrays.asList(skipAs);
-
-        StringBuilder sql = new StringBuilder();
-        sql.append("SELECT" + sep);
-
-        for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
-            TblColRef col = flatDesc.getAllColumns().get(i);
-            if (i > 0) {
-                sql.append(",");
-            }
-            String colTotalName = String.format("%s.%s", col.getTableRef().getTableName(), col.getName());
-            if (skipAsList.contains(colTotalName)) {
-                sql.append(col.getExpressionInSourceDB() + sep);
-            } else {
-                sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep);
-            }
-        }
-        appendJoinStatement1(flatDesc, sql, singleLine);
-        appendWhereStatement1(flatDesc, sql, singleLine);
-        return sql.toString();
-    }
-
-    public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
-        final StringBuilder sql = new StringBuilder();
-        final TableRef rootTbl = flatDesc.getDataModel().getRootFactTable();
-        sql.append("dfs -mkdir -p " + outputDir + ";\n");
-        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + rootTbl.getTableIdentity()
-                + " " + rootTbl.getAlias() + "\n");
-        appendWhereStatement(flatDesc, sql);
-        return sql.toString();
-    }
 
     public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
         final String sep = singleLine ? " " : "\n";
@@ -264,7 +164,7 @@ public class JoinedFlatTable {
 
         DataModelDesc model = flatDesc.getDataModel();
         TableRef rootTable = model.getRootFactTable();
-        sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);//这flatDesc.getTableName()    rootTable.getTableIdentity()
+        sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);
 
         for (JoinTableDesc lookupDesc : model.getJoinTables()) {
             JoinDesc join = lookupDesc.getJoin();
@@ -275,42 +175,8 @@ public class JoinedFlatTable {
                     TblColRef[] fk = join.getForeignKeyColumns();
                     if (pk.length != fk.length) {
                         throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
-                    }                String joinType = join.getType().toUpperCase();
-
-                    sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
-                    sql.append("ON ");
-                    for (int i = 0; i < pk.length; i++) {
-                        if (i > 0) {
-                            sql.append(" AND ");
-                        }
-                        sql.append(fk[i].getExpressionInSourceDB1() + " = " + pk[i].getExpressionInSourceDB());
                     }
-                    sql.append(sep);
-
-                    dimTableCache.add(dimTable);
-                }
-            }
-        }
-    }
-
-    public static void appendJoinStatement1(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
-        final String sep = singleLine ? " " : "\n";
-        Set<TableRef> dimTableCache = new HashSet<>();
-
-        DataModelDesc model = flatDesc.getDataModel();
-        TableRef rootTable = model.getRootFactTable();
-        sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);//这flatDesc.getTableName()    rootTable.getTableIdentity()
-
-        for (JoinTableDesc lookupDesc : model.getJoinTables()) {
-            JoinDesc join = lookupDesc.getJoin();
-            if (join != null && join.getType().equals("") == false) {
-                TableRef dimTable = lookupDesc.getTableRef();
-                if (!dimTableCache.contains(dimTable)) {
-                    TblColRef[] pk = join.getPrimaryKeyColumns();
-                    TblColRef[] fk = join.getForeignKeyColumns();
-                    if (pk.length != fk.length) {
-                        throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
-                    }                String joinType = join.getType().toUpperCase();
+                    String joinType = join.getType().toUpperCase();
 
                     sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
                     sql.append("ON ");
@@ -318,7 +184,7 @@ public class JoinedFlatTable {
                         if (i > 0) {
                             sql.append(" AND ");
                         }
-                        sql.append(fk[i].getExpressionInSourceDB1() + " = " + pk[i].getExpressionInSourceDB());
+                        sql.append(fk[i].getExpressionInSourceDB() + " = " + pk[i].getExpressionInSourceDB());
                     }
                     sql.append(sep);
 
@@ -330,7 +196,7 @@ public class JoinedFlatTable {
 
     private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) {
         if (redistCol != null) {
-            sql.append(" DISTRIBUTE BY ").append(colName(redistCol)).append(";\n");
+            sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n");
         } else {
             sql.append(" DISTRIBUTE BY RAND()").append(";\n");
         }
@@ -372,36 +238,13 @@ public class JoinedFlatTable {
         sql.append(whereBuilder.toString());
     }
 
-    private static void appendWhereStatement1(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
-        final String sep = singleLine ? " " : "\n";
-
-        StringBuilder whereBuilder = new StringBuilder();
-        whereBuilder.append("WHERE 1=1");
-
-        DataModelDesc model = flatDesc.getDataModel();
-        if (StringUtils.isNotEmpty(model.getFilterCondition())) {
-            whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") ");
-        }
-
-        /*if (flatDesc.getSegment() != null) {
-            PartitionDesc partDesc = model.getPartitionDesc();
-            if (partDesc != null && partDesc.getPartitionDateColumn() != null) {
-                SegmentRange segRange = flatDesc.getSegRange();
-
-                if (segRange != null && !segRange.isInfinite()) {
-                    whereBuilder.append(" AND (");
-                    whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc,
-                            flatDesc.getSegment(), segRange));
-                    whereBuilder.append(")" + sep);
-                }
-            }
-        }*/
 
-        sql.append(whereBuilder.toString());
+    private static String colName(TblColRef col) {
+        return colName(col, true);
     }
 
-    private static String colName(TblColRef col) {
-        return col.getTableAlias() + "_" + col.getName();
+    private static String colName(TblColRef col, boolean useAlias) {
+        return useAlias ? col.getTableAlias() + "_" + col.getName() : col.getName();
     }
 
     private static String getHiveDataType(String javaDataType) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 46e21f3..8f86a52 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -30,7 +30,7 @@ public interface IJoinedFlatTableDesc {
 
     List<TblColRef> getAllColumns();
 
-    void setAllColumns(List<TblColRef> tblColRefList);
+    List<TblColRef> getFactColumns();
 
     int getColumnIndex(TblColRef colRef);
 
@@ -43,11 +43,6 @@ public interface IJoinedFlatTableDesc {
     // optionally present
     ISegment getSegment();
 
-    ///
-    TableRef getTable();
-
-    void setDataModel(DataModelDesc dataModelDesc);
-
-    void setTableName(String tableName);
+    boolean useAlias();
 
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index 928d3d2..ee33e8a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -31,7 +31,6 @@ import org.apache.kylin.metadata.datatype.DataType;
 public class TblColRef implements Serializable {
 
     private static final String INNER_TABLE_NAME = "_kylin_table";
-    private String name;
 
     // used by projection rewrite, see OLAPProjectRel
     public enum InnerDataTypeEnum {
@@ -120,7 +119,7 @@ public class TblColRef implements Serializable {
     private String identity;
     private String parserDescription;
 
-    public TblColRef(ColumnDesc column) {/////
+    TblColRef(ColumnDesc column) {
         this.column = column;
     }
 
@@ -149,10 +148,6 @@ public class TblColRef implements Serializable {
         return column.getName();
     }
 
-    public void setName(String name) {
-        this.name = name;
-    }
-
     public TableRef getTableRef() {
         return table;
     }
@@ -173,14 +168,6 @@ public class TblColRef implements Serializable {
         }
     }
 
-    public String getExpressionInSourceDB1() {
-        return identity;
-    }
-
-    public void setExpressionInSourceDB(String identity) {
-        this.identity = identity;
-    }///
-
     public String getTable() {
         if (column.getTable() == null) {
             return null;
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index d428d44..ae9dad2 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -70,7 +70,7 @@ kylin.job.retry=0
 # you will have to specify kylin.job.remote-cli-hostname, kylin.job.remote-cli-username and kylin.job.remote-cli-password
 # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine
 # (Thus kylin instance has to ssh to another real hadoop client machine to execute hbase,hive,hadoop commands)
-kylin.job.use-remote-cli=true
+kylin.job.use-remote-cli=false
 
 # Only necessary when kylin.job.use-remote-cli=true
 kylin.job.remote-cli-hostname=sandbox
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index a96f4d5..0e791eb 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -284,8 +286,8 @@ public class HiveMRInput implements IMRInput {
 
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTableIdentity(getIntermediateTableIdentity());
-            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir));
+            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+            step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
             step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
             jobFlow.addTask(step);
         }
@@ -435,42 +437,58 @@ public class HiveMRInput implements IMRInput {
 
         private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
             StringBuffer output = new StringBuffer();
-            final String hiveTable = this.getIntermediateTableIdentity();
-            if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
-                final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-                hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
-                hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
-                config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
-                output.append("Hive table " + hiveTable + " is dropped. \n");
-                rmdirOnHDFS(getExternalDataPath());
-                output.append(
-                        "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n");
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            final List<String> hiveTables = this.getIntermediateTables();
+            for (String hiveTable : hiveTables) {
+                if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
+                    hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
+
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                }
             }
+            config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+            rmdirOnHDFS(getExternalDataPaths());
+            output.append(
+                    "Path " + getExternalDataPaths() + " is deleted. \n");
+
             return output.toString();
         }
 
-        private void rmdirOnHDFS(String path) throws IOException {
-            Path externalDataPath = new Path(path);
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            if (fs.exists(externalDataPath)) {
-                fs.delete(externalDataPath, true);
+        private void rmdirOnHDFS(List<String> paths) throws IOException {
+            for (String path : paths) {
+                Path externalDataPath = new Path(path);
+                FileSystem fs = HadoopUtil.getWorkingFileSystem();
+                if (fs.exists(externalDataPath)) {
+                    fs.delete(externalDataPath, true);
+                }
             }
         }
 
-        public void setIntermediateTableIdentity(String tableIdentity) {
-            setParam("oldHiveTable", tableIdentity);
+        public void setIntermediateTables(List<String> tableIdentity) {
+            setParam("oldHiveTables", StringUtil.join(tableIdentity, ","));
         }
 
-        private String getIntermediateTableIdentity() {
-            return getParam("oldHiveTable");
+        private List<String> getIntermediateTables() {
+            List<String> intermediateTables = Lists.newArrayList();
+            String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ",");
+            for (String t : tables) {
+                intermediateTables.add(t);
+            }
+            return intermediateTables;
         }
 
-        public void setExternalDataPath(String externalDataPath) {
-            setParam("externalDataPath", externalDataPath);
+        public void setExternalDataPaths(List<String> externalDataPaths) {
+            setParam("externalDataPaths", StringUtil.join(externalDataPaths, ","));
         }
 
-        private String getExternalDataPath() {
-            return getParam("externalDataPath");
+        private List<String> getExternalDataPaths() {
+            String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ",");
+            List<String> result = Lists.newArrayList();
+            for (String s : paths) {
+                result.add(s);
+            }
+            return result;
         }
 
         public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 4ca60c5..b2c5360 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -20,15 +20,9 @@ package org.apache.kylin.source.kafka;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.UUID;
 
-import javax.annotation.Nullable;
-
-import com.google.common.collect.Sets;
-import org.apache.hadoop.fs.FileSystem;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -36,10 +30,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.HiveCmdBuilder;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
@@ -49,23 +39,16 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.JoinTableDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
 import org.apache.kylin.source.hive.HiveMRInput;
@@ -75,8 +58,6 @@ import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide.materializeViewHql;
-
 public class KafkaMRInput implements IMRInput {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
@@ -103,13 +84,11 @@ public class KafkaMRInput implements IMRInput {
         private final CubeSegment cubeSegment;
         private final JobEngineConfig conf;
         private String delimiter;
-        private static final Logger logger = LoggerFactory.getLogger(KafkaTableInputFormat.class);
 
         public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
             this.cubeSegment = cubeSegment;
             this.conf = conf;
             this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
-            //delimiter="|"; //fixme
         }
 
         @Override
@@ -136,266 +115,142 @@ public class KafkaMRInput implements IMRInput {
 
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
 
-        final JobEngineConfig conf;
         final CubeSegment seg;
-        CubeDesc cubeDesc ;
-        KylinConfig config;
-        private String outputPath;
-        protected IJoinedFlatTableDesc flatDesc;//
-        final protected String hiveTableDatabase;//
-        final protected String hiveIntermediateTableDatabase1;//
-        final protected String hdfsWorkingDir;//
-
-        String hiveViewIntermediateTables = "";//
+        private CubeDesc cubeDesc ;
+        private KylinConfig config;
+        protected IJoinedFlatTableDesc flatDesc;
+        protected String hiveTableDatabase;
+        private List<String> intermediateTables = Lists.newArrayList();
+        private List<String> intermediatePaths = Lists.newArrayList();
 
         public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
-            config = KylinConfig.getInstanceFromEnv();//
-            this.flatDesc = flatDesc;//
-            this.hiveTableDatabase = config.getHiveDatabaseForStreamingTable();//
-            this.hiveIntermediateTableDatabase1 = config.getHiveDatabaseForIntermediateTable();//
-            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();//
-            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            config = seg.getConfig();
+            this.flatDesc = flatDesc;
+            this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
             this.seg = seg;
+            this.cubeDesc = seg.getCubeDesc();
         }
 
-        //这下面7个方法都是我新加的
-        protected void addStepPhase1_DoCreateHiveTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
-            outputPath = getJobWorkingDir(jobFlow);///
-
-            jobFlow.addTask(createHiveTable(hiveInitStatements, cubeName));
-        }
-
-        protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveIntermediateTableDatabase1);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow);
-
-            //change hdfspath
-            ///outputPath = hdfsWorkingDir + "kylin-" + UUID.randomUUID().toString();///
-            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName));
-        }
-
-        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
-            String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId());
-            if (KylinConfig.getInstanceFromEnv().getHiveTableDirCreateFirst()) {
-                checkAndCreateWorkDir(jobWorkingDir);
-            }
-            return jobWorkingDir;
-        }
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
 
-        private void checkAndCreateWorkDir(String jobWorkingDir) {
-            try {
-                Path path = new Path(jobWorkingDir);
-                FileSystem fileSystem = HadoopUtil.getFileSystem(path);
-                if (!fileSystem.exists(path)) {
-                    logger.info("Create jobWorkDir : " + jobWorkingDir);
-                    fileSystem.mkdirs(path);
-                }
-            } catch (IOException e) {
-                logger.error("Could not create lookUp table dir : " + jobWorkingDir);
+            // The Kafka messages are always saved under the job working dir and exposed through createMockFactTable first.
+            // Without lookup tables that table is the flat table itself; otherwise createFlatTable adds the step that builds it.
+            final boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+            final String baseLocation = getJobWorkingDir(jobFlow);
+            // Locale.ROOT keeps the generated table name independent of the JVM default locale; replace() needs no regex.
+            final String factTable = onlyOneTable ? flatDesc.getTableName()
+                    : MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeDesc.getName().toLowerCase(java.util.Locale.ROOT) + "_"
+                            + seg.getUuid().replace('-', '_') + "_fact";
+            jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + factTable));
+            jobFlow.addTask(createMockFactTable(factTable, baseLocation));
+            if (onlyOneTable) {
+                intermediateTables.add(factTable);
+                intermediatePaths.add(baseLocation + "/" + factTable);
+            } else {
+                jobFlow.addTask(createFlatTable(factTable, baseLocation));
+                intermediateTables.add(flatDesc.getTableName());
+                intermediateTables.add(factTable);
+                intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
+                intermediatePaths.add(baseLocation + "/" + factTable);
             }
         }
+        private AbstractExecutable createFlatTable(String mockRootTableName, String baseLocation) {
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
 
-        private AbstractExecutable createHiveTable(String hiveInitStatements,
-                                                   String cubeName) {
-            final String dropTableHql = JoinedFlatTable.generateDropTableStatement1(flatDesc);
-            final String createTableHql = JoinedFlatTable.generateCreateTableStatement1(flatDesc, outputPath);
-
-            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
-            step.setInitStatement(hiveInitStatements);
-            step.setCreateTableStatement(dropTableHql + createTableHql);
-            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            step.setName(ExecutableConstants.STEP_NAME_CREATE_HIVE_TABLE);
-            return step;
-        }
-
-        private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
-                                                           String cubeName) {
-
-            //TableRef tableRef = flatDesc.getDataModel().getRootFactTable();
-            //change table name and columns
-            //ColumnDesc columnDesc = new ColumnDesc();
-            //TableDesc tableDesc = tableRef.getTableDesc();
-            //tableDesc.setName(flatDesc.getDataModel().getRootFactTableName());/////////
-            //tableRef.setTableIdentity(tableDesc.getIdentity());
-
-            /*TblColRef tblColRef = null;
-            List<TblColRef> tblColRefList = Lists.newArrayList();
-            ColumnDesc[] columnDescs = tableDesc.getColumns();
-            for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
-                TblColRef col = flatDesc.getAllColumns().get(i);
-                col.setName(colName(col));
-                columnDescs[i].setName(colName(col));
-                columnDesc.setName(columnDescs[i].getName());
-                tblColRef = new TblColRef(columnDesc);
-                tblColRef.setExpressionInSourceDB(tableRef.getAlias() + "." + tableRef.getAlias() + "_" + columnDesc.getName());
-                tblColRefList.add(tblColRef);
-            }
-            CubeJoinedFlatTableDesc cubeJoinedFlatTableDesc = new CubeJoinedFlatTableDesc(seg);
-            cubeJoinedFlatTableDesc.setAllColumns(tblColRefList);*/
-            //tblColRef.setName(tableRef.getAlias() + "_" + columnDesc.getName());
-
-            //tableDesc.setColumns(columnDescs);
-            //tableRef.setTableDesc(tableDesc);
-            //tableRef.setColumn(tblColRef);
-            /*DataModelDesc dataModelDesc = flatDesc.getDataModel();
-            dataModelDesc.setRootFactTable(tableRef);
-            dataModelDesc.setRootFactTableName(flatDesc.getTableName());*/
-            cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cubeName);
-            flatDesc.setTableName(MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeDesc.getName().toLowerCase() + "_"
-                    + seg.getUuid().replaceAll("-", "_") + Math.round(Math.random() * 10));
-            //flatDesc.setDataModel(dataModelDesc);
-
-            //from hive to hive
             final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
-            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
-            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement1(flatDesc);
-            outputPath = jobWorkingDir + "/" + flatDesc.getTableName();
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation);
+            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+            insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ", mockRootTableName + " ");
 
             CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            CubingExecutableUtil.setCubeName(cubeDesc.getName(), step.getParams());
             step.setInitStatement(hiveInitStatements);
             step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls);
-            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
             step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-            return step;
-        }
 
-        private AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName) {
-            HiveMRInput.RedistributeFlatHiveTableStep step = new HiveMRInput.RedistributeFlatHiveTableStep();
-            step.setInitStatement(hiveInitStatements);
-            step.setIntermediateTable(flatDesc.getTableName());
-            step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
-            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
             return step;
         }
 
-        protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow);
-
-            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir);
-            if (task != null) {
-                jobFlow.addTask(task);
-            }
+        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
         }
 
-        private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
-                                                                        String jobWorkingDir) {
-            ShellExecutable step = new ShellExecutable();
-            step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
-
-            KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
-            TableMetadataManager metadataManager = TableMetadataManager.getInstance(kylinConfig);
-            final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
+        private AbstractExecutable createMockFactTable(final String mockTalbeName, String baseLocation) {
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+            final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() {
 
-            String prj = flatDesc.getDataModel().getProject();
-            for (JoinTableDesc lookupDesc : flatDesc.getDataModel().getJoinTables()) {
-                TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable(), prj);
-                if (lookupDesc.getKind() == DataModelDesc.TableKind.LOOKUP && tableDesc.isView()) {
-                    lookupViewsTables.add(tableDesc);
+                @Override
+                public String getTableName() {
+                    return mockTalbeName;
                 }
-            }
 
-            if (lookupViewsTables.size() == 0) {
-                return null;
-            }
-
-            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride());
-            hiveCmdBuilder.addStatement(hiveInitStatements);
-            for (TableDesc lookUpTableDesc : lookupViewsTables) {
-                String identity = lookUpTableDesc.getIdentity();
-                String intermediate = lookUpTableDesc.getMaterializedName();
-                if (lookUpTableDesc.isView()) {
-                    String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir);
-                    hiveCmdBuilder.addStatement(materializeViewHql);
-                    hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";";
+                @Override
+                public DataModelDesc getDataModel() {
+                    return cubeDesc.getModel();
                 }
-            }
 
-            hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0,
-                    hiveViewIntermediateTables.length() - 1);
-
-            step.setCmd(hiveCmdBuilder.build());
-            return step;
-        }
-
-        private static String colName(TblColRef col) {
-            return col.getTableAlias() + "_" + col.getName();
-        }
+                @Override
+                public List<TblColRef> getAllColumns() {
+                    return flatDesc.getFactColumns();
+                }
 
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
+                @Override
+                public List<TblColRef> getFactColumns() {
+                    return null;
+                }
 
-            //下面开始是我的
-            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
-                    .getConfig();
-            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
-            ////String jobWorkingDir = getJobWorkingDir(jobFlow);/////
-
-            //judge the number of tables
-            Set<TableRef> dimTableCache = new HashSet<>();
-            DataModelDesc model = flatDesc.getDataModel();
-            for (JoinTableDesc lookupDesc : model.getJoinTables()) {
-                JoinDesc join = lookupDesc.getJoin();
-                if (join != null && join.getType().equals("") == false) {
-                    TableRef dimTable = lookupDesc.getTableRef();
-                    if (!dimTableCache.contains(dimTable)) {
-                        TblColRef[] pk = join.getPrimaryKeyColumns();
-                        TblColRef[] fk = join.getForeignKeyColumns();
-                        if (pk.length != fk.length) {
-                            throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
-                        }
-                        dimTable.getTableIdentity();
-                        dimTable.getAlias();
-                        dimTableCache.add(dimTable);
-                    }
+                @Override
+                public int getColumnIndex(TblColRef colRef) {
+                    return 0;
                 }
-            }
-            if(dimTableCache.size() == 0){
-                logger.info("you only have a table");
-            }else{
-                // create hive table first
-                addStepPhase1_DoCreateHiveTable(jobFlow);
-                //createHiveTable(hiveInitStatements, jobWorkingDir, cubeName);
 
-                //change hdfspath
-                //jobWorkingDir = hdfsWorkingDir + "kylin-" + UUID.randomUUID().toString();
+                @Override
+                public SegmentRange getSegRange() {
+                    return null;
+                }
 
-                // next create flat table
-                //addStepPhase1_DoCreateFlatTable(jobFlow);
-                addStepPhase1_DoCreateFlatTable(jobFlow);//addStepPhase1_DoCreateFlatTable(jobFlow);
-                //createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName);
+                @Override
+                public TblColRef getDistributedBy() {
+                    return null;
+                }
 
+                @Override
+                public TblColRef getClusterBy() {
+                    return null;
+                }
 
-                // then count and redistribute
-                if (cubeConfig.isHiveRedistributeEnabled()) {
-                    jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName));
+                @Override
+                public ISegment getSegment() {
+                    return null;
                 }
 
+                @Override
+                public boolean useAlias() {
+                    return false;
+                }
+            };
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation, JoinedFlatTable.TEXTFILE);
 
-                // special for hive
-                addStepPhase1_DoMaterializeLookupTable(jobFlow);
-            }
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            step.setInitStatement(hiveInitStatements);
+            step.setCreateTableStatement(dropTableHql + createTableHql);
+            CubingExecutableUtil.setCubeName(cubeDesc.getName(), step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_HIVE_TABLE);
+            return step;
         }
 
-        private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
-            MapReduceExecutable result = new MapReduceExecutable();
 
-            IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg);
-            outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+        private MapReduceExecutable createSaveKafkaDataStep(String jobId, String location) {
+            MapReduceExecutable result = new MapReduceExecutable();
             result.setName("Save data from Kafka");
             result.setMapReduceJobClass(KafkaFlatTableJob.class);
             JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
             StringBuilder cmd = new StringBuilder();
             jobBuilderSupport.appendMapReduceParameters(cmd);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
 
@@ -405,20 +260,18 @@ public class KafkaMRInput implements IMRInput {
 
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            GarbageCollectionStep step = new GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP);
-            step.setDataPath(outputPath);
+            HiveMRInput.GarbageCollectionStep step = new HiveMRInput.GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+            step.setIntermediateTables(intermediateTables);
+            step.setExternalDataPaths(intermediatePaths);
             jobFlow.addTask(step);
 
         }
 
         @Override
         public IMRTableInputFormat getFlatTableInputFormat() {
-            KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
-            KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getCubeInstance().getRootFactTable());
-            List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns();
-
-            return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf);
+            String intermediateHiveTable = hiveTableDatabase + "." + flatDesc.getTableName();
+            return new HiveMRInput.HiveTableInputFormat(intermediateHiveTable);
         }
     }
 
@@ -443,36 +296,4 @@ public class KafkaMRInput implements IMRInput {
         }
     }
 
-    public static class GarbageCollectionStep extends AbstractExecutable {
-        private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
-        @Override
-        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-            try {
-                rmdirOnHDFS(getDataPath());
-            } catch (IOException e) {
-                logger.error("job:" + getId() + " execute finished with exception", e);
-                return ExecuteResult.createError(e);
-            }
-
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n");
-        }
-
-        private void rmdirOnHDFS(String path) throws IOException {
-            Path externalDataPath = new Path(path);
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            if (fs.exists(externalDataPath)) {
-                fs.delete(externalDataPath, true);
-            }
-        }
-
-        public void setDataPath(String externalDataPath) {
-            setParam("dataPath", externalDataPath);
-        }
-
-        private String getDataPath() {
-            return getParam("dataPath");
-        }
-
-    }
 }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 696c20c..c31132b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -67,6 +67,9 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("margin")
     private long margin;
 
+    @JsonProperty("splitRows")
+    private int splitRows=1000000;
+
     //"configA=1;configB=2"
     @JsonProperty("parserProperties")
     private String parserProperties;
@@ -157,6 +160,28 @@ public class KafkaConfig extends RootPersistentEntity {
         return sb.toString();
     }
 
+
+    /** Returns the maximum number of Kafka offsets covered by one input split. */
+    public int getSplitRows() {
+        return splitRows;
+    }
+
+    /**
+     * Sets the maximum number of Kafka offsets covered by one input split.
+     *
+     * @param splitRows split size; must be positive
+     * @throws IllegalArgumentException if splitRows is zero or negative
+     */
+    public void setSplitRows(int splitRows) {
+        // KafkaInputFormat advances its offset cursor by this value for every split it emits;
+        // with a non-positive step that loop would never reach the end offset.
+        // NOTE(review): if JSON binding writes the field directly, this check is bypassed - confirm.
+        if (splitRows <= 0) {
+            throw new IllegalArgumentException("splitRows must be positive, but was " + splitRows);
+        }
+        this.splitRows = splitRows;
+    }
+
     @Override
     public KafkaConfig clone() {
         try {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index b71ca84..b4bc17b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -64,6 +64,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
     public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
     public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
 
+    public static final String CONFIG_KAFKA_SPLIT_ROWS = "kafka.split.rows";
+
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
@@ -111,6 +113,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
             job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
             job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
+            job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows()));
             job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
             setupMapper(cube.getSegmentById(segmentId));
             job.setNumReduceTasks(0);
@@ -154,7 +157,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
         job.setInputFormatClass(KafkaInputFormat.class);
         job.setOutputKeyClass(NullWritable.class);
         job.setOutputValueClass(Text.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
         job.setNumReduceTasks(0);
     }
 
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
index a3c5e62..f88fe3a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
@@ -38,6 +38,7 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.StreamingParser;
@@ -63,17 +64,18 @@ public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritabl
 
         config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        final CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         this.cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
         this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter();
-        //delimiter="|"; //fixme
         logger.info("Use delimiter: " + delimiter);
-        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
-        KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
-        List<TblColRef> columns = new CubeJoinedFlatTableDesc(cubeSegment).getAllColumns();
+        final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
+        final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable());
+
+        final IJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+        final List<TblColRef> allColumns = flatTableDesc.getFactColumns();
 
         try {
-            streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns);
+            streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), allColumns);
         } catch (ReflectiveOperationException e) {
             throw new IllegalArgumentException(e);
         }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 3681a98..c3ed47f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -55,7 +55,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
         final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
         final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
         final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
-        final Integer spiltsSetnum = 1000;//这个参数是用来将来获取到从前台传来的数据的,现先假定一个固定值
+        final Integer spiltRows = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_SPLIT_ROWS));
 
         final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
         final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
@@ -83,14 +83,14 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
                 long new_start = startOffsetMap.get(partitionId);
                 long end = endOffsetMap.get(partitionId);
                 while (end > new_start) {
-                    if ((end - new_start) <= spiltsSetnum && (end > new_start)) {
+                    if ((end - new_start) <= spiltRows && (end > new_start)) {
                         InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end);
                         splits.add(split);
                         break;
                     } else {
-                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltsSetnum);
+                        InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltRows);
                         splits.add(split);
-                        new_start += spiltsSetnum;
+                        new_start += spiltRows;
                     }
                 }
             }

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.