Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/02 17:23:59 UTC

[06/19] kylin git commit: APACHE-KYLIN-2733: Introduce optimize job for adjusting cuboid set

APACHE-KYLIN-2733: Introduce optimize job for adjusting cuboid set


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/99fbd755
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/99fbd755
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/99fbd755

Branch: refs/heads/master
Commit: 99fbd755e167cfe0b4ef137dac66f28020d50415
Parents: 4cfffc2
Author: Zhong <nj...@apache.org>
Authored: Wed Aug 30 11:17:43 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Sat Dec 2 23:21:43 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeInstance.java     |  12 +-
 .../java/org/apache/kylin/cube/CubeManager.java |  84 ++++++-
 .../java/org/apache/kylin/cube/CubeUpdate.java  |  14 +-
 .../kylin/cube/common/RowKeySplitter.java       |   2 +-
 .../org/apache/kylin/cube/cuboid/Cuboid.java    |   5 +
 .../apache/kylin/cube/cuboid/CuboidUtil.java    |  48 ++++
 .../cube/cuboid/TreeCuboidSchedulerManager.java |   6 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |   2 +-
 .../org/apache/kylin/cube/kv/RowKeyDecoder.java |   4 +-
 .../org/apache/kylin/engine/EngineFactory.java  |   4 +
 .../apache/kylin/engine/IBatchCubingEngine.java |   3 +
 .../kylin/job/constant/ExecutableConstants.java |   5 +
 .../org/apache/kylin/job/dao/ExecutableDao.java |  18 +-
 .../org/apache/kylin/job/dao/ExecutablePO.java  |  11 +
 .../kylin/job/execution/ExecutableManager.java  |  41 +++-
 .../job/impl/threadpool/DefaultScheduler.java   |   7 +-
 .../engine/mr/BatchOptimizeJobBuilder2.java     | 227 +++++++++++++++++
 .../mr/BatchOptimizeJobCheckpointBuilder.java   |  89 +++++++
 .../org/apache/kylin/engine/mr/CubingJob.java   |   7 +-
 .../org/apache/kylin/engine/mr/IMROutput2.java  |  29 ++-
 .../kylin/engine/mr/JobBuilderSupport.java      |  54 ++++-
 .../kylin/engine/mr/MRBatchCubingEngine.java    |   5 +
 .../kylin/engine/mr/MRBatchCubingEngine2.java   |   5 +
 .../java/org/apache/kylin/engine/mr/MRUtil.java |   4 +
 .../engine/mr/common/AbstractHadoopJob.java     |  24 +-
 .../kylin/engine/mr/common/BatchConstants.java  |   1 +
 .../kylin/engine/mr/common/CubeStatsReader.java | 126 ++++++----
 .../engine/mr/common/CuboidRecommenderUtil.java |  14 +-
 .../engine/mr/common/CuboidSchedulerUtil.java   |  54 +++++
 .../engine/mr/common/CuboidStatsReaderUtil.java |   5 +-
 .../kylin/engine/mr/common/MapReduceUtil.java   | 117 +++++++++
 .../mr/common/StatisticsDecisionUtil.java       |   2 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java   |   2 +-
 .../steps/CalculateStatsFromBaseCuboidJob.java  | 116 +++++++++
 .../CalculateStatsFromBaseCuboidMapper.java     | 201 ++++++++++++++++
 .../CalculateStatsFromBaseCuboidReducer.java    | 112 +++++++++
 .../engine/mr/steps/CopyDictionaryStep.java     |  70 ++++++
 .../engine/mr/steps/CubingExecutableUtil.java   |   9 +
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  28 ++-
 .../mr/steps/FactDistinctColumnsMapper.java     |  37 +--
 .../mr/steps/FactDistinctColumnsReducer.java    |   1 +
 .../mr/steps/FilterRecommendCuboidDataJob.java  | 103 ++++++++
 .../steps/FilterRecommendCuboidDataMapper.java  | 107 ++++++++
 .../mr/steps/InMemCuboidFromBaseCuboidJob.java  | 154 ++++++++++++
 .../steps/InMemCuboidFromBaseCuboidMapper.java  |  96 ++++++++
 .../steps/InMemCuboidFromBaseCuboidReducer.java |  23 ++
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |   4 +-
 .../engine/mr/steps/InMemCuboidMapperBase.java  |  10 +-
 .../kylin/engine/mr/steps/KVGTRecordWriter.java |   2 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |   2 +-
 .../mr/steps/MergeStatisticsWithOldStep.java    | 144 +++++++++++
 .../kylin/engine/mr/steps/NDCuboidMapper.java   |  12 +-
 .../kylin/engine/mr/steps/ReducerNumSizing.java | 106 --------
 .../UpdateCubeInfoAfterCheckpointStep.java      |  69 ++++++
 .../steps/UpdateCubeInfoAfterOptimizeStep.java  |  72 ++++++
 .../mr/steps/UpdateOldCuboidShardJob.java       | 105 ++++++++
 .../mr/steps/UpdateOldCuboidShardMapper.java    | 142 +++++++++++
 .../kylin/engine/spark/SparkCubingByLayer.java  |   2 +-
 .../kylin/rest/controller/CubeController.java   |  63 +++++
 .../kylin/rest/request/JobOptimizeRequest.java  |  34 +++
 .../apache/kylin/rest/service/CubeService.java  |   4 +
 .../apache/kylin/rest/service/JobService.java   | 241 ++++++++++++++++++-
 .../storage/hbase/steps/CreateHTableJob.java    |  19 +-
 .../hbase/steps/HBaseMROutput2Transition.java   |  41 +++-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  82 +++++++
 65 files changed, 2987 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index f6eceb6..462223a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -148,6 +148,16 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return segments.getMergingSegments(mergedSegment);
     }
 
+    public CubeSegment getOriginalSegmentToOptimize(CubeSegment optimizedSegment) {
+        for (CubeSegment segment : this.getSegments(SegmentStatusEnum.READY)) {
+            if (!optimizedSegment.equals(segment) //
+                    && optimizedSegment.getSegRange().equals(segment.getSegRange())) {
+                return segment;
+            }
+        }
+        return null;
+    }
+
     public CubeDesc getDescriptor() {
         return CubeDescManager.getInstance(config).getCubeDesc(descName);
     }
@@ -353,7 +363,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
             return cuboidsRecommend;
         case RECOMMEND_MISSING_WITH_BASE:
             cuboidsRecommend.removeAll(currentCuboids);
-            currentCuboids.add(getCuboidScheduler().getBaseCuboidId());
+            cuboidsRecommend.add(getCuboidScheduler().getBaseCuboidId());
             return cuboidsRecommend;
         default:
             return null;
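
The one-line fix above matters: the base cuboid id was being added to currentCuboids, a scratch set, instead of the returned cuboidsRecommend set, so the RECOMMEND_MISSING_WITH_BASE mode would omit the base cuboid. A minimal standalone sketch of the intended semantics (cuboid ids are illustrative):

    import java.util.HashSet;
    import java.util.Set;

    public class RecommendMissingWithBaseDemo {
        public static void main(String[] args) {
            Set<Long> currentCuboids = new HashSet<>();
            currentCuboids.add(15L); // base cuboid 0b1111, already built
            currentCuboids.add(12L); // 0b1100, already built

            Set<Long> cuboidsRecommend = new HashSet<>();
            cuboidsRecommend.add(12L); // recommended, already present
            cuboidsRecommend.add(10L); // recommended, missing

            // RECOMMEND_MISSING_WITH_BASE: keep only the missing cuboids,
            // then add the base cuboid to the *returned* set (the fixed line).
            cuboidsRecommend.removeAll(currentCuboids);
            cuboidsRecommend.add(15L);
            System.out.println(cuboidsRecommend); // [10, 15] in some order
        }
    }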

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 388c840..5e72721 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -76,6 +77,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * @author yangli9
@@ -406,6 +408,10 @@ public class CubeManager implements IRealizationProvider {
             cube.setCuboids(update.getCuboids());
         }
 
+        if (update.getCuboidsRecommend() != null) {
+            cube.setCuboidsRecommend(update.getCuboidsRecommend());
+        }
+
         try {
             getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER);
         } catch (IllegalStateException ise) {
@@ -483,8 +489,7 @@ public class CubeManager implements IRealizationProvider {
         return newSegment;
     }
 
-    public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange)
-            throws IOException {
+    public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException {
         checkInputRanges(tsRange, segRange);
         checkBuildingSegment(cube);
 
@@ -519,6 +524,27 @@ public class CubeManager implements IRealizationProvider {
         return newSegment;
     }
 
+    public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
+        checkReadyForOptimize(cube);
+
+        List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
+        CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()];
+        int i = 0;
+        for (CubeSegment segment : readySegments) {
+            CubeSegment newSegment = newSegment(cube, segment.getTSRange(), null);
+            validateNewSegments(cube, newSegment);
+
+            optimizeSegments[i++] = newSegment;
+        }
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setCuboidsRecommend(cuboidsRecommend);
+        cubeBuilder.setToAddSegs(optimizeSegments);
+        updateCube(cubeBuilder);
+
+        return optimizeSegments;
+    }
+
     public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
             throws IOException {
         if (cube.getSegments().isEmpty())
@@ -594,8 +620,15 @@ public class CubeManager implements IRealizationProvider {
     }
 
     private void checkBuildingSegment(CubeInstance cube) {
-        int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
-        if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
+        checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments());
+    }
+
+    public void checkReadyForOptimize(CubeInstance cube) {
+        checkBuildingSegment(cube, 1);
+    }
+
+    private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) {
+        if (cube.getBuildingSegments().size() >= maxBuildingSeg) {
             throw new IllegalStateException(
                     "There is already " + cube.getBuildingSegments().size() + " building segment; ");
         }
@@ -725,6 +758,49 @@ public class CubeManager implements IRealizationProvider {
         updateCube(cubeBuilder);
     }
 
+    public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException {
+        for (CubeSegment seg : optimizedSegments) {
+            seg.setStatus(SegmentStatusEnum.READY_PENDING);
+        }
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToUpdateSegs(optimizedSegments);
+        updateCube(cubeBuilder);
+    }
+
+    public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids,
+            CubeSegment... optimizedSegments) throws IOException {
+        if (cube.getSegments().size() != optimizedSegments.length * 2) {
+            throw new IllegalStateException("For cube " + cube
+                    + ", every READY segment should be optimized and all segments should be READY before optimizing");
+        }
+        CubeSegment[] originalSegments = new CubeSegment[optimizedSegments.length];
+        int i = 0;
+        for (CubeSegment seg : optimizedSegments) {
+            originalSegments[i++] = cube.getOriginalSegmentToOptimize(seg);
+
+            if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
+                throw new IllegalStateException(
+                        "For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier");
+
+            if (StringUtils.isBlank(seg.getLastBuildJobID()))
+                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID");
+
+            seg.setStatus(SegmentStatusEnum.READY);
+        }
+
+        logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(optimizedSegments)
+                + ", to remove segments " + originalSegments);
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToRemoveSegs(originalSegments) //
+                .setToUpdateSegs(optimizedSegments) //
+                .setStatus(RealizationStatusEnum.READY) //
+                .setCuboids(recommendCuboids) //
+                .setCuboidsRecommend(Sets.<Long> newHashSet());
+        updateCube(cubeBuilder);
+    }
+
     public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
         List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
         List<CubeSegment> newList = Arrays.asList(newSegments);
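
Taken together, the new CubeManager methods define the optimize lifecycle: optimizeSegments() forks one new segment per READY segment (sharing its time range) and persists the recommended cuboid set; promoteNewlyOptimizeSegments() parks each rebuilt segment in READY_PENDING; promoteCheckpointOptimizeSegments() swaps all old segments out in a single update. A hedged sketch of the call order, assuming Guava's Sets/Maps and a CubeManager handle mgr (in reality these calls are spread across the job steps, not made inline):

    void runOptimizeLifecycle(CubeManager mgr, CubeInstance cube) throws IOException {
        Set<Long> recommend = Sets.newHashSet(255L, 171L, 85L); // from the cube planner

        // 1. Fork a shadow segment per READY segment and store the recommend set.
        CubeSegment[] optimizing = mgr.optimizeSegments(cube, recommend);

        // 2. Each finished optimize job parks its segment in READY_PENDING.
        mgr.promoteNewlyOptimizeSegments(cube, optimizing);

        // 3. The checkpoint job promotes everything at once: old segments out,
        //    new statistics in, recommend set cleared.
        Map<Long, Long> cuboidRowCounts = Maps.newHashMap(); // cuboid id -> row count
        mgr.promoteCheckpointOptimizeSegments(cube, cuboidRowCounts, optimizing);
    }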

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
index fae20dc..2e1d652 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.cube;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 
@@ -34,6 +35,7 @@ public class CubeUpdate {
     private String owner;
     private int cost = -1;
     private Map<Long, Long> cuboids = null;
+    private Set<Long> cuboidsRecommend = null;
 
     public CubeUpdate(CubeInstance cubeInstance) {
         this.cubeInstance = cubeInstance;
@@ -106,7 +108,17 @@ public class CubeUpdate {
         return cuboids;
     }
 
-    public void setCuboids(Map<Long, Long> cuboids) {
+    public CubeUpdate setCuboids(Map<Long, Long> cuboids) {
         this.cuboids = cuboids;
+        return this;
+    }
+
+    public Set<Long> getCuboidsRecommend() {
+        return cuboidsRecommend;
+    }
+
+    public CubeUpdate setCuboidsRecommend(Set<Long> cuboidsRecommend) {
+        this.cuboidsRecommend = cuboidsRecommend;
+        return this;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index cd26347..0c54ecf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -115,7 +115,7 @@ public class RowKeySplitter implements java.io.Serializable {
         offset += RowConstants.ROWKEY_CUBOIDID_LEN;
 
         long lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
-        Cuboid cuboid = Cuboid.findById(cubeSegment, lastSplittedCuboidId);
+        Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, lastSplittedCuboidId);
 
         // rowkey columns
         for (int i = 0; i < cuboid.getColumns().size(); i++) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index efd2e2e..3c4fceb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -88,6 +88,11 @@ public class Cuboid implements Comparable<Cuboid>, Serializable {
         return cuboidID;
     }
 
+    // for a mandatory cuboid, no need to translate the cuboid id
+    public static Cuboid findForMandatory(CubeDesc cube, long cuboidID) {
+        return new Cuboid(cube, cuboidID, cuboidID);
+    }
+
     public static Cuboid findById(CuboidScheduler cuboidScheduler, byte[] cuboidID) {
         return findById(cuboidScheduler, Bytes.toLong(cuboidID));
     }
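
The distinction from findById(): that method consults the CuboidScheduler and may translate an unlisted id to a valid ancestor, while during optimization the row key can carry ids valid only in the old or the recommended cuboid set, so the callers switched in this commit (RowKeySplitter, InMemCubeBuilder, RowKeyDecoder) must take the id verbatim. An illustrative sketch, assuming a scheduler that does not list id 6:

    // findById may map an unlisted id to a valid parent cuboid:
    Cuboid translated = Cuboid.findById(cuboidScheduler, 6L); // e.g. becomes id 7
    // findForMandatory trusts the id as-is, no scheduler lookup:
    Cuboid verbatim = Cuboid.findForMandatory(cubeDesc, 6L);  // stays id 6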

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
new file mode 100644
index 0000000..a84f153
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.cuboid;
+
+import com.google.common.base.Preconditions;
+
+public class CuboidUtil {
+
+    public static Integer[][] getCuboidBitSet(Long[] cuboidIds, int nRowKey) {
+        Preconditions.checkArgument(nRowKey < Long.SIZE,
+                "the size of row key could not be large than " + (Long.SIZE - 1));
+
+        Integer[][] allCuboidsBitSet = new Integer[cuboidIds.length][];
+
+        for (int j = 0; j < cuboidIds.length; j++) {
+            Long cuboidId = cuboidIds[j];
+
+            allCuboidsBitSet[j] = new Integer[Long.bitCount(cuboidId)];
+
+            long mask = 1L << (nRowKey - 1);
+            int position = 0;
+            for (int i = 0; i < nRowKey; i++) {
+                if ((mask & cuboidId) > 0) {
+                    allCuboidsBitSet[j][position] = i;
+                    position++;
+                }
+                mask = mask >> 1;
+            }
+        }
+        return allCuboidsBitSet;
+    }
+}
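
getCuboidBitSet() maps each set bit of a cuboid id to the index of the corresponding row key column, counting from the most significant of the nRowKey bits. A worked example with nRowKey = 4:

    Long[] ids = new Long[] { 0b1010L, 0b1111L };
    Integer[][] bitSets = CuboidUtil.getCuboidBitSet(ids, 4);
    // bitSets[0] -> [0, 2]       (bits 3 and 1 of 0b1010, MSB first)
    // bitSets[1] -> [0, 1, 2, 3] (base cuboid: every row key column)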

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
index 5e8d965..22e636b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
@@ -64,7 +64,7 @@ public class TreeCuboidSchedulerManager {
      * @param cubeName
      * @return null if the cube has no pre-built cuboids
      */
-    public static TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) {
+    public TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) {
         TreeCuboidScheduler result = cache.get(cubeName);
         if (result == null) {
             CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -83,14 +83,14 @@ public class TreeCuboidSchedulerManager {
         return result;
     }
 
-    public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) {
+    public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) {
         if (cuboidsWithRowCnt == null || cuboidsWithRowCnt.isEmpty()) {
             return null;
         }
         return getTreeCuboidScheduler(cubeDesc, Lists.newArrayList(cuboidsWithRowCnt.keySet()), cuboidsWithRowCnt);
     }
 
-    public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds,
+    public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds,
             Map<Long, Long> cuboidsWithRowCnt) {
         if (cuboidIds == null || cuboidsWithRowCnt == null) {
             return null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index f63b53f..97bb1de 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -108,7 +108,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
-        GTInfo info = CubeGridTable.newGTInfo(Cuboid.findById(cuboidScheduler, cuboidID),
+        GTInfo info = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, cuboidID),
                 new CubeDimEncMap(cubeDesc, dictionaryMap)
         );
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 5a1f668..bb03c4c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -36,7 +36,6 @@ import org.apache.kylin.metadata.model.TblColRef;
  */
 public class RowKeyDecoder {
 
-    private final CubeSegment cubeSegment;
     private final CubeDesc cubeDesc;
     private final RowKeyColumnIO colIO;
     private final RowKeySplitter rowKeySplitter;
@@ -45,7 +44,6 @@ public class RowKeyDecoder {
     private List<String> values;
 
     public RowKeyDecoder(CubeSegment cubeSegment) {
-        this.cubeSegment = cubeSegment;
         this.cubeDesc = cubeSegment.getCubeDesc();
         this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 255);
         this.colIO = new RowKeyColumnIO(cubeSegment.getDimensionEncodingMap());
@@ -75,7 +73,7 @@ public class RowKeyDecoder {
         if (this.cuboid != null && this.cuboid.getId() == cuboidID) {
             return;
         }
-        this.cuboid = Cuboid.findById(cubeSegment, cuboidID);
+        this.cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
     }
 
     private void collectValue(TblColRef col, byte[] valueBytes, int length) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 78b1efe..03d986b 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -61,4 +61,8 @@ public class EngineFactory {
         return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
     }
 
+    /** Optimize a segment based on the recommended cuboid list produced by the cube planner. */
+    public static DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+        return batchEngine(optimizeSegment).createBatchOptimizeJob(optimizeSegment, submitter);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index 754dbde..a618eac 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -37,6 +37,9 @@ public interface IBatchCubingEngine {
     /** Merge multiple small segments into a big one. */
     public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
 
+    /** Optimize a segment based on the recommended cuboid list produced by the cube planner. */
+    public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter);
+
     public Class<?> getSourceInterface();
 
     public Class<?> getStorageInterface();

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 36496fe..f3caf3b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -37,6 +37,9 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
+    public static final String STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID = "Calculate Statistics from Base Cuboid";
+    public static final String STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION = "Filter Recommended Cuboid Data for Optimization";
+    public static final String STEP_NAME_UPDATE_OLD_CUBOID_SHARD = "Update Old Cuboid Shard for Optimization";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem";
     public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark";
@@ -45,8 +48,10 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
     public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
     public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
+    public static final String STEP_NAME_COPY_DICTIONARY = "Copy Dictionary from Old Segment";
     public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
     public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
+    public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization";
     public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 16875b1..9cfc61a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -92,8 +92,8 @@ public class ExecutableDao {
         return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER);
     }
 
-    private void writeJobResource(String path, ExecutablePO job) throws IOException {
-        store.putResource(path, job, JOB_SERIALIZER);
+    private long writeJobResource(String path, ExecutablePO job) throws IOException {
+        return store.putResource(path, job, JOB_SERIALIZER);
     }
 
     private ExecutableOutputPO readJobOutputResource(String path) throws IOException {
@@ -179,6 +179,20 @@ public class ExecutableDao {
         }
     }
 
+    public ExecutablePO updateJob(ExecutablePO job) throws PersistentException {
+        try {
+            if (getJob(job.getUuid()) == null) {
+                throw new IllegalArgumentException("job id:" + job.getUuid() + " does not exist");
+            }
+            final long ts = writeJobResource(pathOfJob(job), job);
+            job.setLastModified(ts);
+            return job;
+        } catch (IOException e) {
+            logger.error("error update job:" + job.getUuid(), e);
+            throw new PersistentException(e);
+        }
+    }
+
     public void deleteJob(String uuid) throws PersistentException {
         try {
             store.deleteResource(pathOfJob(uuid));

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
index 75717e0..f48c876 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -39,6 +39,9 @@ public class ExecutablePO extends RootPersistentEntity {
     @JsonProperty("tasks")
     private List<ExecutablePO> tasks;
 
+    @JsonProperty("tasks_check")
+    private List<ExecutablePO> tasksForCheck;
+
     @JsonProperty("type")
     private String type;
 
@@ -61,6 +64,14 @@ public class ExecutablePO extends RootPersistentEntity {
         this.tasks = tasks;
     }
 
+    public List<ExecutablePO> getTasksForCheck() {
+        return tasksForCheck;
+    }
+
+    public void setTasksForCheck(List<ExecutablePO> tasksForCheck) {
+        this.tasksForCheck = tasksForCheck;
+    }
+
     public String getType() {
         return type;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index bab8c30..bc38fff 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -96,6 +96,13 @@ public class ExecutableManager {
             }
             result.setTasks(tasks);
         }
+        if (executable instanceof CheckpointExecutable) {
+            List<ExecutablePO> tasksForCheck = Lists.newArrayList();
+            for (AbstractExecutable taskForCheck : ((CheckpointExecutable) executable).getSubTasksForCheck()) {
+                tasksForCheck.add(parse(taskForCheck));
+            }
+            result.setTasksForCheck(tasksForCheck);
+        }
         return result;
     }
 
@@ -121,6 +128,23 @@ public class ExecutableManager {
         }
     }
 
+    public void updateCheckpointJob(String jobId, List<AbstractExecutable> subTasksForCheck) {
+        try {
+            final ExecutablePO job = executableDao.getJob(jobId);
+            Preconditions.checkArgument(job != null, "there is no related job for job id:" + jobId);
+
+            List<ExecutablePO> tasksForCheck = Lists.newArrayListWithExpectedSize(subTasksForCheck.size());
+            for (AbstractExecutable taskForCheck : subTasksForCheck) {
+                tasksForCheck.add(parse(taskForCheck));
+            }
+            job.setTasksForCheck(tasksForCheck);
+            executableDao.updateJob(job);
+        } catch (PersistentException e) {
+            logger.error("fail to update checkpoint job:" + jobId, e);
+            throw new RuntimeException(e);
+        }
+    }
+
     //for ut
     public void deleteJob(String jobId) {
         try {
@@ -349,7 +373,15 @@ public class ExecutableManager {
         if (job == null) {
             return;
         }
-
+        if (job.getStatus().isFinalState()) {
+            if (job.getStatus() != ExecutableState.DISCARDED) {
+                logger.warn("The status of job " + jobId + " is " + job.getStatus().toString()
+                        + ". It's final state and cannot be transfer to be discarded!!!");
+            } else {
+                logger.warn("The job " + jobId + " has been discarded.");
+            }
+            return;
+        }
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
@@ -501,6 +533,13 @@ public class ExecutableManager {
                     ((ChainedExecutable) result).addTask(parseTo(subTask));
                 }
             }
+            List<ExecutablePO> tasksForCheck = executablePO.getTasksForCheck();
+            if (tasksForCheck != null && !tasksForCheck.isEmpty()) {
+                Preconditions.checkArgument(result instanceof CheckpointExecutable);
+                for (ExecutablePO subTaskForCheck : tasksForCheck) {
+                    ((CheckpointExecutable) result).addTaskForCheck(parseTo(subTaskForCheck));
+                }
+            }
             return result;
         } catch (ReflectiveOperationException e) {
             throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 64a7db7..6ef9c81 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -88,8 +88,9 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
                         nRunning++;
                         continue;
                     }
-                    final Output output = executableManager.getOutput(id);
-                    if ((output.getState() != ExecutableState.READY)) {
+                    final AbstractExecutable executable = executableManager.getJob(id);
+                    if (!executable.isReady()) {
+                        final Output output = executableManager.getOutput(id);
                         // logger.debug("Job id:" + id + " not runnable");
                         if (output.getState() == ExecutableState.DISCARDED) {
                             nDiscarded++;
@@ -105,10 +106,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
                         continue;
                     }
                     nReady++;
-                    AbstractExecutable executable = null;
                     String jobDesc = null;
                     try {
-                        executable = executableManager.getJob(id);
                         jobDesc = executable.toString();
                         logger.info(jobDesc + " prepare to schedule");
                         context.addRunningJob(executable);
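
The fetcher now asks the executable itself via isReady() instead of reading only the persisted output state, which lets a job type veto its own scheduling. The presumable beneficiary is CheckpointExecutable, which should stay parked until every watched optimize job has succeeded. A hypothetical override sketching that behavior (not shown in this commit's hunks):

    @Override
    public boolean isReady() {
        if (!super.isReady()) // output state must still be READY
            return false;
        for (AbstractExecutable task : getSubTasksForCheck()) {
            // hold the checkpoint until every watched job has succeeded
            if (getManager().getOutput(task.getId()).getState() != ExecutableState.SUCCEED)
                return false;
        }
        return true;
    }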

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
new file mode 100644
index 0000000..a8127cc
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CopyDictionaryStep;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.mr.steps.FilterRecommendCuboidDataJob;
+import org.apache.kylin.engine.mr.steps.InMemCuboidFromBaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.MergeStatisticsWithOldStep;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterOptimizeStep;
+import org.apache.kylin.engine.mr.steps.UpdateOldCuboidShardJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class BatchOptimizeJobBuilder2 extends JobBuilderSupport {
+    private static final Logger logger = LoggerFactory.getLogger(BatchOptimizeJobBuilder2.class);
+
+    private final IMROutput2.IMRBatchOptimizeOutputSide2 outputSide;
+
+    public BatchOptimizeJobBuilder2(CubeSegment optimizeSegment, String submitter) {
+        super(optimizeSegment, submitter);
+        this.outputSide = MRUtil.getBatchOptimizeOutputSide2(optimizeSegment);
+    }
+
+    public CubingJob build() {
+        logger.info("MR_V2 new job to OPTIMIZE a segment " + seg);
+
+        final CubingJob result = CubingJob.createOptimizeJob(seg, submitter, config);
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+
+        final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
+        final String optimizeCuboidRootPath = getOptimizationCuboidPath(jobId);
+
+        CubeSegment oldSegment = seg.getCubeInstance().getOriginalSegmentToOptimize(seg);
+        Preconditions.checkNotNull(oldSegment, "cannot find the original segment to be optimized for " + seg);
+
+        // Phase 1: Prepare base cuboid data from old segment
+        String oldCuboidRootPath = getCuboidRootPath(oldSegment) + "*";
+        result.addTask(createFilterRecommendCuboidDataStep(oldCuboidRootPath, optimizeCuboidRootPath));
+
+        // Phase 2: Prepare dictionary and statistics for new segment
+        result.addTask(createCopyDictionaryStep());
+        String optStatsSourcePath = getBaseCuboidPath(optimizeCuboidRootPath);
+        String optStatsDstPath = getOptimizationStatisticsPath(jobId);
+        result.addTask(createCalculateStatsFromBaseCuboid(optStatsSourcePath, optStatsDstPath,
+                CuboidModeEnum.RECOMMEND_MISSING));
+        result.addTask(createMergeStatisticsWithOldStep(jobId, optStatsDstPath, getStatisticsPath(jobId)));
+        outputSide.addStepPhase2_CreateHTable(result);
+
+        result.addTask(createUpdateShardForOldCuboidDataStep(optimizeCuboidRootPath + "*", cuboidRootPath));
+
+        // Phase 3: Build Cube for Missing Cuboid Data
+        addLayerCubingSteps(result, jobId, CuboidModeEnum.RECOMMEND_MISSING_WITH_BASE, cuboidRootPath); // layer cubing
+        result.addTask(createInMemCubingStep(jobId, CuboidModeEnum.RECOMMEND_MISSING_WITH_BASE, cuboidRootPath));
+
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterOptimizeStep(jobId));
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    public MapReduceExecutable createFilterRecommendCuboidDataStep(String inputPath, String outputPath) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION);
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Filter_Recommend_Cuboid_Data_" + seg.getRealization().getName());
+
+        result.setMapReduceParams(cmd.toString());
+        result.setMapReduceJobClass(FilterRecommendCuboidDataJob.class);
+        return result;
+    }
+
+    public CopyDictionaryStep createCopyDictionaryStep() {
+        CopyDictionaryStep result = new CopyDictionaryStep();
+        result.setName(ExecutableConstants.STEP_NAME_COPY_DICTIONARY);
+
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        return result;
+    }
+
+    private MapReduceExecutable createUpdateShardForOldCuboidDataStep(String inputPath, String outputPath) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_UPDATE_OLD_CUBOID_SHARD);
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Update_Old_Cuboid_Shard_" + seg.getRealization().getName());
+
+        result.setMapReduceParams(cmd.toString());
+        result.setMapReduceJobClass(UpdateOldCuboidShardJob.class);
+        return result;
+    }
+
+    private MergeStatisticsWithOldStep createMergeStatisticsWithOldStep(final String jobId, final String optStatsPath,
+            final String mergedStatisticsFolder) {
+        MergeStatisticsWithOldStep result = new MergeStatisticsWithOldStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS_WITH_OLD);
+
+        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setStatisticsPath(optStatsPath, result.getParams());
+        CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
+
+        return result;
+    }
+
+    private void addLayerCubingSteps(final CubingJob result, final String jobId, final CuboidModeEnum cuboidMode,
+            final String cuboidRootPath) {
+        // Statistics are not known yet, so the tree cuboid scheduler cannot be determined; derive the max level from the row key size
+        final int maxLevel = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+        // Don't need to build base cuboid
+        // n dim cuboid steps
+        for (int i = 1; i <= maxLevel; i++) {
+            String parentCuboidPath = i == 1 ? getBaseCuboidPath(cuboidRootPath)
+                    : getCuboidOutputPathsByLevel(cuboidRootPath, i - 1);
+            result.addTask(createNDimensionCuboidStep(parentCuboidPath,
+                    getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId, cuboidMode));
+        }
+    }
+
+    private MapReduceExecutable createNDimensionCuboidStep(String parentPath, String outputPath, int level,
+            String jobId, CuboidModeEnum cuboidMode) {
+        // ND cuboid job
+        MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
+
+        ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : level " + level);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, parentPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + level);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+
+        ndCuboidStep.setMapReduceParams(cmd.toString());
+        ndCuboidStep.setMapReduceJobClass(getNDCuboidJob());
+        return ndCuboidStep;
+    }
+
+    private MapReduceExecutable createInMemCubingStep(String jobId, CuboidModeEnum cuboidMode, String cuboidRootPath) {
+        MapReduceExecutable cubeStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX);
+
+        cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getBaseCuboidPath(cuboidRootPath));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getInMemCuboidPath(cuboidRootPath));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Cube_Builder_" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+
+        cubeStep.setMapReduceParams(cmd.toString());
+        cubeStep.setMapReduceJobClass(InMemCuboidFromBaseCuboidJob.class);
+        cubeStep.setCounterSaveAs(
+                CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+        return cubeStep;
+    }
+
+    public UpdateCubeInfoAfterOptimizeStep createUpdateCubeInfoAfterOptimizeStep(String jobId) {
+        final UpdateCubeInfoAfterOptimizeStep result = new UpdateCubeInfoAfterOptimizeStep();
+        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+
+        return result;
+    }
+
+    protected Class<? extends AbstractHadoopJob> getNDCuboidJob() {
+        return NDCuboidJob.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java
new file mode 100644
index 0000000..1a779d2
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterCheckpointStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.CheckpointExecutable;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+
+import com.google.common.base.Preconditions;
+
+public class BatchOptimizeJobCheckpointBuilder {
+
+    protected static SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+
+    final protected CubeInstance cube;
+    final protected String submitter;
+
+    private final IMROutput2.IMRBatchOptimizeOutputSide2 outputSide;
+
+    public BatchOptimizeJobCheckpointBuilder(CubeInstance cube, String submitter) {
+        this.cube = cube;
+        this.submitter = submitter;
+
+        Preconditions.checkNotNull(cube.getFirstSegment(), "Cube " + cube + " is empty!!!");
+        this.outputSide = MRUtil.getBatchOptimizeOutputSide2(cube.getFirstSegment());
+    }
+
+    public CheckpointExecutable build() {
+        KylinConfig kylinConfig = cube.getConfig();
+        List<ProjectInstance> projList = ProjectManager.getInstance(kylinConfig).findProjects(cube.getType(),
+                cube.getName());
+        if (projList == null || projList.size() == 0) {
+            throw new RuntimeException("Cannot find the project containing the cube " + cube.getName() + "!!!");
+        } else if (projList.size() >= 2) {
+            throw new RuntimeException("Find more than one project containing the cube " + cube.getName()
+                    + ". It does't meet the uniqueness requirement!!! ");
+        }
+
+        CheckpointExecutable checkpointJob = new CheckpointExecutable();
+        checkpointJob.setSubmitter(submitter);
+        CubingExecutableUtil.setCubeName(cube.getName(), checkpointJob.getParams());
+        checkpointJob.setName(
+                cube.getName() + " - OPTIMIZE CHECKPOINT - " + format.format(new Date(System.currentTimeMillis())));
+        checkpointJob.setDeployEnvName(kylinConfig.getDeployEnv());
+        checkpointJob.setProjectName(projList.get(0).getName());
+
+        // Phase 1: Update cube information
+        checkpointJob.addTask(createUpdateCubeInfoAfterCheckpointStep());
+
+        // Phase 2: Garbage collection
+        outputSide.addStepPhase5_Cleanup(checkpointJob);
+
+        return checkpointJob;
+    }
+
+    private UpdateCubeInfoAfterCheckpointStep createUpdateCubeInfoAfterCheckpointStep() {
+        UpdateCubeInfoAfterCheckpointStep result = new UpdateCubeInfoAfterCheckpointStep();
+        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+
+        CubingExecutableUtil.setCubeName(cube.getName(), result.getParams());
+        return result;
+    }
+
+}
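
Per the stats above, JobService wires the pieces together: one optimize job per READY segment plus a single trailing checkpoint job that watches them all. A hedged sketch of that wiring (variable names assumed):

    List<AbstractExecutable> optimizeJobs = Lists.newArrayList();
    for (CubeSegment seg : optimizeSegments) {
        optimizeJobs.add(EngineFactory.createBatchOptimizeJob(seg, submitter));
    }
    CheckpointExecutable checkpoint = new BatchOptimizeJobCheckpointBuilder(cube, submitter).build();
    for (AbstractExecutable job : optimizeJobs) {
        checkpoint.addTaskForCheck(job); // checkpoint waits on every optimize job
    }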

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 4e7bcdd..71b62a0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -65,7 +65,7 @@ public class CubingJob extends DefaultChainedExecutable {
     }
 
     public enum CubingJobTypeEnum {
-        BUILD("BUILD"), MERGE("MERGE");
+        BUILD("BUILD"), OPTIMIZE("OPTIMIZE"), MERGE("MERGE");
 
         private final String name;
 
@@ -106,6 +106,10 @@ public class CubingJob extends DefaultChainedExecutable {
         return initCubingJob(seg, CubingJobTypeEnum.BUILD.toString(), submitter, config);
     }
 
+    public static CubingJob createOptimizeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
+        return initCubingJob(seg, CubingJobTypeEnum.OPTIMIZE.toString(), submitter, config);
+    }
+
     public static CubingJob createMergeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
         return initCubingJob(seg, CubingJobTypeEnum.MERGE.toString(), submitter, config);
     }
@@ -135,6 +139,7 @@ public class CubingJob extends DefaultChainedExecutable {
         result.setJobType(jobType);
         CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams());
         CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setSegmentName(seg.getName(), result.getParams());
         result.setName(jobType + " CUBE - " + seg.getCubeInstance().getName() + " - " + seg.getName() + " - "
                 + format.format(new Date(System.currentTimeMillis())));
         result.setSubmitter(submitter);

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 69bba0a..e70b497 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public interface IMROutput2 {
@@ -67,7 +68,7 @@ public interface IMROutput2 {
         public void configureJobInput(Job job, String input) throws Exception;
 
         /** Configure the OutputFormat of given job. */
-        public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception;
+        public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler, int level) throws Exception;
 
     }
 
@@ -113,4 +114,30 @@ public interface IMROutput2 {
         public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube);
     }
 
+    public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(CubeSegment seg);
+
+    /**
+     * Participates in the batch optimize flow as the output side, responsible for saving
+     * the cuboid output to storage at the end of Phase 3.
+     *
+     * - Phase 1: Filter Recommended Cuboid Data
+     * - Phase 2: Copy Dictionary & Calculate Statistics & Update Reused Cuboid Shard
+     * - Phase 3: Build Cube
+     * - Phase 4: Cleanup Optimize
+     * - Phase 5: Update Metadata & Cleanup
+     */
+    public interface IMRBatchOptimizeOutputSide2 {
+
+        /** Create the HTable based on the recommended cuboids & statistics */
+        public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow);
+
+        /** Build only the missing cuboids */
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Clean up intermediate cuboid data on HDFS */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+        /** Invoked by the checkpoint job; cleans up old segments' HTables and the related working directory */
+        public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow);
+    }
 }
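
A hedged skeleton of the contract above (the real HBase implementation lives in
the storage module and is not shown in this excerpt); each method appends its
steps to the chained job flow for the phase it owns.

    import org.apache.kylin.engine.mr.IMROutput2;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    public class NoopOptimizeOutputSide implements IMROutput2.IMRBatchOptimizeOutputSide2 {
        @Override
        public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
            // a real storage side would call jobFlow.addTask(...) with its create-HTable step
        }

        @Override
        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
            // convert and load only the cuboids built in Phase 3
        }

        @Override
        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
            // drop this run's intermediate cuboid data on HDFS
        }

        @Override
        public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) {
            // invoked by the checkpoint job: drop old segments' HTables and working dirs
        }
    }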

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 2a51c89..694c936 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -22,14 +22,16 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
-import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
+import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -48,6 +50,10 @@ public class JobBuilderSupport {
 
     final public static String LayeredCuboidFolderPrefix = "level_";
 
+    final public static String PathNameCuboidBase = "base_cuboid";
+    final public static String PathNameCuboidOld = "old";
+    final public static String PathNameCuboidInMem = "in_memory";
+
     public JobBuilderSupport(CubeSegment seg, String submitter) {
         Preconditions.checkNotNull(seg, "segment cannot be null");
         this.config = new JobEngineConfig(seg.getConfig());
@@ -98,6 +104,31 @@ public class JobBuilderSupport {
         return result;
     }
 
+    public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath) {
+        return createCalculateStatsFromBaseCuboid(inputPath, outputPath, CuboidModeEnum.CURRENT);
+    }
+
+    public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath,
+            CuboidModeEnum cuboidMode) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID);
+        result.setMapReduceJobClass(CalculateStatsFromBaseCuboidJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT,
+                String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Calculate_Stats_For_Segment_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
     public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
         // dictionary building step
         HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
@@ -197,6 +228,18 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/dict";
     }
 
+    public String getOptimizationRootPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/optimize";
+    }
+
+    public String getOptimizationStatisticsPath(String jobId) {
+        return getOptimizationRootPath(jobId) + "/statistics";
+    }
+
+    public String getOptimizationCuboidPath(String jobId) {
+        return getOptimizationRootPath(jobId) + "/cuboid/";
+    }
+
     // ============================================================================
     // static methods also shared by other job flow participant
     // ----------------------------------------------------------------------------
@@ -218,10 +261,17 @@ public class JobBuilderSupport {
 
     public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
         if (level == 0) {
-            return cuboidRootPath + LayeredCuboidFolderPrefix + "base_cuboid";
+            return cuboidRootPath + LayeredCuboidFolderPrefix + PathNameCuboidBase;
         } else {
             return cuboidRootPath + LayeredCuboidFolderPrefix + level + "_cuboid";
         }
     }
 
+    public static String getBaseCuboidPath(String cuboidRootPath) {
+        return cuboidRootPath + PathNameCuboidBase;
+    }
+
+    public static String getInMemCuboidPath(String cuboidRootPath) {
+        return cuboidRootPath + PathNameCuboidInMem;
+    }
 }
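
Taken together, the new helpers lay out the optimize working directory as below;
a small sketch (the wrapper class and job id are illustrative only):

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.JobBuilderSupport;

    public class OptimizePathSketch {
        public static void printLayout(CubeSegment seg) {
            JobBuilderSupport support = new JobBuilderSupport(seg, "ADMIN");
            String jobId = "job-uuid"; // illustrative only
            System.out.println(support.getOptimizationRootPath(jobId));       // <realization-root>/optimize
            System.out.println(support.getOptimizationStatisticsPath(jobId)); // .../optimize/statistics
            String cuboidRoot = support.getOptimizationCuboidPath(jobId);     // .../optimize/cuboid/
            System.out.println(JobBuilderSupport.getBaseCuboidPath(cuboidRoot));  // .../optimize/cuboid/base_cuboid
            System.out.println(JobBuilderSupport.getInMemCuboidPath(cuboidRoot)); // .../optimize/cuboid/in_memory
        }
    }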

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index 681c545..74c9b6d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -48,6 +48,11 @@ public class MRBatchCubingEngine implements IBatchCubingEngine {
     }
 
     @Override
+    public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+        return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+    }
+
+    @Override
     public Class<?> getSourceInterface() {
         return IMRInput.class;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
index d9fdcb9..665e791 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -48,6 +48,11 @@ public class MRBatchCubingEngine2 implements IBatchCubingEngine {
     }
 
     @Override
+    public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+        return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+    }
+
+    @Override
     public Class<?> getSourceInterface() {
         return IMRInput.class;
     }
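
Both MR engines answer the new interface method identically; a possible call
site, sketched under the assumption that the segment has already been prepared
for optimization (normally this is reached via the IBatchCubingEngine interface
rather than a concrete engine class):

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    public class OptimizeDispatchSketch {
        public static DefaultChainedExecutable build(CubeSegment optimizeSegment) {
            return new MRBatchCubingEngine2().createBatchOptimizeJob(optimizeSegment, "ADMIN");
        }
    }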

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index b2a2ea3..124e5e7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -76,6 +76,10 @@ public class MRUtil {
         return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
     }
 
+    public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
+        return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
+    }
+
     // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe
     // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
     public static int runMRJob(Tool tool, String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index e6c208b..b63ef16 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -24,12 +24,14 @@ package org.apache.kylin.engine.mr.common;
  */
 
 import static org.apache.hadoop.util.StringUtils.formatTime;
+import static org.apache.kylin.engine.mr.common.JobRelatedMetaUtil.collectCubeMetadata;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -114,6 +116,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder
             .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false)
             .withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+    protected static final Option OPTION_CUBOID_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE)
+            .hasArg().isRequired(false).withDescription("Cuboid Mode").create(BatchConstants.ARG_CUBOID_MODE);
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
@@ -492,27 +496,41 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }
 
     protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        dumpKylinPropsAndMetadata(cube.getProject(), JobRelatedMetaUtil.collectCubeMetadata(cube), cube.getConfig(),
+        dumpKylinPropsAndMetadata(cube.getProject(), collectCubeMetadata(cube), cube.getConfig(),
                 conf);
     }
 
     protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
-        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(cube));
+        dumpList.addAll(collectCubeMetadata(cube));
         for (CubeSegment segment : cube.getSegments()) {
             dumpList.addAll(segment.getDictionaryPaths());
         }
         dumpKylinPropsAndMetadata(cube.getProject(), dumpList, cube.getConfig(), conf);
     }
 
+    protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, Configuration conf) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        CubeInstance cube = segments.get(0).getCubeInstance();
+        dumpList.addAll(collectCubeMetadata(cube));
+        for (CubeSegment segment : segments) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+        dumpKylinPropsAndMetadata(cube.getProject(), dumpList, cube.getConfig(), conf);
+    }
+
     protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
+        attachSegmentMetadata(segment, conf, true, false);
+    }
+
+    protected void attachSegmentMetadataWithAll(CubeSegment segment, Configuration conf) throws IOException {
         attachSegmentMetadata(segment, conf, true, true);
     }
 
     protected void attachSegmentMetadata(CubeSegment segment, Configuration conf, boolean ifDictIncluded,
             boolean ifStatsIncluded) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
-        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
+        dumpList.addAll(collectCubeMetadata(segment.getCubeInstance()));
         if (ifDictIncluded) {
             dumpList.addAll(segment.getDictionaryPaths());
         }
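
Note the behavior change in this hunk: attachSegmentMetadataWithDict() used to
dump segment statistics as well; it now dumps dictionaries only, and callers
that still need the statistics move to the new attachSegmentMetadataWithAll().
In short:

    attachSegmentMetadataWithDict(seg, conf)  ->  attachSegmentMetadata(seg, conf, true, false)
    attachSegmentMetadataWithAll(seg, conf)   ->  attachSegmentMetadata(seg, conf, true, true)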

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index aaf2654..1d6a582 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -75,6 +75,7 @@ public interface BatchConstants {
     String ARG_INPUT = "input";
     String ARG_OUTPUT = "output";
     String ARG_PROJECT = "project";
+    String ARG_CUBOID_MODE = "cuboidMode";
     String ARG_JOB_NAME = "jobname";
     String ARG_CUBING_JOB_ID = "cubingJobId";
     String ARG_CUBE_NAME = "cubename";

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index c82d797..3d7d542 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -80,51 +80,52 @@ public class CubeStatsReader {
     final CuboidScheduler cuboidScheduler;
 
     public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
+        this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
+    }
+
+    /**
+     * @param cuboidScheduler if null, scheduler-dependent functions (e.g. estimateLayerSize) are not supported
+     */
+    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig)
+            throws IOException {
         ResourceStore store = ResourceStore.getStore(kylinConfig);
-        cuboidScheduler = cubeSegment.getCuboidScheduler();
         String statsKey = cubeSegment.getStatisticsResourcePath();
         File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream);
-        Reader reader = null;
-
-        try {
-            Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
-
-            Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
-            Option seqInput = SequenceFile.Reader.file(path);
-            reader = new SequenceFile.Reader(hadoopConf, seqInput);
-
-            int percentage = 100;
-            int mapperNumber = 0;
-            double mapperOverlapRatio = 0;
-            Map<Long, HLLCounter> counterMap = Maps.newHashMap();
-
-            LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
-            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf);
-            while (reader.next(key, value)) {
-                if (key.get() == 0L) {
-                    percentage = Bytes.toInt(value.getBytes());
-                } else if (key.get() == -1) {
-                    mapperOverlapRatio = Bytes.toDouble(value.getBytes());
-                } else if (key.get() == -2) {
-                    mapperNumber = Bytes.toInt(value.getBytes());
-                } else if (key.get() > 0) {
-                    HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
-                    ByteArray byteArray = new ByteArray(value.getBytes());
-                    hll.readRegisters(byteArray.asBuffer());
-                    counterMap.put(key.get(), hll);
-                }
-            }
-
-            this.seg = cubeSegment;
-            this.samplingPercentage = percentage;
-            this.mapperNumberOfFirstBuild = mapperNumber;
-            this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio;
-            this.cuboidRowEstimatesHLL = counterMap;
+        Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
+
+        CubeStatsResult cubeStatsResult = new CubeStatsResult();
+        cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision());
+        tmpSeqFile.delete();
+
+        this.seg = cubeSegment;
+        this.cuboidScheduler = cuboidScheduler;
+        this.samplingPercentage = cubeStatsResult.percentage;
+        this.mapperNumberOfFirstBuild = cubeStatsResult.mapperNumber;
+        this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.mapperOverlapRatio;
+        this.cuboidRowEstimatesHLL = cubeStatsResult.counterMap;
+    }
 
-        } finally {
-            IOUtils.closeStream(reader);
-            tmpSeqFile.delete();
-        }
+    /**
+     * Read statistics from the given path rather than from the segment's own
+     * statistics resource. Since the statistics do not come from the segment,
+     * a cuboid scheduler matching them should be provided explicitly.
+     */
+    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path)
+            throws IOException {
+        CubeStatsResult cubeStatsResult = new CubeStatsResult();
+        cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision());
+
+        this.seg = cubeSegment;
+        this.cuboidScheduler = cuboidScheduler;
+        this.samplingPercentage = cubeStatsResult.percentage;
+        this.mapperNumberOfFirstBuild = cubeStatsResult.mapperNumber;
+        this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.mapperOverlapRatio;
+        this.cuboidRowEstimatesHLL = cubeStatsResult.counterMap;
     }
 
     private File writeTmpSeqFile(InputStream inputStream) throws IOException {
@@ -144,6 +145,10 @@ public class CubeStatsReader {
         return this.cuboidRowEstimatesHLL;
     }
 
+    public int getSamplingPercentage() {
+        return samplingPercentage;
+    }
+
     public Map<Long, Long> getCuboidRowEstimatesHLL() {
         return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
     }
@@ -253,6 +258,9 @@ public class CubeStatsReader {
 
     //return MB
     public double estimateLayerSize(int level) {
+        if (cuboidScheduler == null) {
+            throw new UnsupportedOperationException("cuboid scheduler is null");
+        }
         List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
         Map<Long, Double> cuboidSizeMap = getCuboidSizeMap();
         double ret = 0;
@@ -265,11 +273,17 @@ public class CubeStatsReader {
     }
 
     public List<Long> getCuboidsByLayer(int level) {
+        if (cuboidScheduler == null) {
+            throw new UnsupportedOperationException("cuboid scheduler is null");
+        }
         List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
         return layeredCuboids.get(level);
     }
 
     private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) {
+        if (cuboidScheduler == null) {
+            throw new UnsupportedOperationException("cuboid scheduler is null");
+        }
         long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc());
         int dimensionCount = Long.bitCount(baseCuboid);
         printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out);
@@ -317,6 +331,36 @@ public class CubeStatsReader {
         return new DecimalFormat("#.##").format(input);
     }
 
+    private class CubeStatsResult {
+        private int percentage = 100;
+        private double mapperOverlapRatio = 0;
+        private int mapperNumber = 0;
+        Map<Long, HLLCounter> counterMap = Maps.newHashMap();
+
+        void initialize(Path path, int precision) throws IOException {
+            Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
+            Option seqInput = SequenceFile.Reader.file(path);
+            try (Reader reader = new SequenceFile.Reader(hadoopConf, seqInput)) {
+                LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
+                BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf);
+                while (reader.next(key, value)) {
+                    if (key.get() == 0L) {
+                        percentage = Bytes.toInt(value.getBytes());
+                    } else if (key.get() == -1) {
+                        mapperOverlapRatio = Bytes.toDouble(value.getBytes());
+                    } else if (key.get() == -2) {
+                        mapperNumber = Bytes.toInt(value.getBytes());
+                    } else if (key.get() > 0) {
+                        HLLCounter hll = new HLLCounter(precision);
+                        ByteArray byteArray = new ByteArray(value.getBytes());
+                        hll.readRegisters(byteArray.asBuffer());
+                        counterMap.put(key.get(), hll);
+                    }
+                }
+            }
+        }
+    }
+
     public static void main(String[] args) throws IOException {
         System.out.println("CubeStatsReader is used to read cube statistics saved in the metadata store");
         KylinConfig config = KylinConfig.getInstanceFromEnv();
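
For reference, the sentinel keys that CubeStatsResult.initialize() dispatches on
encode the statistics sequence file as follows (read off the branches above):

    key == 0L   ->  sampling percentage (int)
    key == -1L  ->  mapper overlap ratio of the first build (double)
    key == -2L  ->  mapper number of the first build (int)
    key  > 0L   ->  cuboid id; the value holds the cuboid's serialized HLL registers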

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index ba3f023..649eeb6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -39,20 +39,21 @@ public class CuboidRecommenderUtil {
             return null;
         }
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
             return null;
         }
-        long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId();
+        CubeInstance cube = segment.getCubeInstance();
+        long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
         if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) {
             logger.info("Base cuboid count in cuboid statistics is 0.");
             return null;
         }
 
-        String key = segment.getCubeInstance().getName();
+        String key = cube.getName();
         CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(),
                 cubeStatsReader.getCuboidSizeMap()).build();
         return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(), false);
@@ -81,20 +82,21 @@ public class CuboidRecommenderUtil {
             return null;
         }
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
             return null;
         }
-        long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId();
+        CubeInstance cube = segment.getCubeInstance();
+        long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
         if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) {
             logger.info("Base cuboid count in cuboid statistics is 0.");
             return null;
         }
 
-        String key = segment.getCubeInstance().getName() + "-" + segment.getName();
+        String key = cube.getName() + "-" + segment.getName();
         CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(),
                 cubeStatsReader.getCuboidSizeMap()).setHitFrequencyMap(hitFrequencyMap)
                         .setRollingUpCountSourceMap(rollingUpCountSourceMap,

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
new file mode 100644
index 0000000..d684c04
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler;
+import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager;
+
+public class CuboidSchedulerUtil {
+
+    public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, String cuboidModeName) {
+        return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidModeName));
+    }
+
+    public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, CuboidModeEnum cuboidMode) {
+        return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode));
+    }
+
+    public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, Set<Long> cuboidSet) {
+        CuboidScheduler cuboidScheduler;
+        try {
+            cuboidScheduler = TreeCuboidSchedulerManager.getInstance().getTreeCuboidScheduler(segment.getCubeDesc(), //
+                    CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read cube stats for segment " + segment + " due to " + e);
+        }
+
+        if (cuboidScheduler == null) {
+            cuboidScheduler = new DefaultCuboidScheduler(segment.getCubeDesc());
+        }
+        return cuboidScheduler;
+    }
+}
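
A minimal usage sketch of the new utility (the call site is assumed; the CURRENT
mode also appears in JobBuilderSupport above). It tries to build a tree scheduler
from the segment's per-cuboid statistics and falls back to the rule-based
DefaultCuboidScheduler when none are available:

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.cube.cuboid.CuboidModeEnum;
    import org.apache.kylin.cube.cuboid.CuboidScheduler;
    import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;

    public class SchedulerSketch {
        public static CuboidScheduler forSegment(CubeSegment segment) {
            return CuboidSchedulerUtil.getCuboidSchedulerByMode(segment, CuboidModeEnum.CURRENT);
        }
    }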

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index 68380f3..56ab504 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
@@ -74,7 +74,7 @@ public class CuboidStatsReaderUtil {
         Map<Long, HLLCounter> cuboidHLLMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
         Map<Long, Double> sizeMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
         for (CubeSegment pSegment : segmentList) {
-            CubeStatsReader pReader = new CubeStatsReader(pSegment, pSegment.getConfig());
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig());
             Map<Long, HLLCounter> pHLLMap = pReader.getCuboidRowHLLCounters();
             if (pHLLMap == null || pHLLMap.isEmpty()) {
                 logger.info("Cuboid Statistics for segment " + pSegment.getName() + " is not enabled.");
@@ -113,7 +113,7 @@ public class CuboidStatsReaderUtil {
             return null;
         }
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cubeSegment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, null, cubeSegment.getConfig());
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
@@ -132,4 +132,5 @@ public class CuboidStatsReaderUtil {
         }
         return cuboidsWithStats;
     }
+
 }