Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/20 03:52:25 UTC

[5/5] incubator-kylin git commit: KYLIN-878 half way

KYLIN-878 half way


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9a82f39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9a82f39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9a82f39b

Branch: refs/heads/KYLIN-878
Commit: 9a82f39bdfc60347e259547be6460f958986a0e8
Parents: 4d3af3a
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jul 20 08:59:10 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jul 20 08:59:57 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/BytesSplitter.java |   9 +-
 .../apache/kylin/engine/BuildEngineFactory.java |   2 +-
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |  10 +-
 .../kylin/engine/mr/BatchCubingJobBuilder2.java | 100 +++++++++++++++++
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  | 102 ++++++++++++++++++
 .../org/apache/kylin/engine/mr/IMROutput.java   |  16 ++-
 .../org/apache/kylin/engine/mr/IMROutput2.java  |  53 +++++++++
 .../kylin/engine/mr/JobBuilderSupport.java      |  15 +--
 .../kylin/engine/mr/MRBatchCubingEngine2.java   |  47 ++++++++
 .../java/org/apache/kylin/engine/mr/MRUtil.java |  10 ++
 .../engine/mr2/BatchCubingJobBuilder2.java      | 102 ------------------
 .../kylin/engine/mr2/BatchMergeJobBuilder2.java | 107 -------------------
 .../org/apache/kylin/engine/mr2/IMROutput2.java |   5 -
 .../kylin/engine/mr2/MRBatchCubingEngine2.java  |  48 ---------
 .../apache/kylin/job/hadoop/cube/CuboidJob.java |  45 +++++---
 .../job/hadoop/cube/HiveToBaseCuboidMapper.java |  28 ++++-
 .../kylin/storage/hbase/HBaseMROutput2.java     |  71 ++++++++++++
 17 files changed, 467 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
index bd16246..7249dcf 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author xjiang
  */
 public class BytesSplitter {
     private static final Logger logger = LoggerFactory.getLogger(BytesSplitter.class);
@@ -79,6 +78,14 @@ public class BytesSplitter {
 
         return bufferSize;
     }
+    
+    public void setBuffers(byte[][] buffers) {
+        for (int i = 0; i < buffers.length; i++) {
+            splitBuffers[i].value = buffers[i];
+            splitBuffers[i].length = buffers[i].length;
+        }
+        this.bufferSize = buffers.length;
+    }
 
     public byte inferByteRowDelimiter(byte[] bytes, int byteLen, int expectedSplits) throws IOException {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index 721b85a..d24c99c 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -21,7 +21,7 @@ package org.apache.kylin.engine;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.MRBatchCubingEngine;
-import org.apache.kylin.engine.mr2.MRBatchCubingEngine2;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public class BuildEngineFactory {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 0ff3bef..a39ac74 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.job.common.MapReduceExecutable;
@@ -42,13 +41,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
-        final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
 
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
 
         // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
+        result.addTask(createFactDistinctColumnsStep(jobId));
         result.addTask(createBuildDictionaryStep(jobId));
 
         // Phase 3: Build Cube
@@ -56,7 +54,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
         final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
         // base cuboid step
-        result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
+        result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
         // n dim cuboid steps
         for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
             int dimNum = totalRowkeyColumnsCount - i;
@@ -72,7 +70,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         return result;
     }
 
-    private MapReduceExecutable createBaseCuboidStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String[] cuboidOutputTempPath, String jobId) {
+    private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
         // base cuboid job
         MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
 
@@ -83,7 +81,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+        appendExecCmdParameters(cmd, "input", ""); // marks flat table input
         appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
         appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "level", "0");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..a83c596
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
+import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
+
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+    private final IMRBatchCubingInputSide inputSide;
+    private final IMRBatchCubingOutputSide2 outputSide;
+
+    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = result.getId();
+        final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(result);
+
+        // Phase 2: Build Dictionary
+        result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+        result.addTask(createBuildDictionaryStep(jobId));
+        result.addTask(createSaveStatisticsStep(jobId));
+        outputSide.addStepPhase2_BuildDictionary(result);
+
+        // Phase 3: Build Cube
+        result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+        SaveStatisticsStep result = new SaveStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setStatisticsPath(getStatisticsPath(jobId));
+        return result;
+    }
+
+    private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+        // base cuboid job
+        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg);
+
+        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "level", "0");
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+
+        baseCuboidStep.setMapReduceParams(cmd.toString());
+        baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
+        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+        return baseCuboidStep;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..f97de13
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class BatchMergeJobBuilder2 extends JobBuilderSupport {
+
+    private final IMRBatchCubingOutputSide2 outputSide;
+    
+    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+        super(mergeSegment, submitter);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final String jobId = result.getId();
+
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
+        final List<String> mergingSegmentIds = Lists.newArrayList();
+        final List<String> mergingHTables = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingSegmentIds.add(merging.getUuid());
+            mergingHTables.add(merging.getStorageLocationIdentifier());
+        }
+
+        // Phase 1: Merge Dictionary
+        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
+        outputSide.addStepPhase2_BuildDictionary(result);
+
+        // Phase 2: Merge Cube
+        String formattedTables = StringUtil.join(mergingHTables, ",");
+        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+        MergeStatisticsStep result = new MergeStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setMergedStatisticsPath(mergedStatisticsFolder);
+        return result;
+    }
+
+    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", inputTableNames);
+        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        return mergeCuboidDataStep;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 8896a2e..bc6ee1f 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -37,7 +37,13 @@ public interface IMROutput {
      */
     public interface IMRBatchCubingOutputSide {
         
-        /** Add step that saves cuboid output from HDFS to storage. */
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn", 
+         * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
+         */
         public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
         
         /** Add step that does any necessary clean up. */
@@ -57,7 +63,13 @@ public interface IMROutput {
      */
     public interface IMRBatchMergeOutputSide {
         
-        /** Add step that saves cuboid output from HDFS to storage. */
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn", 
+         * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
+         */
         public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
         
         /** Add step that does any necessary clean up. */

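For reference, the cuboid output layout described in the javadoc above (an 8-byte cuboid ID followed by dictionary-encoded dimension values) can be sketched in plain Java. This is an illustration only, not Kylin code; the 2-byte dictionary width used for the demo dimensions is an arbitrary assumption.

    import java.nio.ByteBuffer;

    public class CuboidKeyLayoutDemo {

        // Packs "CUBOID,D1,D2,..,Dn": an 8-byte cuboid ID followed by the
        // dictionary-encoded dimension values, concatenated in rowkey order.
        static byte[] packKey(long cuboidId, byte[][] encodedDims) {
            int len = 8;
            for (byte[] d : encodedDims)
                len += d.length;
            ByteBuffer buf = ByteBuffer.allocate(len);
            buf.putLong(cuboidId); // CUBOID: 8 bytes
            for (byte[] d : encodedDims)
                buf.put(d); // Dx: dictionary-encoded dimension value
            return buf.array();
        }

        public static void main(String[] args) {
            // two dimensions, each encoded as a 2-byte dictionary ID (width assumed for the demo)
            byte[][] dims = { new byte[] { 0, 7 }, new byte[] { 1, 42 } };
            byte[] key = packKey(255L, dims);
            System.out.println("key length = " + key.length); // 8 + 2 + 2 = 12
        }
    }
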
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
new file mode 100644
index 0000000..9aecba9
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -0,0 +1,53 @@
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface IMROutput2 {
+
+    /** Return a helper to participate in batch cubing job flow. */
+    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
+    
+    /**
+     * Participate the batch cubing flow as the output side.
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface IMRBatchCubingOutputSide2 {
+        
+        /** Add step that executes after build dictionary and before build cube. */
+        public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Add step that executes after build cube. */
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+        
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+    
+    /** Return a helper to participate in batch merge job flow. */
+    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
+    
+    /**
+     * Participate the batch merge flow as the output side.
+     * 
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    public interface IMRBatchMergeOutputSide2 {
+        
+        /** Add step that executes after merge dictionary and before merge cube. */
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Add step that executes after merge cube. */
+        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+        
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+    
+}

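To make the contract above concrete, here is a minimal no-op skeleton of IMROutput2. It is a sketch for orientation only (class name and package placement are assumptions) and is not part of this commit; HBaseMROutput2 further below is the actual, still half-finished, implementation.

    package org.apache.kylin.engine.mr; // placement assumed for the sketch

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    public class NoOpMROutput2 implements IMROutput2 {

        @Override
        public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg) {
            return new IMRBatchCubingOutputSide2() {
                @Override
                public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
                    // a storage that needs nothing after dictionary building adds no step here
                }

                @Override
                public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
                    // e.g. HBase storage would append a bulk-load step at this point
                }

                @Override
                public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                    // nothing to clean up
                }
            };
        }

        @Override
        public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg) {
            return new IMRBatchMergeOutputSide2() {
                @Override
                public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
                    // no-op
                }

                @Override
                public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
                    // no-op
                }

                @Override
                public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
                    // no-op
                }
            };
        }
    }
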
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 4652269..42d30c8 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.job.common.HadoopShellExecutable;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -49,15 +48,15 @@ public class JobBuilderSupport {
         this.submitter = submitter;
     }
     
-    public MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, false);
+    public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
+        return createFactDistinctColumnsStep(jobId, false);
     }
     
-    public MapReduceExecutable createFactDistinctColumnsStepWithStats(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, true);
+    public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
+        return createFactDistinctColumnsStep(jobId, true);
     }
     
-    private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId, boolean withStats) {
+    private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
         result.setMapReduceJobClass(FactDistinctColumnsJob.class);
@@ -142,10 +141,6 @@ public class JobBuilderSupport {
         }
     }
     
-    public String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
-    }
-
     public String getFactDistinctColumnsPath(String jobId) {
         return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..57ec128
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return new BatchCubingJobBuilder2(newSegment, submitter).build();
+    }
+
+    @Override
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
+    }
+    
+    @Override
+    public Class<?> getSourceInterface() {
+        return IMRInput.class;
+    }
+
+    @Override
+    public Class<?> getStorageInterface() {
+        return IMROutput2.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 099a614..4c44af7 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -6,6 +6,8 @@ import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.TableSourceFactory;
@@ -42,4 +44,12 @@ public class MRUtil {
         return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput.class).getBatchMergeOutputSide(seg);
     }
 
+    public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput2.class).getBatchCubingOutputSide(seg);
+    }
+    
+    public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput2.class).getBatchMergeOutputSide(seg);
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
deleted file mode 100644
index e83db30..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
-
-public class BatchCubingJobBuilder2 extends JobBuilderSupport {
-
-    private final IMRBatchCubingInputSide inputSide;
-
-    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
-        super(newSegment, submitter);
-        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-    }
-
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
-        final String jobId = result.getId();
-        final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-
-        // Phase 1: Create Flat Table
-        inputSide.addStepPhase1_CreateFlatTable(result);
-
-        // Phase 2: Build Dictionary
-        result.addTask(createFactDistinctColumnsStepWithStats(flatHiveTableDesc, jobId));
-        result.addTask(createBuildDictionaryStep(jobId));
-
-        // Phase 3: Build Cube
-        result.addTask(createSaveStatisticsStep(jobId)); //<<<<<
-
-        // create htable step
-        result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
-        result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
-        // bulk load step
-        result.addTask(createBulkLoadStep(jobId)); //<<<<<
-
-        // Phase 4: Update Metadata & Cleanup
-        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-        inputSide.addStepPhase4_Cleanup(result);
-
-        return result;
-    }
-
-    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
-        SaveStatisticsStep result = new SaveStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setStatisticsPath(getStatisticsPath(jobId));
-        return result;
-    }
-
-    private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
-        // base cuboid job
-        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        baseCuboidStep.setMapReduceParams(cmd.toString());
-        baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
-        return baseCuboidStep;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
deleted file mode 100644
index 25ff082..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class BatchMergeJobBuilder2 extends JobBuilderSupport {
-
-    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
-        super(mergeSegment, submitter);
-    }
-
-    public CubingJob build() {
-        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
-        final String jobId = result.getId();
-
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
-        final List<String> mergingSegmentIds = Lists.newArrayList();
-        final List<String> mergingCuboidPaths = Lists.newArrayList();
-        final List<String> mergingHTables = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-        }
-
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-
-        String mergedStatisticsFolder = getStatisticsPath(jobId);
-        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
-
-        // create htable step
-        result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
-
-        String formattedTables = StringUtil.join(mergingHTables, ",");
-        result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
-
-        // bulk load step
-        result.addTask(createBulkLoadStep(jobId)); //<<<<<
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
-
-        result.addTask(createGarbageCollectionStep(mergingHTables, null)); //<<<<<
-
-        return result;
-    }
-
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
-        MergeStatisticsStep result = new MergeStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setMergedStatisticsPath(mergedStatisticsFolder);
-        return result;
-    }
-
-    private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputTableNames);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
-        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-        return mergeCuboidDataStep;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
deleted file mode 100644
index aeddb9b..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.kylin.engine.mr2;
-
-public interface IMROutput2 {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
deleted file mode 100644
index 8ec6f69..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class MRBatchCubingEngine2 implements IBatchCubingEngine {
-
-    @Override
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return new BatchCubingJobBuilder2(newSegment, submitter).build();
-    }
-
-    @Override
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
-    }
-    
-    @Override
-    public Class<?> getSourceInterface() {
-        return IMRInput.class;
-    }
-
-    @Override
-    public Class<?> getStorageInterface() {
-        return IMROutput2.class;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
index 87fc188..46bb9fc 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -32,11 +33,15 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidCLI;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +60,9 @@ public class CuboidJob extends AbstractHadoopJob {
 
     @Override
     public int run(String[] args) throws Exception {
+        if (this.mapperClass == null)
+            throw new Exception("Mapper class is not set!");
+        
         Options options = new Options();
 
         try {
@@ -67,7 +75,6 @@ public class CuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_INPUT_FORMAT);
             parseOptions(options, args);
 
-            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
@@ -79,26 +86,11 @@ public class CuboidJob extends AbstractHadoopJob {
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             logger.info("Starting: " + job.getJobName());
-            FileInputFormat.setInputPaths(job, input);
 
             setJobClasspath(job);
 
             // Mapper
-            if (this.mapperClass == null) {
-                throw new Exception("Mapper class is not set!");
-            }
-
-            boolean isInputTextFormat = false;
-            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
-                isInputTextFormat = true;
-            }
-
-            if (isInputTextFormat) {
-                job.setInputFormatClass(TextInputFormat.class);
-
-            } else {
-                job.setInputFormatClass(SequenceFileInputFormat.class);
-            }
+            configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
             job.setMapperClass(this.mapperClass);
             job.setMapOutputKeyClass(Text.class);
             job.setMapOutputValueClass(Text.class);
@@ -130,6 +122,25 @@ public class CuboidJob extends AbstractHadoopJob {
         }
     }
 
+    private void configureMapperInputFormat(CubeSegment cubeSeg) throws IOException {
+        String input = getOptionValue(OPTION_INPUT_PATH);
+        
+        if (StringUtils.isBlank(input)) {
+            // base cuboid case
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
+        }
+        else {
+            // n-dimension cuboid case
+            FileInputFormat.setInputPaths(job, new Path(input));
+            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
+                job.setInputFormatClass(TextInputFormat.class);
+            } else {
+                job.setInputFormatClass(SequenceFileInputFormat.class);
+            }
+        }
+    }
+
     protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
         Configuration jobConf = job.getConfiguration();
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();

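The refactored CuboidJob above relies on a small convention: a blank "input" argument (what BatchCubingJobBuilder now passes for the base cuboid step) means "read the intermediate flat table through IMRTableInputFormat", while a non-blank value is treated as an HDFS path of sequence or text files for the n-dimension cuboid steps. A standalone sketch of that dispatch, using no Kylin types, purely as an illustration:

    // Illustration only; mirrors the StringUtils.isBlank() branch in configureMapperInputFormat().
    public class CuboidInputDispatchDemo {

        static String describe(String input) {
            if (input == null || input.trim().isEmpty())
                return "flat-table input via IMRTableInputFormat (base cuboid)";
            return "HDFS path input (n-dimension cuboid): " + input;
        }

        public static void main(String[] args) {
            System.out.println(describe(""));
            System.out.println(describe("/tmp/kylin/cuboid/level_1")); // path made up for the demo
        }
    }
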
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
index 599dde8..9fa1159 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
@@ -19,25 +19,37 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 
-import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.job.constant.BatchConstants;
 
 /**
  * @author George Song (ysong1)
  */
-public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Text> {
+public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Object> {
+    
+    private IMRTableInputFormat flatTableInputFormat;
 
     @Override
-    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+    }
+
+    @Override
+    public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
         counter++;
         if (counter % BatchConstants.COUNTER_MAX == 0) {
             logger.info("Handled " + counter + " records!");
         }
+        
 
         try {
             //put a record into the shared bytesSplitter
-            bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
+            String[] row = flatTableInputFormat.parseMapperInput(value);
+            bytesSplitter.setBuffers(convertUTF8Bytes(row));
             //take care of the data in bytesSplitter
             outputKV(context);
 
@@ -46,4 +58,12 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, T
         }
     }
 
+    private byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException {
+        byte[][] result = new byte[row.length][];
+        for (int i = 0; i < row.length; i++) {
+            result[i] = row[i].getBytes("UTF-8");
+        }
+        return result;
+    }
+
 }

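A side note on convertUTF8Bytes() above: if the project targets Java 7 or later, the same conversion can be written with java.nio.charset.StandardCharsets, which avoids the checked UnsupportedEncodingException altogether. Shown only as an alternative sketch, not what the commit does:

    import java.nio.charset.StandardCharsets;

    public class Utf8ConvertDemo {

        // Same result as row[i].getBytes("UTF-8"), but with no checked exception to declare.
        static byte[][] convertUTF8Bytes(String[] row) {
            byte[][] result = new byte[row.length][];
            for (int i = 0; i < row.length; i++) {
                result[i] = row[i].getBytes(StandardCharsets.UTF_8);
            }
            return result;
        }

        public static void main(String[] args) {
            byte[][] bytes = convertUTF8Bytes(new String[] { "2015-07-20", "US", "12.5" });
            System.out.println(bytes.length + " columns, first column has " + bytes[0].length + " bytes");
        }
    }
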
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
new file mode 100644
index 0000000..63e5902
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class HBaseMROutput2 implements IMROutput2 {
+
+    @Override
+    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
+        return new IMRBatchCubingOutputSide2() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+            @Override
+            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+                result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                result.addTask(createBulkLoadStep(jobId)); //<<<<<
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                // nothing to do
+            }
+        };
+    }
+
+    @Override
+    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
+        return new IMRBatchMergeOutputSide2() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+            }
+
+            @Override
+            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
+                result.addTask(createBulkLoadStep(jobId)); //<<<<<
+            }
+
+            @Override
+            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+                result.addTask(createGarbageCollectionStep(mergingHTables, null)); //<<<<<
+            }
+        };
+    }
+}