Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/21 02:29:32 UTC

[7/7] incubator-kylin git commit: KYLIN-878 HBase storage abstraction for cubing flow

KYLIN-878 HBase storage abstraction for cubing flow


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/22dc5734
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/22dc5734
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/22dc5734

Branch: refs/heads/0.8
Commit: 22dc573445447639daf3a7cb165faf84789444a7
Parents: 6171b0a
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Jul 17 10:27:43 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Tue Jul 21 08:28:13 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/mr/KylinMapper.java |   1 +
 .../apache/kylin/common/util/BytesSplitter.java |   9 +-
 .../apache/kylin/common/util/HadoopUtil.java    |  10 +-
 .../java/org/apache/kylin/cube/CubeBuilder.java | 100 ---
 .../java/org/apache/kylin/cube/CubeManager.java |  20 +-
 .../java/org/apache/kylin/cube/CubeSegment.java |  19 +-
 .../java/org/apache/kylin/cube/CubeUpdate.java  | 101 +++
 .../apache/kylin/cube/CubeManagerCacheTest.java |   2 +-
 .../org/apache/kylin/cube/CubeManagerTest.java  |  10 +-
 .../org/apache/kylin/cube/CubeSegmentsTest.java |   4 +-
 .../apache/kylin/engine/BuildEngineFactory.java |  20 +-
 .../apache/kylin/engine/IBatchCubingEngine.java |   3 +
 .../kylin/engine/mr/BatchCubingJobBuilder.java  | 145 +---
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  94 +++
 .../kylin/engine/mr/BatchMergeJobBuilder.java   |  98 +--
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  | 100 +++
 .../kylin/engine/mr/ByteArrayWritable.java      | 166 +++++
 .../kylin/engine/mr/GarbageCollectionStep.java  | 149 -----
 .../org/apache/kylin/engine/mr/IMRInput.java    |  14 +-
 .../kylin/engine/mr/IMRJobFlowParticipant.java  |  34 -
 .../org/apache/kylin/engine/mr/IMROutput.java   |  58 +-
 .../org/apache/kylin/engine/mr/IMROutput2.java  |  88 +++
 .../kylin/engine/mr/JobBuilderSupport.java      | 152 ++---
 .../kylin/engine/mr/MRBatchCubingEngine.java    |  29 +-
 .../kylin/engine/mr/MRBatchCubingEngine2.java   |  47 ++
 .../java/org/apache/kylin/engine/mr/MRUtil.java |  55 ++
 .../kylin/engine/mr/MergeDictionaryStep.java    | 197 ------
 .../engine/mr/UpdateCubeInfoAfterBuildStep.java | 100 ---
 .../engine/mr/UpdateCubeInfoAfterMergeStep.java | 135 ----
 .../kylin/engine/mr/steps/InMemCuboidJob.java   | 104 +++
 .../engine/mr/steps/InMemCuboidMapper.java      | 122 ++++
 .../engine/mr/steps/InMemCuboidReducer.java     |  95 +++
 .../mr/steps/MapContextGTRecordWriter.java      |  96 +++
 .../mr/steps/MergeCuboidFromStorageJob.java     |  95 +++
 .../mr/steps/MergeCuboidFromStorageMapper.java  | 197 ++++++
 .../engine/mr/steps/MergeDictionaryStep.java    | 197 ++++++
 .../engine/mr/steps/MergeStatisticsStep.java    | 188 ++++++
 .../engine/mr/steps/SaveStatisticsStep.java     | 109 +++
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  | 101 +++
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java  | 136 ++++
 .../kylin/job/constant/BatchConstants.java      |   3 +-
 .../kylin/job/engine/JobEngineConfig.java       |   4 -
 .../kylin/job/hadoop/AbstractHadoopJob.java     |   6 +-
 .../cardinality/ColumnCardinalityMapper.java    |   4 +-
 .../cardinality/HiveColumnCardinalityJob.java   |   4 +-
 .../kylin/job/hadoop/cube/CubeHFileMapper.java  |   6 +-
 .../apache/kylin/job/hadoop/cube/CuboidJob.java |  45 +-
 .../kylin/job/hadoop/cube/CuboidReducer.java    |   2 +-
 .../job/hadoop/cube/FactDistinctColumnsJob.java |   4 +-
 .../cube/FactDistinctColumnsMapperBase.java     |   4 +-
 .../job/hadoop/cube/HiveToBaseCuboidMapper.java |  28 +-
 .../job/hadoop/cube/IIToBaseCuboidMapper.java   | 109 ---
 .../kylin/job/hadoop/cubev2/InMemCuboidJob.java | 130 ----
 .../job/hadoop/cubev2/InMemCuboidMapper.java    | 123 ----
 .../job/hadoop/cubev2/InMemCuboidReducer.java   |  96 ---
 .../job/hadoop/cubev2/InMemKeyValueCreator.java |  77 ---
 .../hadoop/cubev2/MapContextGTRecordWriter.java |  97 ---
 .../hadoop/cubev2/MergeCuboidFromHBaseJob.java  | 120 ----
 .../cubev2/MergeCuboidFromHBaseMapper.java      | 242 -------
 .../job/hadoop/cubev2/MergeStatisticsStep.java  | 188 ------
 .../job/hadoop/cubev2/SaveStatisticsStep.java   | 112 ----
 .../kylin/job/streaming/CubeStreamConsumer.java |   6 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |   2 +-
 .../java/org/apache/kylin/storage/IStorage.java |  28 -
 .../apache/kylin/storage/StorageFactory2.java   |  34 +
 .../kylin/storage/hbase/HBaseMROutput.java      |  38 +-
 .../kylin/storage/hbase/HBaseMROutput2.java     | 259 ++++++++
 .../kylin/storage/hbase/HBaseMRSteps.java       | 138 ++++
 .../kylin/storage/hbase/HBaseStorage.java       |   4 +-
 .../storage/hbase/InMemKeyValueCreator.java     |  73 ++
 .../apache/kylin/storage/hbase/MergeGCStep.java | 121 ++++
 .../kylin/job/BuildCubeWithEngineTest.java      |   4 +-
 .../java/org/apache/kylin/job/DeployUtil.java   |   4 +-
 .../hadoop/cube/HiveToBaseCuboidMapperTest.java |   6 +-
 .../job/hadoop/cube/MergeCuboidMapperTest.java  |   4 +-
 .../job/streaming/CubeStreamConsumerTest.java   |   4 +-
 .../kylin/metadata/measure/MeasureCodec.java    |   5 -
 .../apache/kylin/metadata/model/IBuildable.java |  28 +
 .../apache/kylin/source/TableSourceFactory.java |   7 +-
 .../kylin/query/enumerator/OLAPEnumerator.java  |   4 +-
 .../kylin/rest/controller/CubeController.java   |   4 +-
 .../apache/kylin/rest/service/CubeService.java  |  12 +-
 .../apache/kylin/rest/service/JobService.java   |   4 +-
 .../kylin/rest/service/CacheServiceTest.java    |   2 +-
 .../kylin/storage/ICachableStorageEngine.java   |  34 -
 .../kylin/storage/ICachableStorageQuery.java    |  33 +
 .../java/org/apache/kylin/storage/IStorage.java |  28 +
 .../kylin/storage/StorageEngineFactory.java     |  83 ---
 .../apache/kylin/storage/StorageFactory.java    |  85 +++
 .../AbstractCacheFledgedStorageEngine.java      |   6 +-
 .../cache/CacheFledgedDynamicStorageEngine.java |   4 +-
 .../cache/CacheFledgedStaticStorageEngine.java  |   4 +-
 .../kylin/storage/cube/CubeStorageEngine.java   | 371 -----------
 .../kylin/storage/cube/CubeStorageQuery.java    | 371 +++++++++++
 .../kylin/storage/hbase/CubeStorageEngine.java  | 663 -------------------
 .../kylin/storage/hbase/CubeStorageQuery.java   | 663 +++++++++++++++++++
 .../hbase/InvertedIndexStorageEngine.java       |  83 ---
 .../hbase/InvertedIndexStorageQuery.java        |  82 +++
 .../storage/hybrid/HybridStorageEngine.java     |   4 +-
 .../kylin/storage/test/DynamicCacheTest.java    |   4 +-
 .../kylin/storage/test/ITStorageTest.java       |   4 +-
 .../kylin/storage/test/StaticCacheTest.java     |   4 +-
 102 files changed, 4403 insertions(+), 3718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java b/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
index a933782..fd0b337 100644
--- a/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
+++ b/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 /**
  */
 public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+    
     protected void bindCurrentConfiguration(Configuration conf) {
         HadoopUtil.setCurrentConfiguration(conf);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
index bd16246..7249dcf 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author xjiang
  */
 public class BytesSplitter {
     private static final Logger logger = LoggerFactory.getLogger(BytesSplitter.class);
@@ -79,6 +78,14 @@ public class BytesSplitter {
 
         return bufferSize;
     }
+    
+    public void setBuffers(byte[][] buffers) {
+        for (int i = 0; i < buffers.length; i++) {
+            splitBuffers[i].value = buffers[i];
+            splitBuffers[i].length = buffers[i].length;
+        }
+        this.bufferSize = buffers.length;
+    }
 
     public byte inferByteRowDelimiter(byte[] bytes, int byteLen, int expectedSplits) throws IOException {
 

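The new setBuffers method lets a caller inject columns that were already split upstream, bypassing the delimiter parsing in split(). Note it fills the existing SplittedBytes slots without growing the array, so the splitter must have been constructed with at least as many buffers as columns passed in. A minimal sketch of that contract (the wrapper class and method name below are hypothetical):

    import org.apache.kylin.common.util.BytesSplitter;

    public class SetBuffersExample {
        // Inject columns decoded by a previous stage (e.g. an earlier MR step)
        // without re-parsing a delimited row. Precondition: the splitter was
        // created with capacity >= columns.length, since setBuffers only
        // reuses existing internal slots.
        public static void inject(BytesSplitter splitter, byte[][] columns) {
            splitter.setBuffers(columns);
        }
    }
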
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 8ee1440..fb6b9bb 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -29,6 +29,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -140,7 +141,6 @@ public class HadoopUtil {
     }
 
     /**
-     * 
      * @param table the identifier of hive table, in format <db_name>.<table_name>
      * @return a string array with 2 elements: {"db_name", "table_name"}
      */
@@ -151,4 +151,12 @@ public class HadoopUtil {
 
         return new String[] { database, tableName };
     }
+
+    public static void deletePath(Configuration conf, Path path) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
+    }
+
 }

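The new deletePath helper gives cleanup steps an idempotent recursive delete: it checks existence first, so removing an already-gone path is a no-op rather than an error. A minimal usage sketch; the path below is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.common.util.HadoopUtil;

    public class DeletePathExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Recursively deletes the directory if present; silently skips
            // paths that do not exist.
            HadoopUtil.deletePath(conf, new Path("/tmp/kylin/job_1234/cuboid"));
        }
    }
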
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java b/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
deleted file mode 100644
index a828252..0000000
--- a/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube;
-
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-
-/**
- */
-public class CubeBuilder {
-    private CubeInstance cubeInstance;
-    private CubeSegment[] toAddSegs = null;
-    private CubeSegment[] toRemoveSegs = null;
-    private CubeSegment[] toUpdateSegs = null;
-    private RealizationStatusEnum status;
-    private String owner;
-    private int cost = -1;
-
-    public CubeBuilder(CubeInstance cubeInstance) {
-        this.cubeInstance = cubeInstance;
-    }
-
-    public CubeInstance getCubeInstance() {
-        return cubeInstance;
-    }
-
-    public CubeBuilder setCubeInstance(CubeInstance cubeInstance) {
-        this.cubeInstance = cubeInstance;
-        return this;
-    }
-
-    public CubeSegment[] getToAddSegs() {
-        return toAddSegs;
-    }
-
-    public CubeBuilder setToAddSegs(CubeSegment... toAddSegs) {
-        this.toAddSegs = toAddSegs;
-        return this;
-    }
-
-    public CubeSegment[] getToRemoveSegs() {
-        return toRemoveSegs;
-    }
-
-    public CubeBuilder setToRemoveSegs(CubeSegment... toRemoveSegs) {
-        this.toRemoveSegs = toRemoveSegs;
-        return this;
-    }
-
-    public CubeSegment[] getToUpdateSegs() {
-        return toUpdateSegs;
-    }
-
-    public CubeBuilder setToUpdateSegs(CubeSegment... toUpdateSegs) {
-        this.toUpdateSegs = toUpdateSegs;
-        return this;
-    }
-
-    public RealizationStatusEnum getStatus() {
-        return status;
-    }
-
-    public CubeBuilder setStatus(RealizationStatusEnum status) {
-        this.status = status;
-        return this;
-    }
-
-    public String getOwner() {
-        return owner;
-    }
-
-    public CubeBuilder setOwner(String owner) {
-        this.owner = owner;
-        return this;
-    }
-
-    public int getCost() {
-        return cost;
-    }
-
-    public CubeBuilder setCost(int cost) {
-        this.cost = cost;
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index e85a5fe..cc61dac 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -172,7 +172,7 @@ public class CubeManager implements IRealizationProvider {
         if (dictInfo != null) {
             cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
 
-            CubeBuilder cubeBuilder = new CubeBuilder(cubeSeg.getCubeInstance());
+            CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance());
             cubeBuilder.setToUpdateSegs(cubeSeg);
             updateCube(cubeBuilder);
         }
@@ -211,7 +211,7 @@ public class CubeManager implements IRealizationProvider {
         SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 
         cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-        CubeBuilder cubeBuilder = new CubeBuilder(cubeSeg.getCubeInstance());
+        CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance());
         cubeBuilder.setToUpdateSegs(cubeSeg);
         updateCube(cubeBuilder);
 
@@ -248,13 +248,13 @@ public class CubeManager implements IRealizationProvider {
         CubeInstance cube = CubeInstance.create(cubeName, projectName, desc);
         cube.setOwner(owner);
 
-        updateCube(new CubeBuilder(cube));
+        updateCube(new CubeUpdate(cube));
         ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
 
         return cube;
     }
 
-    public CubeInstance updateCube(CubeBuilder cubeBuilder) throws IOException {
+    public CubeInstance updateCube(CubeUpdate cubeBuilder) throws IOException {
         return updateCube(cubeBuilder, 0);
     }
 
@@ -283,7 +283,7 @@ public class CubeManager implements IRealizationProvider {
         return true;
     }
 
-    private CubeInstance updateCube(CubeBuilder cubeBuilder, int retry) throws IOException {
+    private CubeInstance updateCube(CubeUpdate cubeBuilder, int retry) throws IOException {
         if (cubeBuilder == null || cubeBuilder.getCubeInstance() == null)
             throw new IllegalStateException();
 
@@ -385,7 +385,7 @@ public class CubeManager implements IRealizationProvider {
 
         validateNewSegments(cube, mergeSegment);
 
-        CubeBuilder cubeBuilder = new CubeBuilder(cube).setToAddSegs(appendSegment, mergeSegment);
+        CubeUpdate cubeBuilder = new CubeUpdate(cube).setToAddSegs(appendSegment, mergeSegment);
         updateCube(cubeBuilder);
 
         return new Pair<CubeSegment, CubeSegment>(appendSegment, mergeSegment);
@@ -414,7 +414,7 @@ public class CubeManager implements IRealizationProvider {
 
         if (saveChange) {
 
-            CubeBuilder cubeBuilder = new CubeBuilder(cube);
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
             cubeBuilder.setToAddSegs(newSegment);
             updateCube(cubeBuilder);
         }
@@ -425,7 +425,7 @@ public class CubeManager implements IRealizationProvider {
         checkNoBuildingSegment(cube);
 
         CubeSegment newSegment = newSegment(cube, startDate, endDate);
-        CubeBuilder cubeBuilder = new CubeBuilder(cube);
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
         cubeBuilder.setToAddSegs(newSegment);
         updateCube(cubeBuilder);
 
@@ -456,7 +456,7 @@ public class CubeManager implements IRealizationProvider {
 
         validateNewSegments(cube, false, newSegment);
 
-        CubeBuilder cubeBuilder = new CubeBuilder(cube);
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
         cubeBuilder.setToAddSegs(newSegment);
         updateCube(cubeBuilder);
 
@@ -684,7 +684,7 @@ public class CubeManager implements IRealizationProvider {
 
         logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(newSegments) + ", to remove segments " + toRemoveSegs);
 
-        CubeBuilder cubeBuilder = new CubeBuilder(cube);
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
         cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegments).setStatus(RealizationStatusEnum.READY);
         updateCube(cubeBuilder);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 035b5eb..a26c2c9 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonBackReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
+
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
+import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -37,7 +39,7 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
+public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
 
     @JsonBackReference
     private CubeInstance cubeInstance;
@@ -344,4 +346,19 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
     public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
         return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
     }
+
+    @Override
+    public int getSourceType() {
+        return 0;
+    }
+
+    @Override
+    public int getEngineType() {
+        return 0;
+    }
+
+    @Override
+    public int getStorageType() {
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
new file mode 100644
index 0000000..dffaa48
--- /dev/null
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube;
+
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+
+/**
+ * Hold changes to a cube so that they can be applied as one unit.
+ */
+public class CubeUpdate {
+    private CubeInstance cubeInstance;
+    private CubeSegment[] toAddSegs = null;
+    private CubeSegment[] toRemoveSegs = null;
+    private CubeSegment[] toUpdateSegs = null;
+    private RealizationStatusEnum status;
+    private String owner;
+    private int cost = -1;
+
+    public CubeUpdate(CubeInstance cubeInstance) {
+        this.cubeInstance = cubeInstance;
+    }
+
+    public CubeInstance getCubeInstance() {
+        return cubeInstance;
+    }
+
+    public CubeUpdate setCubeInstance(CubeInstance cubeInstance) {
+        this.cubeInstance = cubeInstance;
+        return this;
+    }
+
+    public CubeSegment[] getToAddSegs() {
+        return toAddSegs;
+    }
+
+    public CubeUpdate setToAddSegs(CubeSegment... toAddSegs) {
+        this.toAddSegs = toAddSegs;
+        return this;
+    }
+
+    public CubeSegment[] getToRemoveSegs() {
+        return toRemoveSegs;
+    }
+
+    public CubeUpdate setToRemoveSegs(CubeSegment... toRemoveSegs) {
+        this.toRemoveSegs = toRemoveSegs;
+        return this;
+    }
+
+    public CubeSegment[] getToUpdateSegs() {
+        return toUpdateSegs;
+    }
+
+    public CubeUpdate setToUpdateSegs(CubeSegment... toUpdateSegs) {
+        this.toUpdateSegs = toUpdateSegs;
+        return this;
+    }
+
+    public RealizationStatusEnum getStatus() {
+        return status;
+    }
+
+    public CubeUpdate setStatus(RealizationStatusEnum status) {
+        this.status = status;
+        return this;
+    }
+
+    public String getOwner() {
+        return owner;
+    }
+
+    public CubeUpdate setOwner(String owner) {
+        this.owner = owner;
+        return this;
+    }
+
+    public int getCost() {
+        return cost;
+    }
+
+    public CubeUpdate setCost(int cost) {
+        this.cost = cost;
+        return this;
+    }
+}

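A minimal sketch of the renamed class in use, mirroring the CubeManager call sites above: several changes are batched into one CubeUpdate and persisted atomically by a single updateCube() call. The helper method is hypothetical; the fluent setters are taken from the class as shown:

    import java.io.IOException;

    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.cube.CubeUpdate;
    import org.apache.kylin.metadata.realization.RealizationStatusEnum;

    public class CubeUpdateExample {
        // Collect segment and status changes, then apply them as one
        // metadata write (updateCube retries internally on conflicts).
        public static void promote(CubeManager mgr, CubeInstance cube, CubeSegment newSeg) throws IOException {
            CubeUpdate update = new CubeUpdate(cube)
                    .setToAddSegs(newSeg)
                    .setStatus(RealizationStatusEnum.READY);
            mgr.updateCube(update);
        }
    }
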
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
index 62c628d..4226319 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
@@ -66,7 +66,7 @@ public class CubeManagerCacheTest extends LocalFileMetadataTestCase {
         assertEquals(0, createdCube.getSegments().size());
         assertEquals(RealizationStatusEnum.DISABLED, createdCube.getStatus());
         createdCube.setStatus(RealizationStatusEnum.DESCBROKEN);
-        CubeBuilder cubeBuilder = new CubeBuilder(createdCube);
+        CubeUpdate cubeBuilder = new CubeUpdate(createdCube);
 
         cubeManager.updateCube(cubeBuilder);
         assertEquals(RealizationStatusEnum.DESCBROKEN, cubeManager.getCube("a_whole_new_cube").getStatus());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 64c4ea5..fbfad73 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -94,7 +94,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
 
         cube.setAutoMergeTimeRanges(new long[] {2000, 6000});
-        mgr.updateCube(new CubeBuilder(cube));
+        mgr.updateCube(new CubeUpdate(cube));
 
         assertTrue(cube.needAutoMerge());
 
@@ -109,7 +109,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         CubeSegment seg2 = mgr.appendSegments(cube, 2000);
         seg2.setStatus(SegmentStatusEnum.READY);
 
-        CubeBuilder cubeBuilder = new CubeBuilder(cube);
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 
         mgr.updateCube(cubeBuilder);
 
@@ -140,7 +140,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
 
         cube.setAutoMergeTimeRanges(new long[] {2000, 6000});
-        mgr.updateCube(new CubeBuilder(cube));
+        mgr.updateCube(new CubeUpdate(cube));
 
         assertTrue(cube.needAutoMerge());
 
@@ -155,7 +155,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         CubeSegment seg3 = mgr.appendSegments(cube, 2000, 4000, false, false);
         seg3.setStatus(SegmentStatusEnum.READY);
 
-        CubeBuilder cubeBuilder = new CubeBuilder(cube);
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
         cubeBuilder.setToAddSegs(seg3);
         cubeBuilder.setToUpdateSegs(seg1);
 
@@ -172,7 +172,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         CubeSegment seg4 = mgr.appendSegments(cube, 4000, 8000, false, false);
         seg4.setStatus(SegmentStatusEnum.READY);
 
-        cubeBuilder = new CubeBuilder(cube);
+        cubeBuilder = new CubeUpdate(cube);
         cubeBuilder.setToAddSegs(seg4);
 
         mgr.updateCube(cubeBuilder);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index e3f8638..319ce5f 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -177,13 +177,13 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
 
         CubeSegment seg3 = mgr.appendSegments(cube, 2000, 3000, false, false);
         seg3.setStatus(SegmentStatusEnum.READY);
-        CubeBuilder builder = new CubeBuilder(cube).setToAddSegs(seg3);
+        CubeUpdate builder = new CubeUpdate(cube).setToAddSegs(seg3);
 
         mgr.updateCube(builder);
         assertEquals(2, cube.getSegments().size());
 
         CubeSegment seg2 = mgr.appendSegments(cube, 1000, 2000, false, false);
-        builder = new CubeBuilder(cube).setToAddSegs(seg2);
+        builder = new CubeUpdate(cube).setToAddSegs(seg2);
         mgr.updateCube(builder);
         assertEquals(3, cube.getSegments().size());
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index c536deb..d24c99c 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -18,22 +18,36 @@
 
 package org.apache.kylin.engine;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public class BuildEngineFactory {
     
-    private static final IBatchCubingEngine defaultBatch = new MRBatchCubingEngine();
+    private static IBatchCubingEngine defaultBatchEngine;
+    
+    public static IBatchCubingEngine defaultBatchEngine() {
+        if (defaultBatchEngine == null) {
+            KylinConfig conf = KylinConfig.getInstanceFromEnv();
+            if (conf.isCubingInMem()) {
+                defaultBatchEngine = new MRBatchCubingEngine2();
+            } else {
+                defaultBatchEngine = new MRBatchCubingEngine();
+            }
+        }
+        return defaultBatchEngine;
+    }
     
     /** Build a new cube segment, typically its time range appends to the end of current cube. */
     public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return defaultBatch.createBatchCubingJob(newSegment, submitter);
+        return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
     }
     
     /** Merge multiple small segments into a big one. */
     public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return defaultBatch.createBatchMergeJob(mergeSegment, submitter);
+        return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
     }
     
 }

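Callers are unaffected by the lazy engine selection; the factory now consults kylin config once and caches the choice. A minimal usage sketch (the submitter name is hypothetical):

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.BuildEngineFactory;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    public class SubmitBuildExample {
        // Returns an MRBatchCubingEngine2 job when isCubingInMem() is true,
        // otherwise a classic MRBatchCubingEngine job.
        public static DefaultChainedExecutable submit(CubeSegment newSegment) {
            return BuildEngineFactory.createBatchCubingJob(newSegment, "ADMIN");
        }
    }
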
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index 55a080c..904f557 100644
--- a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -29,4 +29,7 @@ public interface IBatchCubingEngine {
     /** Merge multiple small segments into a big one. */
     public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
     
+    public Class<?> getSourceInterface();
+    
+    public Class<?> getStorageInterface();
 }

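The two new methods let an engine declare which input and output abstractions it works against, so source and storage implementations can be matched to the engine. A sketch of a conforming engine; that the in-mem MR engine pairs IMRInput with IMROutput2 is an assumption based on the classes added in this commit, not shown in this hunk:

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.IBatchCubingEngine;
    import org.apache.kylin.engine.mr.IMRInput;
    import org.apache.kylin.engine.mr.IMROutput2;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    public class ExampleEngine implements IBatchCubingEngine {
        public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
            throw new UnsupportedOperationException("sketch only");
        }
        public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
            throw new UnsupportedOperationException("sketch only");
        }
        // Assumption: this engine reads via IMRInput and writes via IMROutput2.
        public Class<?> getSourceInterface() { return IMRInput.class; }
        public Class<?> getStorageInterface() { return IMROutput2.class; }
    }
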
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 239ce64..a39ac74 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,111 +19,58 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.job.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
 import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
 
 public class BatchCubingJobBuilder extends JobBuilderSupport {
 
     private final IMRBatchCubingInputSide inputSide;
-    
+    private final IMRBatchCubingOutputSide outputSide;
+
     public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
-        this.inputSide = MRBatchCubingEngine.getBatchCubingInputSide(seg);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
     }
 
     public CubingJob build() {
-        final CubingJob result = CubingJob.createBuildJob(seg,  submitter,  config);
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
-        final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
 
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
-        
+
         // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
+        result.addTask(createFactDistinctColumnsStep(jobId));
         result.addTask(createBuildDictionaryStep(jobId));
-        
+
         // Phase 3: Build Cube
-        if (config.isInMemCubing()) {
-            result.addTask(createSaveStatisticsStep(jobId));
-            
-            // create htable step
-            result.addTask(createCreateHTableStep(jobId));
-            result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
-            // bulk load step
-            result.addTask(createBulkLoadStep(jobId));
-        } else {
-            final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
-            final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
-            final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
-            
-            // base cuboid step
-            result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
-
-            // n dim cuboid steps
-            for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
-                int dimNum = totalRowkeyColumnsCount - i;
-                result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
-            }
-            result.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
-            // create htable step
-            result.addTask(createCreateHTableStep(jobId));
-            // generate hfiles step
-            result.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
-            // bulk load step
-            result.addTask(createBulkLoadStep(jobId));
+        final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
+        final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
+        // base cuboid step
+        result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
+        // n dim cuboid steps
+        for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
+            int dimNum = totalRowkeyColumnsCount - i;
+            result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
         }
+        outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
 
         // Phase 4: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-        inputSide.addStepPhase4_UpdateMetadataAndCleanup(result);
-
-        return result;
-    }
-
-    private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(FactDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(config.isInMemCubing()));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
-        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
 
-        result.setMapReduceParams(cmd.toString());
         return result;
     }
 
-    private HadoopShellExecutable createBuildDictionaryStep(String jobId) {
-        // base cuboid job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    private MapReduceExecutable createBaseCuboidStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String[] cuboidOutputTempPath, String jobId) {
+    private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
         // base cuboid job
         MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
 
@@ -134,7 +81,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+        appendExecCmdParameters(cmd, "input", ""); // marks flat table input
         appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
         appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "level", "0");
@@ -165,52 +112,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         return ndCuboidStep;
     }
 
-    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
-        SaveStatisticsStep result = new SaveStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setStatisticsPath(getStatisticsPath(jobId));
-        return result;
-    }
-
-    private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        // base cuboid job
-        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        baseCuboidStep.setMapReduceParams(cmd.toString());
-        baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
-        return baseCuboidStep;
-    }
-
-    private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
-        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
-        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
-        updateCubeInfoStep.setSegmentId(seg.getUuid());
-        updateCubeInfoStep.setCubingJobId(jobId);
-        return updateCubeInfoStep;
-    }
-    
-    private String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
-    }
-    
     private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
         String[] paths = new String[groupRowkeyColumnsCount + 1];
         for (int i = 0; i <= groupRowkeyColumnsCount; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..b6f264e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
+import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+    private final IMRBatchCubingInputSide inputSide;
+    private final IMRBatchCubingOutputSide2 outputSide;
+
+    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = result.getId();
+
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(result);
+
+        // Phase 2: Build Dictionary
+        result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+        result.addTask(createBuildDictionaryStep(jobId));
+        result.addTask(createSaveStatisticsStep(jobId));
+        outputSide.addStepPhase2_BuildDictionary(result);
+
+        // Phase 3: Build Cube
+        result.addTask(createInMemCubingStep(jobId));
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+        SaveStatisticsStep result = new SaveStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setStatisticsPath(getStatisticsPath(jobId));
+        return result;
+    }
+
+    private MapReduceExecutable createInMemCubingStep(String jobId) {
+        // base cuboid job
+        MapReduceExecutable cubeStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg);
+
+        cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "jobflowid", jobId);
+
+        cubeStep.setMapReduceParams(cmd.toString());
+        cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
+        cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+        return cubeStep;
+    }
+
+}

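The output side plugged in above is the core of the storage abstraction: each storage (HBaseMROutput2 in this commit) contributes its own steps at fixed phases of the flow, instead of the builder hard-coding HTable creation and bulk load. A hypothetical no-op output side, with signatures inferred from the call sites above (the IMROutput2 interface itself is not shown in this part of the diff):

    import org.apache.kylin.engine.mr.CubingJob;
    import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;

    // A storage that needs no extra steps; a real storage such as
    // HBaseMROutput2 would append its own tasks to the job flow here.
    public class NoOpOutputSide2 implements IMRBatchCubingOutputSide2 {
        public void addStepPhase2_BuildDictionary(CubingJob flow) {
            // e.g. create the target HTable once dictionaries and stats exist
        }
        public void addStepPhase3_BuildCube(CubingJob flow) {
            // e.g. load the cuboid output produced by InMemCuboidJob
        }
        public void addStepPhase4_Cleanup(CubingJob flow) {
            // e.g. drop intermediate files or obsolete HTables
        }
    }
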
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index cb2d8dd..6264ebd 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -22,23 +22,25 @@ import java.util.List;
 
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class BatchMergeJobBuilder extends JobBuilderSupport {
 
+    private final IMRBatchMergeOutputSide outputSide;
+
     public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
+        this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
     }
 
     public CubingJob build() {
-        final CubingJob result = CubingJob.createMergeJob(seg,  submitter,  config);
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
 
@@ -46,66 +48,23 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingSegmentIds = Lists.newArrayList();
         final List<String> mergingCuboidPaths = Lists.newArrayList();
-        final List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
-            mergingHTables.add(merging.getStorageLocationIdentifier());
         }
 
+        // Phase 1: Merge Dictionary
         result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        
-        if (config.isInMemCubing()) {
-
-            String mergedStatisticsFolder = getStatisticsPath(jobId);
-            result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
-
-            // create htable step
-            result.addTask(createCreateHTableStep(jobId));
-
-            String formattedTables = StringUtil.join(mergingHTables, ",");
-            result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
-
-        } else {
-            // merge cuboid
-            String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
-            result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
-            
-            // convert htable
-            result.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
-            // create htable step
-            result.addTask(createCreateHTableStep(jobId));
-            // generate hfiles step
-            result.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
-        }
-        
-        // bulk load step
-        result.addTask(createBulkLoadStep(jobId));
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
 
-        result.addTask(createGarbageCollectionStep(mergingHTables, null));
+        // Phase 2: Merge Cube Files
+        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+        result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
+        outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
 
-        return result;
-    }
-    
-    private MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
-        MergeDictionaryStep result = new MergeDictionaryStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        return result;
-    }
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase3_Cleanup(result);
 
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
-        MergeStatisticsStep result = new MergeStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setMergedStatisticsPath(mergedStatisticsFolder);
         return result;
     }
 
@@ -126,35 +85,4 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
         return mergeCuboidDataStep;
     }
 
-
-    private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputTableNames);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
-        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-        return mergeCuboidDataStep;
-    }
-
-    private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
-        UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
-        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setCubingJobId(jobId);
-        return result;
-    }
-
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..8a5cb02
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
+import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class BatchMergeJobBuilder2 extends JobBuilderSupport {
+
+    private final IMRBatchCubingOutputSide2 outputSide;
+    
+    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+        super(mergeSegment, submitter);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final String jobId = result.getId();
+
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+        final List<String> mergingSegmentIds = Lists.newArrayList();
+        final List<String> mergingHTables = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingSegmentIds.add(merging.getUuid());
+            mergingHTables.add(merging.getStorageLocationIdentifier());
+        }
+
+        // Phase 1: Merge Dictionary
+        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
+        outputSide.addStepPhase2_BuildDictionary(result);
+
+        // Phase 2: Merge Cube
+        String formattedTables = StringUtil.join(mergingHTables, ",");
+        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+        MergeStatisticsStep result = new MergeStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setMergedStatisticsPath(mergedStatisticsFolder);
+        return result;
+    }
+
+    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "jobflowid", jobId);
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
+        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        return mergeCuboidDataStep;
+    }
+
+}
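
For orientation, a minimal sketch of driving the new merge builder; the cube
name, the segment lookup, and the submission via ExecutableManager are
illustrative assumptions, only the BatchMergeJobBuilder2 calls above come from
this patch:

    // hypothetical driver, inside a method that may throw exceptions
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    CubeInstance cube = CubeManager.getInstance(config).getCube("test_kylin_cube"); // assumed cube name
    // assume the target segment covering the merged range already exists
    CubeSegment mergeSeg = cube.getSegments().get(cube.getSegments().size() - 1);
    CubingJob job = new BatchMergeJobBuilder2(mergeSeg, "ADMIN").build();
    // submit to the job engine, e.g. ExecutableManager.getInstance(config).addJob(job);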

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
new file mode 100644
index 0000000..88bb68c
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
@@ -0,0 +1,166 @@
+package org.apache.kylin.engine.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.kylin.common.util.Bytes;
+
+public class ByteArrayWritable implements WritableComparable<ByteArrayWritable> {
+
+    private byte[] data;
+    private int offset;
+    private int length;
+
+    public ByteArrayWritable() {
+        this(null, 0, 0);
+    }
+
+    public ByteArrayWritable(int capacity) {
+        this(new byte[capacity], 0, capacity);
+    }
+
+    public ByteArrayWritable(byte[] data) {
+        this(data, 0, data == null ? 0 : data.length);
+    }
+
+    public ByteArrayWritable(byte[] data, int offset, int length) {
+        this.data = data;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    public byte[] array() {
+        return data;
+    }
+
+    public int offset() {
+        return offset;
+    }
+
+    public int length() {
+        return length;
+    }
+
+    public void set(byte[] array) {
+        set(array, 0, array.length);
+    }
+
+    public void set(byte[] array, int offset, int length) {
+        this.data = array;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    public ByteBuffer asBuffer() {
+        if (data == null)
+            return null;
+        else if (offset == 0 && length == data.length)
+            return ByteBuffer.wrap(data);
+        else
+            return ByteBuffer.wrap(data, offset, length).slice();
+    }
+
+    @Override
+    public int hashCode() {
+        if (data == null)
+            return 0;
+        else
+            return Bytes.hashCode(data, offset, length);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(this.length);
+        out.write(this.data, this.offset, this.length);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.length = in.readInt();
+        this.data = new byte[this.length];
+        in.readFully(this.data, 0, this.length);
+        this.offset = 0;
+    }
+
+    // Below methods copied from BytesWritable
+    /**
+     * Define the sort order of the ByteArrayWritable.
+     * @param that The other bytes writable
+     * @return Positive if left is bigger than right, 0 if they are equal, and
+     *         negative if left is smaller than right.
+     */
+    public int compareTo(ByteArrayWritable that) {
+        return WritableComparator.compareBytes(this.data, this.offset, this.length, that.data, that.offset, that.length);
+    }
+
+    /**
+     * Compares the bytes in this object to the specified byte array
+     * @param that The byte array to compare to
+     * @return Positive if left is bigger than right, 0 if they are equal, and
+     *         negative if left is smaller than right.
+     */
+    public int compareTo(final byte[] that) {
+        return WritableComparator.compareBytes(this.data, this.offset, this.length, that, 0, that.length);
+    }
+
+    /**
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object right_obj) {
+        if (right_obj instanceof byte[]) {
+            return compareTo((byte[]) right_obj) == 0;
+        }
+        if (right_obj instanceof ByteArrayWritable) {
+            return compareTo((ByteArrayWritable) right_obj) == 0;
+        }
+        return false;
+    }
+
+    /**
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(3 * this.length);
+        final int endIdx = this.offset + this.length;
+        for (int idx = this.offset; idx < endIdx; idx++) {
+            sb.append(' ');
+            String num = Integer.toHexString(0xff & this.data[idx]);
+            // if it is only one digit, add a leading 0.
+            if (num.length() < 2) {
+                sb.append('0');
+            }
+            sb.append(num);
+        }
+        return sb.length() > 0 ? sb.substring(1) : "";
+    }
+
+    /** A Comparator optimized for ByteArrayWritable.
+     */
+    public static class Comparator extends WritableComparator {
+        private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
+
+        /** constructor */
+        public Comparator() {
+            super(ByteArrayWritable.class);
+        }
+
+        /**
+         * @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int)
+         */
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return comparator.compare(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    static { // register this comparator
+        WritableComparator.define(ByteArrayWritable.class, new Comparator());
+    }
+}
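
A small usage sketch of the new writable; the try/catch scaffolding is
omitted, so assume this runs inside a method that throws IOException:

    ByteArrayWritable key = new ByteArrayWritable(new byte[] { 1, 2, 3 });

    // round-trip through the Writable contract
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    key.write(new DataOutputStream(bos));
    ByteArrayWritable copy = new ByteArrayWritable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

    assert key.compareTo(copy) == 0;    // byte-wise order, same rule as BytesWritable
    assert key.equals(copy) && key.hashCode() == copy.hashCode();
    ByteBuffer view = copy.asBuffer();  // wraps without copying when offset == 0 and length == data.length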

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
deleted file mode 100644
index d79f35d..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Drop the resources that is no longer needed, including intermediate hive table (after cube build) and hbase tables (after cube merge)
- */
-@Deprecated // only exists for backward compatibility
-public class GarbageCollectionStep extends AbstractExecutable {
-
-    private static final String OLD_HTABLES = "oldHTables";
-
-    private static final String OLD_HIVE_TABLE = "oldHiveTable";
-
-    private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
-    public GarbageCollectionStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
-        StringBuffer output = new StringBuffer();
-
-        final String hiveTable = this.getOldHiveTable();
-        if (StringUtils.isNotEmpty(hiveTable)) {
-            final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS  " + hiveTable + ";\"";
-            ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
-            try {
-                context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
-                output.append("Hive table " + hiveTable + " is dropped. \n");
-            } catch (IOException e) {
-                logger.error("job:" + getId() + " execute finished with exception", e);
-                output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
-                return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
-            }
-        }
-
-
-        List<String> oldTables = getOldHTables();
-        if (oldTables != null && oldTables.size() > 0) {
-            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-            Configuration conf = HBaseConfiguration.create();
-            HBaseAdmin admin = null;
-            try {
-                admin = new HBaseAdmin(conf);
-                for (String table : oldTables) {
-                    if (admin.tableExists(table)) {
-                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
-                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
-                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                            if (admin.isTableEnabled(table)) {
-                                admin.disableTable(table);
-                            }
-                            admin.deleteTable(table);
-                            logger.debug("Dropped htable: " + table);
-                            output.append("HBase table " + table + " is dropped. \n");
-                        } else {
-                            logger.debug("Skip htable: " + table);
-                            output.append("Skip htable: " + table + ". \n");
-                        }
-                    }
-                }
-
-            } catch (IOException e) {
-                output.append("Got error when drop HBase table, exiting... \n");
-                // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
-                return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
-            } finally {
-                if (admin != null)
-                    try {
-                        admin.close();
-                    } catch (IOException e) {
-                        logger.error(e.getLocalizedMessage());
-                    }
-            }
-        }
-
-
-        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-    }
-
-    public void setOldHTables(List<String> ids) {
-        setParam(OLD_HTABLES, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getOldHTables() {
-        final String ids = getParam(OLD_HTABLES);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setOldHiveTable(String hiveTable) {
-        setParam(OLD_HIVE_TABLE, hiveTable);
-    }
-
-    private String getOldHiveTable() {
-        return getParam(OLD_HIVE_TABLE);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 08ed94a..0c39398 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -48,22 +48,22 @@ public interface IMRInput {
     
     /**
      * Participate the batch cubing flow as the input side. Responsible for creating
-     * intermediate flat table (Phase 1) and clean up if necessary (Phase 4).
+     * intermediate flat table (Phase 1) and cleaning up any leftovers (Phase 4).
      * 
      * - Phase 1: Create Flat Table
-     * - Phase 2: Build Dictionary
-     * - Phase 3: Build Cube
+     * - Phase 2: Build Dictionary (with FlatTableInputFormat)
+     * - Phase 3: Build Cube (with FlatTableInputFormat)
      * - Phase 4: Update Metadata & Cleanup
      */
     public interface IMRBatchCubingInputSide {
         
+        /** Return an InputFormat that reads from the intermediate flat table */
+        public IMRTableInputFormat getFlatTableInputFormat();
+        
         /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
         
         /** Add step that does necessary clean up, like delete the intermediate flat table */
-        public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow);
-        
-        /** Return an InputFormat that reads from the intermediate flat table */
-        public IMRTableInputFormat getFlatTableInputFormat();
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 }
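
To make the phase contract concrete, a minimal input-side sketch; the shell
commands, the table name, and the injected IMRTableInputFormat are
illustrative assumptions, not part of this patch:

    public class FlatTableInputSideSketch implements IMRInput.IMRBatchCubingInputSide {

        private final IMRInput.IMRTableInputFormat flatTableFormat; // assumed to be supplied by the caller
        private final String flatTableName;

        public FlatTableInputSideSketch(IMRInput.IMRTableInputFormat flatTableFormat, String flatTableName) {
            this.flatTableFormat = flatTableFormat;
            this.flatTableName = flatTableName;
        }

        @Override
        public IMRInput.IMRTableInputFormat getFlatTableInputFormat() {
            return flatTableFormat; // consumed by the Phase 2 and Phase 3 MR jobs
        }

        @Override
        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
            ShellExecutable step = new ShellExecutable();
            step.setName("Create Intermediate Flat Table");
            step.setCmd("hive -e \"CREATE TABLE " + flatTableName + " AS SELECT ...\""); // placeholder SQL
            jobFlow.addTask(step);
        }

        @Override
        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
            ShellExecutable step = new ShellExecutable();
            step.setName("Drop Intermediate Flat Table");
            step.setCmd("hive -e \"DROP TABLE IF EXISTS " + flatTableName + ";\"");
            jobFlow.addTask(step);
        }
    }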

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
deleted file mode 100644
index 6a94920..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.util.List;
-
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-public interface IMRJobFlowParticipant {
-
-    public List<? extends AbstractExecutable> contributePhase1CreateFlatTable(List<? extends AbstractExecutable> steps);
-    
-    public List<? extends AbstractExecutable> contributePhase2CreateDictionary(List<? extends AbstractExecutable> steps);
-    
-    public List<? extends AbstractExecutable> contributePhase3BuildCube(List<? extends AbstractExecutable> steps);
-    
-    public List<? extends AbstractExecutable> contributePhase4UpdateCubeMetadata(List<? extends AbstractExecutable> steps);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 6e3a42c..bc6ee1f 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -18,9 +18,61 @@
 
 package org.apache.kylin.engine.mr;
 
-public interface IMROutput {
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
-    public IMRJobFlowParticipant createBuildFlowParticipant();
+public interface IMROutput {
 
-    public IMRJobFlowParticipant createMergeFlowParticipant();
+    /** Return a helper to participate in the batch cubing job flow. */
+    public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+    
+    /**
+     * Participate the batch cubing flow as the output side. Responsible for saving
+     * the cuboid output to storage (Phase 3).
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface IMRBatchCubingOutputSide {
+        
+        /**
+         * Add step that saves the cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where each key takes the form "CUBOID,D1,D2,..,Dn"
+         * and each value the form "M1,M2,..,Mm". CUBOID is an 8-byte cuboid ID; Dx is a dictionary-encoded
+         * dimension value; Mx is a serialized measure value.
+         */
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+        
+        /** Add step that does any necessary cleanup. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+    
+    /** Return a helper to participate in the batch merge job flow. */
+    public IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+    
+    /**
+     * Participates in the batch merge flow as the output side. Responsible for saving
+     * the merged cuboid output to storage (Phase 2).
+     * 
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    public interface IMRBatchMergeOutputSide {
+        
+        /**
+         * Add step that saves the cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where each key takes the form "CUBOID,D1,D2,..,Dn"
+         * and each value the form "M1,M2,..,Mm". CUBOID is an 8-byte cuboid ID; Dx is a dictionary-encoded
+         * dimension value; Mx is a serialized measure value.
+         */
+        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+        
+        /** Add step that does any necessary cleanup. */
+        public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+    }
 }
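
A skeletal IMROutput implementation for orientation; the step bodies are left
as comments because the concrete storage steps (e.g. HFile creation and bulk
load for HBase) live in the storage module that plugs into this abstraction:

    public class StorageOutputSketch implements IMROutput {

        @Override
        public IMRBatchCubingOutputSide getBatchCubingOutputSide(final CubeSegment seg) {
            return new IMRBatchCubingOutputSide() {
                @Override
                public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
                    // read the sequence files under cuboidRootPath and load them into storage,
                    // e.g. an MR job that writes HFiles followed by a bulk-load step
                }

                @Override
                public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                    // delete temporary files produced by the build
                }
            };
        }

        @Override
        public IMRBatchMergeOutputSide getBatchMergeOutputSide(final CubeSegment seg) {
            return new IMRBatchMergeOutputSide() {
                @Override
                public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
                    // same conversion as the build flow, reading the merged cuboid files
                }

                @Override
                public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
                    // drop the HTables of the segments that were merged away
                }
            };
        }
    }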

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
new file mode 100644
index 0000000..974e2fc
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -0,0 +1,88 @@
+package org.apache.kylin.engine.mr;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface IMROutput2 {
+
+    /** Return a helper to participate in the batch cubing job flow. */
+    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
+
+    /**
+     * Participates in the batch cubing flow as the output side.
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube (with StorageOutputFormat)
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface IMRBatchCubingOutputSide2 {
+
+        public IMRStorageOutputFormat getStorageOutputFormat();
+
+        /** Add step that executes after dictionary building and before cube building. */
+        public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Add step that executes after build cube. */
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Add step that does any necessary cleanup. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
+
+    public interface IMRBatchMergeInputSide2 {
+        public IMRStorageInputFormat getStorageInputFormat();
+    }
+
+    @SuppressWarnings("rawtypes")
+    public interface IMRStorageInputFormat {
+        
+        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+        
+        public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+        
+        public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
+    }
+
+    /** Return a helper to participate in the batch merge job flow. */
+    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
+
+    /**
+     * Participates in the batch merge flow as the output side.
+     * 
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    public interface IMRBatchMergeOutputSide2 {
+
+        public IMRStorageOutputFormat getStorageOutputFormat();
+
+        /** Add step that executes after merge dictionary and before merge cube. */
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Add step that executes after merge cube. */
+        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Add step that does any necessary cleanup. */
+        public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public interface IMRStorageOutputFormat {
+        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
+        
+        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
+    }
+}
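
A hedged sketch of an IMRStorageOutputFormat that writes plain sequence files;
a real storage backend (e.g. HBase) would emit its native key-values instead,
and the output path layout and measure serialization below are assumptions:

    @SuppressWarnings("rawtypes")
    public class SequenceFileStorageOutputFormat implements IMROutput2.IMRStorageOutputFormat {

        @Override
        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
            job.setReducerClass(reducer);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setOutputKeyClass(ByteArrayWritable.class);
            job.setOutputValueClass(ByteArrayWritable.class);
            FileOutputFormat.setOutputPath(job, new Path("/tmp/kylin/" + jobFlowId + "/cuboid")); // assumed layout
        }

        @SuppressWarnings("unchecked")
        @Override
        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
            // a real implementation would serialize measures with the cube's measure codec;
            // this placeholder only demonstrates the call pattern
            byte[] bytes = java.util.Arrays.toString(value).getBytes();
            context.write(key, new ByteArrayWritable(bytes));
        }
    }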