You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/09/03 12:45:56 UTC

[02/50] [abbrv] kylin git commit: KYLIN-1966 further refactor, decouple flat table and cube

KYLIN-1966 further refactor, decouple flat table and cube


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/29ba46be
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/29ba46be
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/29ba46be

Branch: refs/heads/1.5.x-CDH5.7
Commit: 29ba46beee3c9330c54a23265fdc183333f750d8
Parents: 1b34c38
Author: Yang Li <li...@apache.org>
Authored: Mon Aug 22 08:37:12 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Aug 22 08:37:12 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeSegment.java |   8 +-
 .../inmemcubing/AbstractInMemCubeBuilder.java   |   7 +-
 .../cube/inmemcubing/DoggedCubeBuilder.java     |   7 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |   7 +-
 .../InMemCubeBuilderInputConverter.java         |  15 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |   3 +-
 .../cube/model/CubeJoinedFlatTableDesc.java     |  42 +-----
 .../cube/model/CubeJoinedFlatTableEnrich.java   | 140 +++++++++++++++++++
 .../org/apache/kylin/cube/util/CubingUtils.java |   7 +-
 .../org/apache/kylin/engine/EngineFactory.java  |  12 ++
 .../apache/kylin/engine/IBatchCubingEngine.java |   8 ++
 .../apache/kylin/job/JoinedFlatTableTest.java   |  13 +-
 .../metadata/model/IJoinedFlatTableDesc.java    |   2 +
 .../kylin/engine/mr/MRBatchCubingEngine.java    |  13 ++
 .../kylin/engine/mr/MRBatchCubingEngine2.java   |  13 ++
 .../java/org/apache/kylin/engine/mr/MRUtil.java |   5 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java   |   7 +-
 .../mr/steps/FactDistinctColumnsMapperBase.java |   7 +-
 .../engine/mr/steps/InMemCuboidMapper.java      |   5 +-
 .../engine/spark/SparkBatchCubingEngine.java    |  12 ++
 .../apache/kylin/engine/spark/SparkCubing.java  |  29 ++--
 .../engine/spark/SparkCubingJobBuilder.java     |   5 +-
 engine-streaming/pom.xml                        |   4 +
 .../streaming/cube/StreamingCubeBuilder.java    |   9 +-
 .../ITDoggedCubeBuilderStressTest.java          |   5 +-
 .../inmemcubing/ITDoggedCubeBuilderTest.java    |   7 +-
 .../inmemcubing/ITInMemCubeBuilderTest.java     |  19 +--
 .../kylin/rest/controller/CubeController.java   |   5 +-
 .../kylin/source/kafka/KafkaStreamingInput.java |   4 +-
 29 files changed, 310 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 0797ab3..aaa88f1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -35,18 +35,16 @@ import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonBackReference;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -530,10 +528,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable {
         return cubeInstance;
     }
 
-    public IJoinedFlatTableDesc getJoinedFlatTableDesc() {
-        return new CubeJoinedFlatTableDesc(this);
-    }
-
     public String getIndexPath() {
         return indexPath;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index d417d11..0bfaab3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -28,6 +28,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,18 +40,22 @@ abstract public class AbstractInMemCubeBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
 
+    final protected IJoinedFlatTableDesc flatDesc;
     final protected CubeDesc cubeDesc;
     final protected Map<TblColRef, Dictionary<String>> dictionaryMap;
 
     protected int taskThreadCount = 4;
     protected int reserveMemoryMB = 100;
 
-    public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+    public AbstractInMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        if (flatDesc == null)
+            throw new NullPointerException();
         if (cubeDesc == null)
             throw new NullPointerException();
         if (dictionaryMap == null)
             throw new IllegalArgumentException("dictionary cannot be null");
 
+        this.flatDesc = flatDesc;
         this.cubeDesc = cubeDesc;
         this.dictionaryMap = dictionaryMap;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 15f2241..69f1f82 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -38,6 +38,7 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,8 +55,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
     private int splitRowThreshold = Integer.MAX_VALUE;
     private int unitRows = 1000;
 
-    public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
-        super(cubeDesc, dictionaryMap);
+    public DoggedCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        super(cubeDesc, flatDesc, dictionaryMap);
 
         // check memory more often if a single row is big
         if (cubeDesc.hasMemoryHungryMeasures())
@@ -270,7 +271,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
         RuntimeException exception;
 
         public SplitThread() {
-            this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
+            this.builder = new InMemCubeBuilder(cubeDesc, flatDesc, dictionaryMap);
             this.builder.setConcurrentThreads(taskThreadCount);
             this.builder.setReserveMemoryMB(reserveMemoryMB);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 36d1296..e4908b8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -48,6 +48,7 @@ import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.measure.topn.Counter;
 import org.apache.kylin.measure.topn.TopNCounter;
 import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -86,8 +87,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private Object[] totalSumForSanityCheck;
     private ICuboidCollector resultCollector;
 
-    public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
-        super(cubeDesc, dictionaryMap);
+    public InMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        super(cubeDesc, flatDesc, dictionaryMap);
         this.cuboidScheduler = new CuboidScheduler(cubeDesc);
         this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         this.totalCuboidCount = cuboidScheduler.getCuboidCount();
@@ -514,7 +515,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             this.info = info;
             this.input = input;
             this.record = new GTRecord(info);
-            this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, dictionaryMap, info);
+            this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, flatDesc, dictionaryMap, info);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index 607f6bb..ab44f63 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -23,11 +23,12 @@ import java.util.Map;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -40,7 +41,7 @@ public class InMemCubeBuilderInputConverter {
 
     public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
 
-    private final CubeJoinedFlatTableDesc intermediateTableDesc;
+    private final CubeJoinedFlatTableEnrich flatDesc;
     private final MeasureDesc[] measureDescs;
     private final MeasureIngester<?>[] measureIngesters;
     private final int measureCount;
@@ -48,9 +49,9 @@ public class InMemCubeBuilderInputConverter {
     private final GTInfo gtInfo;
     protected List<byte[]> nullBytes;
 
-    public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
+    public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
         this.gtInfo = gtInfo;
-        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+        this.flatDesc = new CubeJoinedFlatTableEnrich(flatDesc, cubeDesc);
         this.measureCount = cubeDesc.getMeasures().size();
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
         this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
@@ -74,11 +75,11 @@ public class InMemCubeBuilderInputConverter {
     }
 
     private Object[] buildKey(List<String> row) {
-        int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+        int keySize = flatDesc.getRowKeyColumnIndexes().length;
         Object[] key = new Object[keySize];
 
         for (int i = 0; i < keySize; i++) {
-            key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+            key[i] = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
             if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) {
                 key[i] = null;
             }
@@ -98,7 +99,7 @@ public class InMemCubeBuilderInputConverter {
     private Object buildValueOf(int idxOfMeasure, List<String> row) {
         MeasureDesc measure = measureDescs[idxOfMeasure];
         FunctionDesc function = measure.getFunction();
-        int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+        int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure];
 
         int paramCount = function.getParameterCount();
         String[] inputToMeasure = new String[paramCount];

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 2c83972..30290cd 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -78,7 +78,7 @@ import com.google.common.collect.Sets;
  */
 @SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class CubeDesc extends RootPersistentEntity {
+public class CubeDesc extends RootPersistentEntity implements IEngineAware {
     private static final Logger logger = LoggerFactory.getLogger(CubeDesc.class);
 
     public static class CannotFilterExtendedColumnException extends RuntimeException {
@@ -1004,6 +1004,7 @@ public class CubeDesc extends RootPersistentEntity {
         this.storageType = storageType;
     }
 
+    @Override
     public int getEngineType() {
         return engineType;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 1f9d772..6aeb617 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import org.apache.kylin.common.util.BytesSplitter;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -42,8 +41,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     private final CubeSegment cubeSegment;
 
     private int columnCount;
-    private int[] rowKeyColumnIndexes; // the column index on flat table
-    private int[][] measureColumnIndexes; // [i] is the i.th measure related column index on flat table
 
     private List<TblColRef> columnList = Lists.newArrayList();
     private Map<TblColRef, Integer> columnIndexMap;
@@ -65,9 +62,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
 
     // check what columns from hive tables are required, and index them
     private void parseCubeDesc() {
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
         if (cubeSegment == null) {
             this.tableName = "kylin_intermediate_" + cubeDesc.getName();
         } else {
@@ -81,34 +75,15 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
             columnIndex++;
         }
 
-        // build index for rowkey columns
-        List<TblColRef> cuboidColumns = baseCuboid.getColumns();
-        int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        rowKeyColumnIndexes = new int[rowkeyColCount];
-        for (int i = 0; i < rowkeyColCount; i++) {
-            TblColRef col = cuboidColumns.get(i);
-            Integer dimIdx = columnIndexMap.get(col);
-            if (dimIdx == null) {
-                throw new RuntimeException("Can't find column " + col);
-            }
-            rowKeyColumnIndexes[i] = dimIdx;
-        }
-
         List<MeasureDesc> measures = cubeDesc.getMeasures();
         int measureSize = measures.size();
-        measureColumnIndexes = new int[measureSize][];
         for (int i = 0; i < measureSize; i++) {
             FunctionDesc func = measures.get(i).getFunction();
             List<TblColRef> colRefs = func.getParameter().getColRefs();
-            if (colRefs == null) {
-                measureColumnIndexes[i] = null;
-            } else {
-                measureColumnIndexes[i] = new int[colRefs.size()];
+            if (colRefs != null) {
                 for (int j = 0; j < colRefs.size(); j++) {
                     TblColRef c = colRefs.get(j);
-                    measureColumnIndexes[i][j] = columnList.indexOf(c);
-                    if (measureColumnIndexes[i][j] < 0) {
-                        measureColumnIndexes[i][j] = columnIndex;
+                    if (columnList.indexOf(c) < 0) {
                         columnIndexMap.put(c, columnIndex);
                         columnList.add(c);
                         columnIndex++;
@@ -148,18 +123,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         // TODO: check data types here
     }
 
-    public CubeDesc getCubeDesc() {
-        return cubeDesc;
-    }
-
-    public int[] getRowKeyColumnIndexes() {
-        return rowKeyColumnIndexes;
-    }
-
-    public int[][] getMeasureColumnIndexes() {
-        return measureColumnIndexes;
-    }
-
     @Override
     public String getTableName() {
         return tableName;
@@ -175,6 +138,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         return cubeDesc.getModel();
     }
 
+    @Override
     public int getColumnIndex(TblColRef colRef) {
         Integer index = columnIndexMap.get(colRef);
         if (index == null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
new file mode 100644
index 0000000..5212859
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.model;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Enriches an IJoinedFlatTableDesc with cube-specific row key and measure column indexes
+ */
+public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
+
+    private CubeDesc cubeDesc;
+    private IJoinedFlatTableDesc flatDesc;
+    private int[] rowKeyColumnIndexes; // the column index on flat table
+    private int[][] measureColumnIndexes; // [i] is the i.th measure related column index on flat table
+
+    public CubeJoinedFlatTableEnrich(IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
+        // != works due to object cache
+        if (cubeDesc.getModel() != flatDesc.getDataModel())
+            throw new IllegalArgumentException();
+        
+        this.cubeDesc = cubeDesc;
+        this.flatDesc = flatDesc;
+        parseCubeDesc();
+    }
+
+    // resolve the flat table column index of each rowkey column and each measure parameter column
+    private void parseCubeDesc() {
+        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+
+        // build index for rowkey columns
+        List<TblColRef> cuboidColumns = baseCuboid.getColumns();
+        int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+        rowKeyColumnIndexes = new int[rowkeyColCount];
+        for (int i = 0; i < rowkeyColCount; i++) {
+            TblColRef col = cuboidColumns.get(i);
+            rowKeyColumnIndexes[i] = flatDesc.getColumnIndex(col);
+        }
+
+        List<MeasureDesc> measures = cubeDesc.getMeasures();
+        int measureSize = measures.size();
+        measureColumnIndexes = new int[measureSize][];
+        for (int i = 0; i < measureSize; i++) {
+            FunctionDesc func = measures.get(i).getFunction();
+            List<TblColRef> colRefs = func.getParameter().getColRefs();
+            if (colRefs == null) {
+                measureColumnIndexes[i] = null;
+            } else {
+                measureColumnIndexes[i] = new int[colRefs.size()];
+                for (int j = 0; j < colRefs.size(); j++) {
+                    TblColRef c = colRefs.get(j);
+                    measureColumnIndexes[i][j] = flatDesc.getColumnIndex(c);
+                }
+            }
+        }
+    }
+
+    // sanity check the input record (in bytes) matches what's expected
+    public void sanityCheck(BytesSplitter bytesSplitter) {
+        int columnCount = flatDesc.getAllColumns().size();
+        if (columnCount != bytesSplitter.getBufferSize()) {
+            throw new IllegalArgumentException("Expect " + columnCount + " columns, but see " + bytesSplitter.getBufferSize() + " -- " + bytesSplitter);
+        }
+
+        // TODO: check data types here
+    }
+
+    public CubeDesc getCubeDesc() {
+        return cubeDesc;
+    }
+
+    public int[] getRowKeyColumnIndexes() {
+        return rowKeyColumnIndexes;
+    }
+
+    public int[][] getMeasureColumnIndexes() {
+        return measureColumnIndexes;
+    }
+
+    @Override
+    public String getTableName() {
+        return flatDesc.getTableName();
+    }
+
+    @Override
+    public List<TblColRef> getAllColumns() {
+        return flatDesc.getAllColumns();
+    }
+
+    @Override
+    public DataModelDesc getDataModel() {
+        return flatDesc.getDataModel();
+    }
+
+    @Override
+    public int getColumnIndex(TblColRef colRef) {
+        return flatDesc.getColumnIndex(colRef);
+    }
+
+    @Override
+    public long getSourceOffsetStart() {
+        return flatDesc.getSourceOffsetStart();
+    }
+
+    @Override
+    public long getSourceOffsetEnd() {
+        return flatDesc.getSourceOffsetEnd();
+    }
+
+    @Override
+    public TblColRef getDistributedBy() {
+        return flatDesc.getDistributedBy();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index b7f79e1..aa4610f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -48,12 +48,13 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ReadableTable;
 import org.slf4j.Logger;
@@ -74,8 +75,8 @@ public class CubingUtils {
 
     private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
 
-    public static Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, Iterable<List<String>> streams) {
-        CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+    public static Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn, Iterable<List<String>> streams) {
+        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
         final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
         final List<Long> allCuboidIds = new CuboidScheduler(cubeDesc).getAllCuboidIds();
         final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 79d2f81..7044a3e 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -23,8 +23,10 @@ import java.util.Map;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 public class EngineFactory {
 
@@ -46,6 +48,16 @@ public class EngineFactory {
         return streamingEngines.get(aware.getEngineType());
     }
 
+    /** Marked as deprecated to indicate that this is for test purposes only */
+    @Deprecated
+    public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+        return batchEngine(cubeDesc).getJoinedFlatTableDesc(cubeDesc);
+    }
+    
+    public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+        return batchEngine(newSegment).getJoinedFlatTableDesc(newSegment);
+    }
+    
     /** Build a new cube segment, typically its time range appends to the end of current cube. */
     public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
         return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index 556893c..754dbde 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -19,9 +19,17 @@
 package org.apache.kylin.engine;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 public interface IBatchCubingEngine {
+    
+    /** Marked as deprecated to indicate that this is for test purposes only */
+    @Deprecated
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc);
+    
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment);
 
     /** Build a new cube segment, typically its time range appends to the end of current cube. */
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index 17a7178..0faf22a 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -27,8 +27,9 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -42,7 +43,7 @@ import org.junit.Test;
 public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     CubeInstance cube = null;
-    CubeJoinedFlatTableDesc intermediateTableDesc = null;
+    IJoinedFlatTableDesc flatTableDesc = null;
     String fakeJobUUID = "abc-def";
     CubeSegment cubeSegment = null;
 
@@ -51,7 +52,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
         this.createTestMetadata();
         cube = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready");
         cubeSegment = cube.getSegments().get(0);
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+        flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
     }
 
     @After
@@ -61,7 +62,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testGenCreateTableDDL() {
-        String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp");
+        String ddl = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, "/tmp");
         System.out.println(ddl);
 
         System.out.println("The length for the ddl is " + ddl.length());
@@ -69,14 +70,14 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testGenDropTableDDL() {
-        String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
+        String ddl = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
         System.out.println(ddl);
         assertEquals(101, ddl.length());
     }
 
     @Test
     public void testGenerateInsertSql() throws IOException {
-        String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+        String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
         System.out.println(sqls);
 
         int length = sqls.length();

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 55ea71f..f3a4107 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -29,6 +29,8 @@ public interface IJoinedFlatTableDesc {
     DataModelDesc getDataModel();
     
     List<TblColRef> getAllColumns();
+    
+    int getColumnIndex(TblColRef colRef);
 
     long getSourceOffsetStart();
     

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index 5198db1..681c545 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -19,12 +19,25 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.IBatchCubingEngine;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 public class MRBatchCubingEngine implements IBatchCubingEngine {
 
     @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+        return new CubeJoinedFlatTableDesc(cubeDesc);
+    }
+
+    @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+        return new CubeJoinedFlatTableDesc(newSegment);
+    }
+
+    @Override
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
         return new BatchCubingJobBuilder(newSegment, submitter).build();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
index b3af7d7..d9fdcb9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -19,12 +19,25 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.IBatchCubingEngine;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 public class MRBatchCubingEngine2 implements IBatchCubingEngine {
 
     @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+        return new CubeJoinedFlatTableDesc(cubeDesc);
+    }
+
+    @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+        return new CubeJoinedFlatTableDesc(newSegment);
+    }
+
+    @Override
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
         return new BatchCubingJobBuilder2(newSegment, submitter).build();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 877358b..2c3b77f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
@@ -30,6 +31,7 @@ import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.storage.StorageFactory;
@@ -37,7 +39,8 @@ import org.apache.kylin.storage.StorageFactory;
 public class MRUtil {
 
     public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
-        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg.getJoinedFlatTableDesc());
+        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
     }
 
     public static IMRTableInputFormat getTableInputFormat(String tableName) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 4786505..588b087 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -37,7 +37,8 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -65,7 +66,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
     protected CubeDesc cubeDesc;
     protected CubeSegment cubeSegment;
     protected List<byte[]> nullBytes;
-    protected CubeJoinedFlatTableDesc intermediateTableDesc;
+    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
     protected String intermediateTableRowDelimiter;
     protected byte byteRowDelimiter;
     protected int counter;
@@ -102,7 +103,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
 
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
         bytesSplitter = new BytesSplitter(200, 16384);
         rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 20259cb..3fa966d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -30,7 +30,8 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
@@ -54,7 +55,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     protected Text outputValue = new Text();
     protected int errorRecordCounter = 0;
 
-    protected CubeJoinedFlatTableDesc intermediateTableDesc;
+    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
     protected int[] dictionaryColumnIndex;
 
     @Override
@@ -72,7 +73,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg),  cubeDesc);
         dictionaryColumnIndex = new int[factDictCols.size()];
         for (int i = 0; i < factDictCols.size(); i++) {
             TblColRef colRef = factDictCols.get(i);

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 24c37ce..1d90d01 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -39,12 +39,14 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +81,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
         String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
         cubeSegment = cube.getSegmentById(segmentID);
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
 
         Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
 
@@ -92,7 +95,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
             dictionaryMap.put(col, cubeSegment.getDictionary(col));
         }
 
-        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
         cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration()));
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
index 4eec233..08ed207 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
@@ -18,8 +18,10 @@
 package org.apache.kylin.engine.spark;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.IBatchCubingEngine;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 /**
  */
@@ -52,4 +54,14 @@ public class SparkBatchCubingEngine implements IBatchCubingEngine {
     public Class<?> getStorageInterface() {
         return null;
     }
+
+    @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 3ccbcc8..5c2def2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -69,12 +69,13 @@ import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
 import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
@@ -84,6 +85,7 @@ import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -175,9 +177,10 @@ public class SparkCubing extends AbstractApplication {
         final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         final String[] columns = intermediateTable.columns();
+        final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-        final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
         final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
         final long start = System.currentTimeMillis();
         final RowKeyDesc rowKey = cubeDesc.getRowkey();
@@ -186,7 +189,7 @@ public class SparkCubing extends AbstractApplication {
             if (!rowKey.isUseDictionary(col)) {
                 continue;
             }
-            final int rowKeyColumnIndex = flatTableDesc.getRowKeyColumnIndexes()[i];
+            final int rowKeyColumnIndex = flatDesc.getRowKeyColumnIndexes()[i];
             tblColRefMap.put(rowKeyColumnIndex, col);
         }
 
@@ -228,18 +231,19 @@ public class SparkCubing extends AbstractApplication {
             })));
         }
         final long end = System.currentTimeMillis();
-        CubingUtils.writeDictionary(cubeInstance.getSegmentById(segmentId), dictionaryMap, start, end);
+        CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
         try {
             CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-            cubeBuilder.setToUpdateSegs(cubeInstance.getSegmentById(segmentId));
+            cubeBuilder.setToUpdateSegs(seg);
             cubeManager.updateCube(cubeBuilder);
         } catch (IOException e) {
             throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
         }
     }
 
-    private Map<Long, HyperLogLogPlusCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName) throws Exception {
+    private Map<Long, HyperLogLogPlusCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
         CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
         List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
@@ -248,9 +252,9 @@ public class SparkCubing extends AbstractApplication {
             zeroValue.put(id, new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
         }
 
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc);
+        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
-        final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes();
+        final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes();
         final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
         final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
@@ -318,9 +322,7 @@ public class SparkCubing extends AbstractApplication {
         return samplingResult;
     }
 
-    /*
-    return hfile location
-     */
+    /** Returns the HFile location. */
     private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -365,7 +367,8 @@ public class SparkCubing extends AbstractApplication {
 
                 LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
                 System.out.println("load properties finished");
-                AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
+                IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
+                AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
                 final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
                 Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
                 try {
@@ -611,7 +614,7 @@ public class SparkCubing extends AbstractApplication {
             }
         });
 
-        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(rowJavaRDD, cubeName);
+        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
         final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
 
         final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
index 05246f4..edd9460 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.kylin.engine.spark;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.IMROutput2;
@@ -48,11 +49,11 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
     }
 
     public DefaultChainedExecutable build() {
-        final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
         final String jobId = result.getId();
 
         inputSide.addStepPhase1_CreateFlatTable(result);
-        final IJoinedFlatTableDesc joinedFlatTableDesc = seg.getJoinedFlatTableDesc();
+        final IJoinedFlatTableDesc joinedFlatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
         final String tableName = joinedFlatTableDesc.getTableName();
         logger.info("intermediate table:" + tableName);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml
index e25a133..095871c 100644
--- a/engine-streaming/pom.xml
+++ b/engine-streaming/pom.xml
@@ -45,6 +45,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-storage</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-job</artifactId>
+        </dependency>
 
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 465a983..180f0b8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -55,9 +55,11 @@ import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.cube.util.CubingUtils;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.streaming.StreamingBatchBuilder;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -84,8 +86,10 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
         try {
             CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+            final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
+            
             LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>();
-            InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
+            InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap);
             final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, cuboidWriter));
             processedRowCount = streamingBatch.getMessages().size();
             for (StreamingMessage streamingMessage : streamingBatch.getMessages()) {
@@ -129,9 +133,10 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
     public Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch) {
         final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
+        final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor());
         long start = System.currentTimeMillis();
 
-        final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
+        final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), flatDesc, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
             @Nullable
             @Override
             public List<String> apply(@Nullable StreamingMessage input) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index ef69d57..4d23979 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -31,7 +31,9 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -79,7 +81,8 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         long randSeed = System.currentTimeMillis();
 
-        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
         doggedBuilder.setConcurrentThreads(THREADS);
 
         {

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 8923744..8827dff 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -40,7 +40,9 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -87,7 +89,8 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         long randSeed = System.currentTimeMillis();
 
-        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+        DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
         doggedBuilder.setConcurrentThreads(THREADS);
         doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
         FileRecordWriter doggedResult = new FileRecordWriter();
@@ -99,7 +102,7 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
             doggedResult.close();
         }
 
-        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
         inmemBuilder.setConcurrentThreads(THREADS);
         FileRecordWriter inmemResult = new FileRecordWriter();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 87b222e..6612375 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -42,11 +42,13 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.After;
@@ -109,7 +111,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
 
     private void testBuildInner() throws Exception {
 
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap);
         //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
         cubeBuilder.setConcurrentThreads(nThreads);
 
@@ -149,8 +152,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
     }
 
     static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException {
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor());
-        int nColumns = flatTableDesc.getAllColumns().size();
+        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+        int nColumns = flatDesc.getAllColumns().size();
 
         @SuppressWarnings("unchecked")
         Set<String>[] distinctSets = new Set[nColumns];
@@ -190,15 +193,15 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
     static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
         Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
         CubeDesc desc = cube.getDescriptor();
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc);
-        int nColumns = flatTableDesc.getAllColumns().size();
+        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(desc), desc);
+        int nColumns = flatDesc.getAllColumns().size();
 
         List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns();
         for (int c = 0; c < columns.size(); c++) {
             TblColRef col = columns.get(c);
             if (desc.getRowkey().isUseDictionary(col)) {
                 logger.info("Building dictionary for " + col);
-                List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
+                List<byte[]> valueList = readValueList(flatTable, nColumns, flatDesc.getRowKeyColumnIndexes()[c]);
                 Dictionary<String> dict = DictionaryGenerator.buildDictionary(col.getType(), new IterableDictionaryValueEnumerator(valueList));
                 result.put(col, dict);
             }
@@ -211,7 +214,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
             if (dictCols.isEmpty())
                 continue;
 
-            int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
+            int[] flatTableIdx = flatDesc.getMeasureColumnIndexes()[measureIdx];
             List<TblColRef> paramCols = func.getParameter().getColRefs();
             for (int i = 0; i < paramCols.size(); i++) {
                 TblColRef col = paramCols.get(i);

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 4e56f74..57b0965 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -34,11 +34,12 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dimension.DimensionEncodingFactory;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.rest.exception.BadRequestException;
@@ -135,7 +136,7 @@ public class CubeController extends BasicController {
     public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
         CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.READY);
-        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
+        IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
         String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc);
 
         GeneralResponse repsonse = new GeneralResponse();

http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index e055d9e..c3bdb75 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -46,7 +46,7 @@ import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.streaming.IStreamingInput;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
@@ -90,7 +90,7 @@ public class KafkaStreamingInput implements IStreamingInput {
             try {
                 final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
                 final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
-                List<TblColRef> columns = new CubeJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
+                List<TblColRef> columns = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns();
 
                 final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
                 final ExecutorService executorService = Executors.newCachedThreadPool();