You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/08/20 13:47:32 UTC

kylin git commit: KYLIN-1966 Refactor IJoinedFlatTableDesc

Repository: kylin
Updated Branches:
  refs/heads/master ddb59106b -> 889c544ad


KYLIN-1966 Refactor IJoinedFlatTableDesc


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/889c544a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/889c544a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/889c544a

Branch: refs/heads/master
Commit: 889c544adf650963598381ce905aa8f86cb8ef86
Parents: ddb5910
Author: Yang Li <li...@apache.org>
Authored: Sat Aug 20 21:47:18 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat Aug 20 21:47:18 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeSegment.java |  2 +-
 .../InMemCubeBuilderInputConverter.java         |  2 +-
 .../cube/model/CubeJoinedFlatTableDesc.java     | 86 +++++++++---------
 .../org/apache/kylin/cube/util/CubingUtils.java |  4 +-
 .../org/apache/kylin/job/JoinedFlatTable.java   | 93 ++++++++++----------
 .../apache/kylin/job/JoinedFlatTableTest.java   |  2 +-
 .../kylin/metadata/model/DataModelDesc.java     |  1 +
 .../metadata/model/IJoinedFlatTableDesc.java    | 10 ++-
 .../metadata/model/IntermediateColumnDesc.java  | 60 -------------
 .../apache/kylin/metadata/model/LookupDesc.java | 12 +++
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  |  3 -
 .../org/apache/kylin/engine/mr/IMRInput.java    | 13 +--
 .../java/org/apache/kylin/engine/mr/MRUtil.java |  6 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java   |  2 +-
 .../mr/steps/FactDistinctColumnsMapperBase.java |  2 +-
 .../apache/kylin/engine/spark/SparkCubing.java  |  4 +-
 .../inmemcubing/ITInMemCubeBuilderTest.java     |  8 +-
 .../kylin/rest/controller/CubeController.java   |  3 +-
 .../apache/kylin/source/hive/HiveMRInput.java   | 38 +++-----
 .../kylin/source/kafka/KafkaStreamingInput.java | 12 +--
 20 files changed, 139 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 4697c63..0797ab3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -531,7 +531,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable {
     }
 
     public IJoinedFlatTableDesc getJoinedFlatTableDesc() {
-        return new CubeJoinedFlatTableDesc(this.getCubeDesc(), this);
+        return new CubeJoinedFlatTableDesc(this);
     }
 
     public String getIndexPath() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index 3ce635b..607f6bb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -50,7 +50,7 @@ public class InMemCubeBuilderInputConverter {
 
     public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
         this.gtInfo = gtInfo;
-        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
         this.measureCount = cubeDesc.getMeasures().size();
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
         this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index e8856ef..ebc903a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -28,7 +28,6 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -48,13 +47,21 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     private int[] rowKeyColumnIndexes; // the column index on flat table
     private int[][] measureColumnIndexes; // [i] is the i.th measure related column index on flat table
 
-    private List<IntermediateColumnDesc> columnList = Lists.newArrayList();
-
-    private Map<String, Integer> columnIndexMap;
+    private List<TblColRef> columnList = Lists.newArrayList();
+    private Map<TblColRef, Integer> columnIndexMap;
 
     private List<JoinDesc> cubeJoins;
 
-    public CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) {
+    
+    public CubeJoinedFlatTableDesc(CubeDesc cubeDesc) {
+        this(cubeDesc, null);
+    }
+    
+    public CubeJoinedFlatTableDesc(CubeSegment cubeSegment) {
+        this(cubeSegment.getCubeDesc(), cubeSegment);
+    }
+    
+    private CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment /* can be null */) {
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
         this.columnIndexMap = Maps.newHashMap();
@@ -62,16 +69,8 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         parseCubeDesc();
     }
 
-    /**
-     * @return the cubeSegment
-     */
-    public CubeSegment getCubeSegment() {
-        return cubeSegment;
-    }
-
     // check what columns from hive tables are required, and index them
     private void parseCubeDesc() {
-        int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
         long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
 
@@ -83,19 +82,20 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
 
         int columnIndex = 0;
         for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived(false)) {
-            columnIndexMap.put(colName(col.getCanonicalName()), columnIndex);
-            columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col));
+            columnIndexMap.put(col, columnIndex);
+            columnList.add(col);
             columnIndex++;
         }
 
         // build index for rowkey columns
         List<TblColRef> cuboidColumns = baseCuboid.getColumns();
+        int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
         rowKeyColumnIndexes = new int[rowkeyColCount];
         for (int i = 0; i < rowkeyColCount; i++) {
-            String colName = colName(cuboidColumns.get(i).getCanonicalName());
-            Integer dimIdx = columnIndexMap.get(colName);
+            TblColRef col = cuboidColumns.get(i);
+            Integer dimIdx = columnIndexMap.get(col);
             if (dimIdx == null) {
-                throw new RuntimeException("Can't find column " + colName);
+                throw new RuntimeException("Can't find column " + col);
             }
             rowKeyColumnIndexes[i] = dimIdx;
         }
@@ -112,11 +112,11 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
                 measureColumnIndexes[i] = new int[colRefs.size()];
                 for (int j = 0; j < colRefs.size(); j++) {
                     TblColRef c = colRefs.get(j);
-                    measureColumnIndexes[i][j] = contains(columnList, c);
+                    measureColumnIndexes[i][j] = columnList.indexOf(c);
                     if (measureColumnIndexes[i][j] < 0) {
                         measureColumnIndexes[i][j] = columnIndex;
-                        columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
-                        columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
+                        columnIndexMap.put(c, columnIndex);
+                        columnList.add(c);
                         columnIndex++;
                     }
                 }
@@ -126,16 +126,16 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         if (cubeDesc.getDictionaries() != null) {
             for (DictionaryDesc dictDesc : cubeDesc.getDictionaries()) {
                 TblColRef c = dictDesc.getColumnRef();
-                if (contains(columnList, c) < 0) {
-                    columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
-                    columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
+                if (columnList.indexOf(c) < 0) {
+                    columnIndexMap.put(c, columnIndex);
+                    columnList.add(c);
                     columnIndex++;
                 }
                 if (dictDesc.getResuseColumnRef() != null) {
                     c = dictDesc.getResuseColumnRef();
-                    if (contains(columnList, c) < 0) {
-                        columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
-                        columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
+                    if (columnList.indexOf(c) < 0) {
+                        columnIndexMap.put(c, columnIndex);
+                        columnList.add(c);
                         columnIndex++;
                     }
                 }
@@ -151,16 +151,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         }
     }
 
-    private int contains(List<IntermediateColumnDesc> columnList, TblColRef c) {
-        for (int i = 0; i < columnList.size(); i++) {
-            IntermediateColumnDesc col = columnList.get(i);
-
-            if (col.isSameAs(c.getTable(), c.getName()))
-                return i;
-        }
-        return -1;
-    }
-
     // sanity check the input record (in bytes) matches what's expected
     public void sanityCheck(BytesSplitter bytesSplitter) {
         if (columnCount != bytesSplitter.getBufferSize()) {
@@ -188,7 +178,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     }
 
     @Override
-    public List<IntermediateColumnDesc> getColumnList() {
+    public List<TblColRef> getAllColumns() {
         return columnList;
     }
 
@@ -197,11 +187,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         return cubeDesc.getModel();
     }
 
-    @Override
-    public List<JoinDesc> getUsedJoinsSet() {
-        return cubeJoins;
-    }
-
     private static String colName(String canonicalColName) {
         return canonicalColName.replace(".", "_");
     }
@@ -215,4 +200,19 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         return index.intValue();
     }
 
+    @Override
+    public long getSourceOffsetStart() {
+        return cubeSegment.getSourceOffsetStart();
+    }
+
+    @Override
+    public long getSourceOffsetEnd() {
+        return cubeSegment.getSourceOffsetEnd();
+    }
+
+    @Override
+    public TblColRef getDistributedBy() {
+        return cubeDesc.getDistributedByColumn();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index 0be39b4..b7f79e1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -75,7 +75,7 @@ public class CubingUtils {
     private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
 
     public static Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, Iterable<List<String>> streams) {
-        CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(cubeDesc);
         final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
         final List<Long> allCuboidIds = new CuboidScheduler(cubeDesc).getAllCuboidIds();
         final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
@@ -125,7 +125,7 @@ public class CubingUtils {
             //generate hash for each row key column
             for (int i = 0; i < rowkeyLength; i++) {
                 Hasher hc = hf.newHasher();
-                final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+                final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
                 if (cell != null) {
                     row_hashcodes[i].set(hc.putString(cell).hash().asBytes());
                 } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index cb3735a..b39265d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -28,13 +28,9 @@ import java.util.Set;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.LookupDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
@@ -42,6 +38,9 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.w3c.dom.Document;
 import org.w3c.dom.NodeList;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 /**
  *
  */
@@ -79,24 +78,24 @@ public class JoinedFlatTable {
         return buffer.toString();
     }
 
-    public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+    public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
         StringBuilder ddl = new StringBuilder();
 
-        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
+        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n");
 
         ddl.append("(" + "\n");
-        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
-            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+        for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
+            TblColRef col = flatDesc.getAllColumns().get(i);
             if (i > 0) {
                 ddl.append(",");
             }
-            ddl.append(colName(col.getCanonicalName()) + " " + getHiveDataType(col.getDataType()) + "\n");
+            ddl.append(colName(col.getCanonicalName()) + " " + getHiveDataType(col.getDatatype()) + "\n");
         }
         ddl.append(")" + "\n");
 
         ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
         ddl.append("STORED AS SEQUENCEFILE" + "\n");
-        ddl.append("LOCATION '" + getTableDir(intermediateTableDesc, storageDfsDir) + "';").append("\n");
+        ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n");
         // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
         // ";\n");
         return ddl.toString();
@@ -115,32 +114,32 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
-    public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+    public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder sql = new StringBuilder();
         sql.append("SELECT" + "\n");
         String tableAlias;
-        Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
-        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
-            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
+        Map<String, String> tableAliasMap = buildTableAliasMap(flatDesc.getDataModel());
+        for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
+            TblColRef col = flatDesc.getAllColumns().get(i);
             if (i > 0) {
                 sql.append(",");
             }
-            tableAlias = tableAliasMap.get(col.getTableName());
-            sql.append(tableAlias + "." + col.getColumnName() + "\n");
+            tableAlias = tableAliasMap.get(col.getTable());
+            sql.append(tableAlias + "." + col.getName() + "\n");
         }
-        appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
-        appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
-        appendDistributeStatement(intermediateTableDesc, sql, tableAliasMap);
+        appendJoinStatement(flatDesc, sql, tableAliasMap);
+        appendWhereStatement(flatDesc, sql, tableAliasMap);
+        appendDistributeStatement(flatDesc, sql, tableAliasMap);
         return sql.toString();
     }
 
-    public static String generateCountDataStatement(IJoinedFlatTableDesc intermediateTableDesc, final String outputDir) {
-        final Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
+    public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
+        final Map<String, String> tableAliasMap = buildTableAliasMap(flatDesc.getDataModel());
         final StringBuilder sql = new StringBuilder();
-        final String factTbl = intermediateTableDesc.getDataModel().getFactTable();
+        final String factTbl = flatDesc.getDataModel().getFactTable();
         sql.append("dfs -mkdir -p " + outputDir + ";\n");
         sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
-        appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+        appendWhereStatement(flatDesc, sql, tableAliasMap);
         return sql.toString();
     }
 
@@ -172,12 +171,12 @@ public class JoinedFlatTable {
         tableAliasMap.put(table, alias);
     }
 
-    private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        List<JoinDesc> cubeJoins = intermediateTableDesc.getUsedJoinsSet();
+    private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+        List<JoinDesc> cubeJoins = getUsedJoinsSet(flatDesc);
 
         Set<String> dimTableCache = new HashSet<String>();
 
-        DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
+        DataModelDesc dataModelDesc = flatDesc.getDataModel();
         String factTableName = dataModelDesc.getFactTable();
         String factTableAlias = tableAliasMap.get(factTableName);
         sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
@@ -212,13 +211,25 @@ public class JoinedFlatTable {
         }
     }
 
-    private static void appendDistributeStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
-            return;//TODO: for now only cube segments support distribution
+    private static List<JoinDesc> getUsedJoinsSet(IJoinedFlatTableDesc flatDesc) {
+        Set<String> usedTableIdentities = Sets.newHashSet();
+        for (TblColRef col : flatDesc.getAllColumns()) {
+            usedTableIdentities.add(col.getTable());
+        }
+        
+        List<JoinDesc> result = Lists.newArrayList();
+        for (LookupDesc lookup : flatDesc.getDataModel().getLookups()) {
+            String table = lookup.getTableDesc().getIdentity();
+            if (usedTableIdentities.contains(table)) {
+                result.add(lookup.getJoin());
+            }
         }
-        CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
+        
+        return result;
+    }
 
-        TblColRef distDcol = desc.getCubeDesc().getDistributedByColumn();
+    private static void appendDistributeStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
+        TblColRef distDcol = flatDesc.getDistributedBy();
 
         if (distDcol != null) {
             String tblAlias = tableAliasMap.get(distDcol.getTable());
@@ -228,30 +239,22 @@ public class JoinedFlatTable {
         }
     }
 
-    private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
-            return;//TODO: for now only cube segments support filter and partition
-        }
-        CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
-
+    private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
         boolean hasCondition = false;
         StringBuilder whereBuilder = new StringBuilder();
         whereBuilder.append("WHERE");
 
-        CubeDesc cubeDesc = desc.getCubeDesc();
-        DataModelDesc model = cubeDesc.getModel();
+        DataModelDesc model = flatDesc.getDataModel();
 
         if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) {
             whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
             hasCondition = true;
         }
 
-        CubeSegment cubeSegment = desc.getCubeSegment();
-
-        if (null != cubeSegment) {
-            PartitionDesc partDesc = model.getPartitionDesc();
-            long dateStart = cubeSegment.getDateRangeStart();
-            long dateEnd = cubeSegment.getDateRangeEnd();
+        PartitionDesc partDesc = model.getPartitionDesc();
+        if (partDesc != null && partDesc.getPartitionDateColumn() != null) {
+            long dateStart = flatDesc.getSourceOffsetStart();
+            long dateEnd = flatDesc.getSourceOffsetEnd();
 
             if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
                 whereBuilder.append(hasCondition ? " AND (" : " (");

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index 697d392..17a7178 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -51,7 +51,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
         this.createTestMetadata();
         cube = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready");
         cubeSegment = cube.getSegments().get(0);
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 5f39049..7f5edfe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -270,6 +270,7 @@ public class DataModelDesc extends RootPersistentEntity {
             if (dimTable == null) {
                 throw new IllegalStateException("Table " + lookup.getTable() + " does not exist for " + this);
             }
+            lookup.setTableDesc(dimTable);
             this.lookupTableDescs.add(dimTable);
 
             JoinDesc join = lookup.getJoin();

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 5088c9c..55ea71f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -26,9 +26,13 @@ public interface IJoinedFlatTableDesc {
 
     String getTableName();
 
-    List<IntermediateColumnDesc> getColumnList();
-
     DataModelDesc getDataModel();
+    
+    List<TblColRef> getAllColumns();
 
-    List<JoinDesc> getUsedJoinsSet();
+    long getSourceOffsetStart();
+    
+    long getSourceOffsetEnd();
+    
+    TblColRef getDistributedBy();
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
deleted file mode 100644
index 10a3795..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.model;
-
-/**
- */
-public class IntermediateColumnDesc {
-    private String id;
-    private TblColRef colRef;
-
-    public IntermediateColumnDesc(String id, TblColRef colRef) {
-        this.id = id;
-        this.colRef = colRef;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public TblColRef getColRef() {
-        return this.colRef;
-    }
-
-    public String getColumnName() {
-        return colRef.getName();
-    }
-
-    public String getDataType() {
-        return colRef.getDatatype();
-    }
-
-    public String getTableName() {
-        return colRef.getTable();
-    }
-
-    public boolean isSameAs(String tableName, String columnName) {
-        return colRef.isSameAs(tableName, columnName);
-    }
-
-    public String getCanonicalName() {
-        return colRef.getCanonicalName();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java
index 057a1e4..c85612a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/LookupDesc.java
@@ -30,6 +30,8 @@ public class LookupDesc {
 
     @JsonProperty("join")
     private JoinDesc join;
+    
+    private TableDesc tableDesc;
 
     public String getTable() {
         return table;
@@ -47,4 +49,14 @@ public class LookupDesc {
         this.join = join;
     }
 
+    public TableDesc getTableDesc() {
+        return tableDesc;
+    }
+
+    void setTableDesc(TableDesc tableDesc) {
+        this.tableDesc = tableDesc;
+    }
+    
+    
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index b504dbf..289cd48 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -34,12 +34,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
     private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
 
     private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
-    private final IMRInput.IMRBatchMergeInputSide inputSide;
 
     public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
         this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
-        this.inputSide = MRUtil.getBatchMergeInputSide(seg);
     }
 
     public CubingJob build() {
@@ -57,7 +55,6 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         }
 
         // Phase 1: Merge Dictionary
-        inputSide.addStepPhase1_MergeDictionary(result);
         result.addTask(createMergeDictionaryStep(mergingSegmentIds));
         result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
         outputSide.addStepPhase1_MergeDictionary(result);

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 6e01877..582052f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -19,8 +19,8 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**
@@ -29,14 +29,11 @@ import org.apache.kylin.metadata.model.TableDesc;
 public interface IMRInput {
 
     /** Return a helper to participate in batch cubing job flow. */
-    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
+    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
 
     /** Return an InputFormat that reads from specified table. */
     public IMRTableInputFormat getTableInputFormat(TableDesc table);
 
-    /** Return a helper to participate in batch cubing merge job flow. */
-    public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg);
-
     /**
      * Utility that configures mapper to read from a table.
      */
@@ -70,10 +67,4 @@ public interface IMRInput {
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface IMRBatchMergeInputSide {
-
-        /** Add step that executes before merge dictionary and before merge cube. */
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 14fdd93..877358b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -37,7 +37,7 @@ import org.apache.kylin.storage.StorageFactory;
 public class MRUtil {
 
     public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
-        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
+        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg.getJoinedFlatTableDesc());
     }
 
     public static IMRTableInputFormat getTableInputFormat(String tableName) {
@@ -68,10 +68,6 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
     }
 
-    public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
-        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
-    }
-
     // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale
     // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
     public static int runMRJob(Tool tool, String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 85e3cc7..4786505 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -102,7 +102,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
 
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
 
         bytesSplitter = new BytesSplitter(200, 16384);
         rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index a91d333..20259cb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -72,7 +72,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
         dictionaryColumnIndex = new int[factDictCols.size()];
         for (int i = 0; i < factDictCols.size(); i++) {
             TblColRef colRef = factDictCols.get(i);

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 69629b2..3ccbcc8 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -177,7 +177,7 @@ public class SparkCubing extends AbstractApplication {
         final String[] columns = intermediateTable.columns();
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-        final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
         final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
         final long start = System.currentTimeMillis();
         final RowKeyDesc rowKey = cubeDesc.getRowkey();
@@ -248,7 +248,7 @@ public class SparkCubing extends AbstractApplication {
             zeroValue.put(id, new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
         }
 
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
 
         final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes();
         final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 169daa4..87b222e 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -149,8 +149,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
     }
 
     static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null);
-        int nColumns = flatTableDesc.getColumnList().size();
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor());
+        int nColumns = flatTableDesc.getAllColumns().size();
 
         @SuppressWarnings("unchecked")
         Set<String>[] distinctSets = new Set[nColumns];
@@ -190,8 +190,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
     static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
         Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
         CubeDesc desc = cube.getDescriptor();
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
-        int nColumns = flatTableDesc.getColumnList().size();
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc);
+        int nColumns = flatTableDesc.getAllColumns().size();
 
         List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
         for (int c = 0; c < columns.size(); c++) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 7932211..4e56f74 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -134,9 +134,8 @@ public class CubeController extends BasicController {
     @ResponseBody
     public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
-        CubeDesc cubeDesc = cube.getDescriptor();
         CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.READY);
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, cubeSegment);
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
         String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc);
 
         GeneralResponse repsonse = new GeneralResponse();

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 9ec0f02..e3d7879 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -28,9 +28,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -56,8 +53,8 @@ import com.google.common.collect.Sets;
 public class HiveMRInput implements IMRInput {
 
     @Override
-    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
-        return new BatchCubingInputSide(seg);
+    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new BatchCubingInputSide(flatDesc);
     }
 
     @Override
@@ -65,16 +62,6 @@ public class HiveMRInput implements IMRInput {
         return new HiveTableInputFormat(table.getIdentity());
     }
 
-    @Override
-    public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
-        return new IMRBatchMergeInputSide() {
-            @Override
-            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-                // doing nothing
-            }
-        };
-    }
-
     public static class HiveTableInputFormat implements IMRTableInputFormat {
         final String dbName;
         final String tableName;
@@ -111,14 +98,12 @@ public class HiveMRInput implements IMRInput {
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
 
         final JobEngineConfig conf;
-        final CubeSegment seg;
-        final IJoinedFlatTableDesc flatHiveTableDesc;
+        final IJoinedFlatTableDesc flatDesc;
         String hiveViewIntermediateTables = "";
 
-        public BatchCubingInputSide(CubeSegment seg) {
+        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
             this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            this.seg = seg;
-            this.flatHiveTableDesc = seg.getJoinedFlatTableDesc();
+            this.flatDesc = flatDesc;
         }
 
         @Override
@@ -127,8 +112,8 @@ public class HiveMRInput implements IMRInput {
 
             final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
 
-            jobFlow.addTask(createCountHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId(), rowCountOutputDir));
-            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId(), cubeName, rowCountOutputDir));
+            jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir));
+            jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, rowCountOutputDir));
             AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
             if (task != null) {
                 jobFlow.addTask(task);
@@ -155,13 +140,10 @@ public class HiveMRInput implements IMRInput {
             HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
 
             KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
-            String cubeName = seg.getRealization().getName();
-            CubeDesc cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
             MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig);
             final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
 
-            for (LookupDesc lookupDesc : cubeDesc.getModel().getLookups()) {
+            for (LookupDesc lookupDesc : flatDesc.getDataModel().getLookups()) {
                 TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable());
                 if (TableDesc.TABLE_TYPE_VIRTUAL_VIEW.equalsIgnoreCase(tableDesc.getTableType())) {
                     lookupViewsTables.add(tableDesc);
@@ -216,7 +198,7 @@ public class HiveMRInput implements IMRInput {
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
             step.setIntermediateTableIdentity(getIntermediateTableIdentity());
-            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
             step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
             jobFlow.addTask(step);
         }
@@ -227,7 +209,7 @@ public class HiveMRInput implements IMRInput {
         }
 
         private String getIntermediateTableIdentity() {
-            return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatHiveTableDesc.getTableName();
+            return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatDesc.getTableName();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/889c544a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index fe3fe0a..e055d9e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -40,8 +40,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import javax.annotation.Nullable;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StreamingBatch;
@@ -52,7 +50,6 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.streaming.IStreamingInput;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
@@ -62,7 +59,6 @@ import org.apache.kylin.source.kafka.util.KafkaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 
 import kafka.cluster.Broker;
@@ -94,13 +90,7 @@ public class KafkaStreamingInput implements IStreamingInput {
             try {
                 final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
                 final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
-                List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cube.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
-                    @Nullable
-                    @Override
-                    public TblColRef apply(IntermediateColumnDesc input) {
-                        return input.getColRef();
-                    }
-                });
+                List<TblColRef> columns = new CubeJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
 
                 final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
                 final ExecutorService executorService = Executors.newCachedThreadPool();