Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:28 UTC

[20/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory2.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory2.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory2.java
new file mode 100644
index 0000000..9e26378
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory2.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.metadata.model.IBuildable;
+
+/**
+ * Creates storage-side adapters for build engines; currently hard-wired to the
+ * HBase storage implementation.
+ */
+public class StorageFactory2 {
+    
+    private static final IStorage dft = (IStorage) ClassUtil.newInstance("org.apache.kylin.storage.hbase.HBaseStorage");
+    
+    public static <T> T createEngineAdapter(IBuildable buildable, Class<T> engineInterface) {
+        return dft.adaptToBuildEngine(engineInterface);
+    }
+
+}

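For orientation, a minimal usage sketch for this factory (the helper below is hypothetical; it assumes CubeSegment implements IBuildable, consistent with how segments are passed to the factory elsewhere in this commit):

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.IMROutput;
    import org.apache.kylin.storage.StorageFactory2;

    class StorageAdapterSketch {
        // Adapt the default storage (HBase, per the hard-coded class name above)
        // to the MR engine's batch output interface for the given segment.
        static IMROutput outputSideOf(CubeSegment seg) {
            return StorageFactory2.createEngineAdapter(seg, IMROutput.class);
        }
    }
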
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
new file mode 100644
index 0000000..c6b1a18
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.tuple;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+
+import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+
+/**
+ * @author xjiang
+ */
+public class Tuple implements ITuple {
+
+    @IgnoreSizeOf
+    private final TupleInfo info;
+    private final Object[] values;
+
+    public Tuple(TupleInfo info) {
+        this.info = info;
+        this.values = new Object[info.size()];
+    }
+
+    public List<String> getAllFields() {
+        return info.getAllFields();
+    }
+
+    public List<TblColRef> getAllColumns() {
+        return info.getAllColumns();
+    }
+
+    public Object[] getAllValues() {
+        return values;
+    }
+
+    @Override
+    public ITuple makeCopy() {
+        Tuple ret = new Tuple(this.info);
+        for (int i = 0; i < this.values.length; ++i) {
+            ret.values[i] = this.values[i];
+        }
+        return ret;
+    }
+
+    public TupleInfo getInfo() {
+        return info;
+    }
+
+    public String getFieldName(TblColRef col) {
+        return info.getFieldName(col);
+    }
+
+    public TblColRef getFieldColumn(String fieldName) {
+        return info.getColumn(fieldName);
+    }
+
+    public Object getValue(String fieldName) {
+        int index = info.getFieldIndex(fieldName);
+        return values[index];
+    }
+
+    public Object getValue(TblColRef col) {
+        int index = info.getColumnIndex(col);
+        return values[index];
+    }
+
+    public String getDataTypeName(int idx) {
+        return info.getDataTypeName(idx);
+    }
+
+    public void setDimensionValue(String fieldName, String fieldValue) {
+        setDimensionValue(info.getFieldIndex(fieldName), fieldValue);
+    }
+
+    public void setDimensionValue(int idx, String fieldValue) {
+        Object objectValue = convertOptiqCellValue(fieldValue, getDataTypeName(idx));
+        values[idx] = objectValue;
+    }
+
+    public void setMeasureValue(String fieldName, Object fieldValue) {
+        setMeasureValue(info.getFieldIndex(fieldName), fieldValue);
+    }
+
+    public void setMeasureValue(int idx, Object fieldValue) {
+        fieldValue = convertWritableToJava(fieldValue);
+
+        String dataType = getDataTypeName(idx);
+        // special handling for BigDecimal: allow double to be aggregated as
+        // BigDecimal during cube build, for best precision
+        if ("double".equals(dataType) && fieldValue instanceof BigDecimal) {
+            fieldValue = ((BigDecimal) fieldValue).doubleValue();
+        } else if ("integer".equals(dataType) && !(fieldValue instanceof Integer)) {
+            fieldValue = ((Number) fieldValue).intValue();
+        } else if ("float".equals(dataType) && fieldValue instanceof BigDecimal) {
+            fieldValue = ((BigDecimal) fieldValue).floatValue();
+        }
+        values[idx] = fieldValue;
+    }
+
+    private Object convertWritableToJava(Object o) {
+        if (o instanceof LongMutable)
+            o = ((LongMutable) o).get();
+        else if (o instanceof DoubleMutable)
+            o = ((DoubleMutable) o).get();
+        return o;
+    }
+
+    public boolean hasColumn(TblColRef column) {
+        return info.hasColumn(column);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        for (String field : info.getAllFields()) {
+            sb.append(field);
+            sb.append("=");
+            sb.append(getValue(field));
+            sb.append(",");
+        }
+        return sb.toString();
+    }
+
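+    /**
+     * Converts epoch days (days since 1970-01-01, GMT) to milliseconds.
+     * ("epic" in these method names is a misspelling of "epoch", kept as-is
+     * because callers reference the names.)
+     */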
+    public static long epicDaysToMillis(int days) {
+        return 1L * days * (1000 * 3600 * 24);
+    }
+
+    public static int dateToEpicDays(String strValue) {
+        Date dateValue = DateFormat.stringToDate(strValue); // NOTE: forces GMT timezone
+        long millis = dateValue.getTime();
+        return (int) (millis / (1000 * 3600 * 24));
+    }
+
+    public static long getTs(ITuple row, TblColRef partitionCol) {
+        // the partition column is either a date (stored as epoch days) or a timestamp (millis)
+        if (partitionCol.getDatatype().equals("date")) {
+            return Tuple.epicDaysToMillis(Integer.valueOf(row.getValue(partitionCol).toString()));
+        } else {
+            return Long.valueOf(row.getValue(partitionCol).toString());
+        }
+    }
+
+    public static Object convertOptiqCellValue(String strValue, String dataTypeName) {
+        if (strValue == null)
+            return null;
+
+        if ((strValue.equals("") || strValue.equals("\\N")) && !dataTypeName.equals("string"))
+            return null;
+
+        // TODO use data type enum instead of string comparison
+        if ("date".equals(dataTypeName)) {
+            // convert epoch time
+            return dateToEpicDays(strValue); // Optiq expects Integer instead of Long. by honma
+        } else if ("timestamp".equals(dataTypeName) || "datetime".equals(dataTypeName)) {
+            return Long.valueOf(DateFormat.stringToMillis(strValue));
+        } else if ("tinyint".equals(dataTypeName)) {
+            return Byte.valueOf(strValue);
+        } else if ("short".equals(dataTypeName) || "smallint".equals(dataTypeName)) {
+            return Short.valueOf(strValue);
+        } else if ("integer".equals(dataTypeName)) {
+            return Integer.valueOf(strValue);
+        } else if ("long".equals(dataTypeName) || "bigint".equals(dataTypeName)) {
+            return Long.valueOf(strValue);
+        } else if ("double".equals(dataTypeName)) {
+            return Double.valueOf(strValue);
+        } else if ("decimal".equals(dataTypeName)) {
+            return new BigDecimal(strValue);
+        } else if ("float".equals(dataTypeName)){
+            return Float.valueOf(strValue);
+        } else {
+            return strValue;
+        }
+    }
+
+}

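A hedged worked example of the conversions above (the class name is illustrative; it relies on DateFormat.stringToDate parsing in GMT, as the inline comment notes):

    import org.apache.kylin.storage.tuple.Tuple;

    public class OptiqValueExample {
        public static void main(String[] args) {
            // "date" columns come back as epoch days (Integer), per Optiq's expectation
            int days = (Integer) Tuple.convertOptiqCellValue("1970-01-02", "date"); // 1
            long millis = Tuple.epicDaysToMillis(days);                             // 86400000
            // numeric types are parsed to their Java counterparts
            Object dec = Tuple.convertOptiqCellValue("3.14", "decimal");            // BigDecimal 3.14
            // empty string and Hive's \N marker map to null for non-string types
            Object nil = Tuple.convertOptiqCellValue("\\N", "bigint");              // null
            System.out.println(days + " " + millis + " " + dec + " " + nil);
        }
    }
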
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-storage/src/main/java/org/apache/kylin/storage/tuple/TupleInfo.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/TupleInfo.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/TupleInfo.java
new file mode 100644
index 0000000..735cc64
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/TupleInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.tuple;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * 
+ * @author xjiang
+ * 
+ */
+public class TupleInfo {
+
+    private final Map<String, Integer> fieldMap;
+    private final Map<TblColRef, Integer> columnMap;
+    
+    private final List<String> fields;
+    private final List<TblColRef> columns;
+    private final List<String> dataTypeNames;
+
+    public TupleInfo() {
+        fieldMap = new HashMap<String, Integer>();
+        columnMap = new HashMap<TblColRef, Integer>();
+        fields = new ArrayList<String>();
+        columns = new ArrayList<TblColRef>();
+        dataTypeNames = new ArrayList<String>();
+    }
+
+    public TblColRef getColumn(String fieldName) {
+        int idx = getFieldIndex(fieldName);
+        return columns.get(idx);
+    }
+
+    public int getColumnIndex(TblColRef col) {
+        return columnMap.get(col);
+    }
+
+    public String getDataTypeName(int index) {
+        return dataTypeNames.get(index);
+    }
+
+    public int getFieldIndex(String fieldName) {
+        return fieldMap.get(fieldName);
+    }
+    
+    public boolean hasField(String fieldName) {
+        return fieldMap.containsKey(fieldName);
+    }
+
+    public String getFieldName(TblColRef col) {
+        int idx = columnMap.get(col);
+        return fields.get(idx);
+    }
+
+    public boolean hasColumn(TblColRef col) {
+        return columnMap.containsKey(col);
+    }
+
+    public void setField(String fieldName, TblColRef col, int index) {
+        fieldMap.put(fieldName, index);
+
+        if (col != null)
+            columnMap.put(col, index);
+
+        if (fields.size() > index)
+            fields.set(index, fieldName);
+        else
+            fields.add(index, fieldName);
+
+        if (columns.size() > index)
+            columns.set(index, col);
+        else
+            columns.add(index, col);
+        
+        // col may be null (see the guard above); avoid an NPE when recording the data type
+        String dataTypeName = (col == null) ? null : col.getType().getName();
+        if (dataTypeNames.size() > index)
+            dataTypeNames.set(index, dataTypeName);
+        else
+            dataTypeNames.add(index, dataTypeName);
+    }
+
+    public List<String> getAllFields() {
+        return fields;
+    }
+
+    public List<TblColRef> getAllColumns() {
+        return columns;
+    }
+
+    public int size() {
+        return fields.size();
+    }
+
+}

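To see TupleInfo and Tuple working together, a minimal sketch; it passes null TblColRefs to stay free of cube-metadata setup (which the guarded setField above tolerates), whereas real callers supply resolved columns so that data types drive value conversion:

    import java.math.BigDecimal;

    import org.apache.kylin.storage.tuple.Tuple;
    import org.apache.kylin.storage.tuple.TupleInfo;

    public class TupleExample {
        public static void main(String[] args) {
            TupleInfo info = new TupleInfo();
            // register fields positionally; the index doubles as the slot in Tuple.values
            info.setField("SELLER_ID", null, 0);
            info.setField("GMV", null, 1);

            Tuple tuple = new Tuple(info);
            tuple.setDimensionValue("SELLER_ID", "10000042"); // no type name -> kept as String
            tuple.setMeasureValue("GMV", new BigDecimal("12.30"));

            System.out.println(tuple); // SELLER_ID=10000042,GMV=12.30,
        }
    }
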
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index a226e80..2d6f157 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -79,18 +79,33 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-app</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-api</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mrunit</groupId>
+            <artifactId>mrunit</artifactId>
+            <classifier>hadoop2</classifier>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
new file mode 100644
index 0000000..6b5cfa4
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+public class BatchCubingJobBuilder extends JobBuilderSupport {
+
+    private final IMRBatchCubingInputSide inputSide;
+    private final IMRBatchCubingOutputSide outputSide;
+
+    public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
+
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(result);
+
+        // Phase 2: Build Dictionary
+        result.addTask(createFactDistinctColumnsStep(jobId));
+        result.addTask(createBuildDictionaryStep(jobId));
+
+        // Phase 3: Build Cube
+        final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
+        final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
+        // base cuboid step
+        result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
+        // n dim cuboid steps
+        for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
+            int dimNum = totalRowkeyColumnsCount - i;
+            result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
+        }
+        outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
+        // base cuboid job
+        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg);
+
+        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
+
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", ""); // marks flat table input
+        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "level", "0");
+
+        baseCuboidStep.setMapReduceParams(cmd.toString());
+        baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
+        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+        return baseCuboidStep;
+    }
+
+    private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
+        // ND cuboid job
+        MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
+
+        ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
+        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
+
+        ndCuboidStep.setMapReduceParams(cmd.toString());
+        ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
+        return ndCuboidStep;
+    }
+
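+    /**
+     * Illustration (assuming 4 rowkey columns and 3 aggregation levels):
+     * paths = [<root>base_cuboid, <root>3d_cuboid, <root>2d_cuboid, <root>1d_cuboid],
+     * matching the base-cuboid step plus one step per level added in build() above.
+     */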
+    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+        String[] paths = new String[groupRowkeyColumnsCount + 1];
+        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
+            int dimNum = totalRowkeyColumnCount - i;
+            if (dimNum == totalRowkeyColumnCount) {
+                paths[i] = cuboidRootPath + "base_cuboid";
+            } else {
+                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
+            }
+        }
+        return paths;
+    }
+
+}

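A hedged sketch of submitting this builder's output for execution; ExecutableManager.addJob is assumed to be the submission entry point, and its exact package may differ from what is shown:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.BatchCubingJobBuilder;
    import org.apache.kylin.engine.mr.CubingJob;
    import org.apache.kylin.job.manager.ExecutableManager;

    public class SubmitBuildJobSketch {
        static void submit(CubeSegment newSegment, KylinConfig config) {
            // chain the steps: flat table, dictionary, cuboids, metadata update
            CubingJob job = new BatchCubingJobBuilder(newSegment, "ADMIN").build();
            // hand the chain to the job engine (assumed API)
            ExecutableManager.getInstance(config).addJob(job);
        }
    }
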
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..abdabd8
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
+import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+    private final IMRBatchCubingInputSide inputSide;
+    private final IMRBatchCubingOutputSide2 outputSide;
+
+    public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+        final String jobId = result.getId();
+
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(result);
+
+        // Phase 2: Build Dictionary
+        result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+        result.addTask(createBuildDictionaryStep(jobId));
+        result.addTask(createSaveStatisticsStep(jobId));
+        outputSide.addStepPhase2_BuildDictionary(result);
+
+        // Phase 3: Build Cube
+        result.addTask(createInMemCubingStep(jobId));
+        outputSide.addStepPhase3_BuildCube(result);
+
+        // Phase 4: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+        inputSide.addStepPhase4_Cleanup(result);
+        outputSide.addStepPhase4_Cleanup(result);
+
+        return result;
+    }
+
+    private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+        SaveStatisticsStep result = new SaveStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setStatisticsPath(getStatisticsPath(jobId));
+        return result;
+    }
+
+    private MapReduceExecutable createInMemCubingStep(String jobId) {
+        // in-mem cubing job
+        MapReduceExecutable cubeStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg);
+
+        cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "jobflowid", jobId);
+
+        cubeStep.setMapReduceParams(cmd.toString());
+        cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
+        cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+        return cubeStep;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
new file mode 100644
index 0000000..3f6201c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class BatchMergeJobBuilder extends JobBuilderSupport {
+
+    private final IMRBatchMergeOutputSide outputSide;
+
+    public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
+        super(mergeSegment, submitter);
+        this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
+
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+        final List<String> mergingSegmentIds = Lists.newArrayList();
+        final List<String> mergingCuboidPaths = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingSegmentIds.add(merging.getUuid());
+            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
+        }
+
+        // Phase 1: Merge Dictionary
+        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+
+        // Phase 2: Merge Cube Files
+        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+        result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
+        outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
+
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase3_Cleanup(result);
+
+        return result;
+    }
+
+    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", outputPath);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
+        return mergeCuboidDataStep;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..c0fe759
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
+import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+import java.util.List;
+
+public class BatchMergeJobBuilder2 extends JobBuilderSupport {
+
+    private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
+    
+    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+        super(mergeSegment, submitter);
+        this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+    }
+
+    public CubingJob build() {
+        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+        final String jobId = result.getId();
+
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+        final List<String> mergingSegmentIds = Lists.newArrayList();
+        final List<String> mergingHTables = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingSegmentIds.add(merging.getUuid());
+            mergingHTables.add(merging.getStorageLocationIdentifier());
+        }
+
+        // Phase 1: Merge Dictionary
+        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
+        outputSide.addStepPhase1_MergeDictionary(result);
+
+        // Phase 2: Merge Cube
+        String formattedTables = StringUtil.join(mergingHTables, ",");
+        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
+        outputSide.addStepPhase2_BuildCube(result);
+
+        // Phase 3: Update Metadata & Cleanup
+        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+        outputSide.addStepPhase3_Cleanup(result);
+
+        return result;
+    }
+
+    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+        MergeStatisticsStep result = new MergeStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setMergedStatisticsPath(mergedStatisticsFolder);
+        return result;
+    }
+
+    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "jobflowid", jobId);
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
+        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        return mergeCuboidDataStep;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
new file mode 100644
index 0000000..37a8841
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
@@ -0,0 +1,166 @@
+package org.apache.kylin.engine.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.kylin.common.util.Bytes;
+
+public class ByteArrayWritable implements WritableComparable<ByteArrayWritable> {
+
+    private byte[] data;
+    private int offset;
+    private int length;
+
+    public ByteArrayWritable() {
+        this(null, 0, 0);
+    }
+
+    public ByteArrayWritable(int capacity) {
+        this(new byte[capacity], 0, capacity);
+    }
+
+    public ByteArrayWritable(byte[] data) {
+        this(data, 0, data == null ? 0 : data.length);
+    }
+
+    public ByteArrayWritable(byte[] data, int offset, int length) {
+        this.data = data;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    public byte[] array() {
+        return data;
+    }
+
+    public int offset() {
+        return offset;
+    }
+
+    public int length() {
+        return length;
+    }
+
+    public void set(byte[] array) {
+        set(array, 0, array.length);
+    }
+
+    public void set(byte[] array, int offset, int length) {
+        this.data = array;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    public ByteBuffer asBuffer() {
+        if (data == null)
+            return null;
+        else if (offset == 0 && length == data.length)
+            return ByteBuffer.wrap(data);
+        else
+            return ByteBuffer.wrap(data, offset, length).slice();
+    }
+
+    @Override
+    public int hashCode() {
+        if (data == null)
+            return 0;
+        else
+            return Bytes.hashCode(data, offset, length);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(this.length);
+        out.write(this.data, this.offset, this.length);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.length = in.readInt();
+        this.data = new byte[this.length];
+        in.readFully(this.data, 0, this.length);
+        this.offset = 0;
+    }
+
+    // Below methods copied from BytesWritable
+    /**
+     * Define the sort order of the ByteArrayWritable.
+     * @param that The other byte array writable
+     * @return Positive if left is bigger than right, 0 if they are equal, and
+     *         negative if left is smaller than right.
+     */
+    public int compareTo(ByteArrayWritable that) {
+        return WritableComparator.compareBytes(this.data, this.offset, this.length, that.data, that.offset, that.length);
+    }
+
+    /**
+     * Compares the bytes in this object to the specified byte array
+     * @param that the byte array to compare against
+     * @return Positive if left is bigger than right, 0 if they are equal, and
+     *         negative if left is smaller than right.
+     */
+    public int compareTo(final byte[] that) {
+        return WritableComparator.compareBytes(this.data, this.offset, this.length, that, 0, that.length);
+    }
+
+    /**
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object right_obj) {
+        if (right_obj instanceof byte[]) {
+            return compareTo((byte[]) right_obj) == 0;
+        }
+        if (right_obj instanceof ByteArrayWritable) {
+            return compareTo((ByteArrayWritable) right_obj) == 0;
+        }
+        return false;
+    }
+
+    /**
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(3 * this.length);
+        final int endIdx = this.offset + this.length;
+        for (int idx = this.offset; idx < endIdx; idx++) {
+            sb.append(' ');
+            String num = Integer.toHexString(0xff & this.data[idx]);
+            // if it is only one digit, add a leading 0.
+            if (num.length() < 2) {
+                sb.append('0');
+            }
+            sb.append(num);
+        }
+        return sb.length() > 0 ? sb.substring(1) : "";
+    }
+
+    /** A Comparator optimized for ByteArrayWritable.
+     */
+    public static class Comparator extends WritableComparator {
+        private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
+
+        /** constructor */
+        public Comparator() {
+            super(ByteArrayWritable.class);
+        }
+
+        /**
+         * @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int)
+         */
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            return comparator.compare(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    static { // register this comparator
+        WritableComparator.define(ByteArrayWritable.class, new Comparator());
+    }
+}

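A short, self-contained usage example for the writable above (class name is illustrative):

    import org.apache.kylin.engine.mr.ByteArrayWritable;

    public class ByteArrayWritableExample {
        public static void main(String[] args) {
            // wrap a slice of a larger buffer without copying
            byte[] buf = new byte[] { 0x00, 0x0f, (byte) 0xff, 0x10 };
            ByteArrayWritable a = new ByteArrayWritable(buf, 1, 2);
            ByteArrayWritable b = new ByteArrayWritable(new byte[] { 0x0f, (byte) 0xff });

            System.out.println(a);                        // "0f ff" (space-separated hex)
            System.out.println(a.compareTo(b));           // 0 -- same byte range
            System.out.println(a.equals(b));              // true
            System.out.println(a.asBuffer().remaining()); // 2
        }
    }
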
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
new file mode 100644
index 0000000..7251730
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.Output;
+
+/**
+ * The chained job flow (a sequence of executable steps) that builds or merges a cube segment.
+ */
+public class CubingJob extends DefaultChainedExecutable {
+
+    // KEYS of Output.extraInfo map, info passed across job steps
+    public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
+    public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
+    public static final String CUBE_SIZE_BYTES = "cubeSizeBytes";
+    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
+    
+    private static final String CUBE_INSTANCE_NAME = "cubeName";
+    private static final String SEGMENT_ID = "segmentId";
+    
+    public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) {
+        return initCubingJob(seg, "BUILD", submitter, config);
+    }
+    
+    public static CubingJob createMergeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
+        return initCubingJob(seg, "MERGE", submitter, config);
+    }
+    
+    private static CubingJob initCubingJob(CubeSegment seg, String jobType, String submitter, JobEngineConfig config) {
+        CubingJob result = new CubingJob();
+        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+        format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + jobType + " - " + format.format(new Date(System.currentTimeMillis())));
+        result.setSubmitter(submitter);
+        result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
+        return result;
+    }
+
+    public CubingJob() {
+        super();
+    }
+    
+    void setCubeName(String name) {
+        setParam(CUBE_INSTANCE_NAME, name);
+    }
+
+    public String getCubeName() {
+        return getParam(CUBE_INSTANCE_NAME);
+    }
+
+    void setSegmentId(String segmentId) {
+        setParam(SEGMENT_ID, segmentId);
+    }
+
+    public String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+    @Override
+    protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
+        CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName());
+        final Output output = jobService.getOutput(getId());
+        String logMsg;
+        state = output.getState();
+        if (state != ExecutableState.ERROR &&
+                !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString().toLowerCase())) {
+            logger.info("state:" + state + " no need to notify users");
+            return null;
+        }
+        switch (state) {
+            case ERROR:
+                logMsg = output.getVerboseMsg();
+                break;
+            case DISCARDED:
+                logMsg = "job has been discarded";
+                break;
+            case SUCCEED:
+                logMsg = "job has succeeded";
+                break;
+            default:
+                return null;
+        }
+        if (logMsg == null) {
+            logMsg = "no error message";
+        }
+        String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
+        content = content.replaceAll("\\$\\{job_name\\}", getName());
+        content = content.replaceAll("\\$\\{result\\}", state.toString());
+        content = content.replaceAll("\\$\\{cube_name\\}", getCubeName());
+        content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString());
+        content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins");
+        content = content.replaceAll("\\$\\{mr_waiting\\}", getMapReduceWaitTime() / 60000 + "mins");
+        content = content.replaceAll("\\$\\{last_update_time\\}", new Date(getLastModified()).toString());
+        content = content.replaceAll("\\$\\{submitter\\}", getSubmitter());
+        content = content.replaceAll("\\$\\{error_log\\}", logMsg);
+
+        try {
+            InetAddress inetAddress = InetAddress.getLocalHost();
+            content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
+        } catch (UnknownHostException e) {
+            logger.warn(e.getLocalizedMessage(), e);
+        }
+
+        String title = "[" + state.toString() + "] - [Kylin Cube Build Job]-" + getCubeName();
+        return Pair.of(title, content);
+    }
+
+    @Override
+    protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
+        long time = 0L;
+        for (AbstractExecutable task : getTasks()) {
+            final ExecutableState status = task.getStatus();
+            if (status != ExecutableState.SUCCEED) {
+                break;
+            }
+            if (task instanceof MapReduceExecutable) {
+                time += ((MapReduceExecutable) task).getMapReduceWaitTime();
+            }
+        }
+        setMapReduceWaitTime(time);
+        super.onExecuteFinished(result, executableContext);
+    }
+
+    public long getMapReduceWaitTime() {
+        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
+    }
+
+    public void setMapReduceWaitTime(long t) {
+        addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
+    }
+    
+    public long findSourceRecordCount() {
+        return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
+    }
+    
+    public long findSourceSizeBytes() {
+        return Long.parseLong(findExtraInfo(SOURCE_SIZE_BYTES, "0"));
+    }
+    
+    public long findCubeSizeBytes() {
+        return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0"));
+    }
+    
+    private String findExtraInfo(String key, String dft) {
+        for (AbstractExecutable child : getTasks()) {
+            Output output = executableManager.getOutput(child.getId());
+            String value = output.getExtra().get(key);
+            if (value != null)
+                return value;
+        }
+        return dft;
+    }
+}

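A hedged sketch of reading the cross-step counters once a CubingJob has finished (the job instance is assumed to have been reloaded from the executable manager after completion):

    import org.apache.kylin.engine.mr.CubingJob;

    public class CubingJobMetricsSketch {
        static void printMetrics(CubingJob job) {
            System.out.println("source records : " + job.findSourceRecordCount());
            System.out.println("source bytes   : " + job.findSourceSizeBytes());
            System.out.println("cube bytes     : " + job.findCubeSizeBytes());
            System.out.println("MR wait millis : " + job.getMapReduceWaitTime());
        }
    }
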
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
new file mode 100644
index 0000000..0c39398
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Any ITableSource that wishes to serve as input to the MapReduce build engine must adapt to this interface.
+ */
+public interface IMRInput {
+
+    /** Return a helper to participate in batch cubing job flow. */
+    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
+
+    /** Return an InputFormat that reads from specified table. */
+    public IMRTableInputFormat getTableInputFormat(TableDesc table);
+    
+    /**
+     * Utility that configures mapper to read from a table.
+     */
+    public interface IMRTableInputFormat {
+        
+        /** Configure the InputFormat of given job. */
+        public void configureJob(Job job);
+        
+        /** Parse a mapper input object into column values. */
+        public String[] parseMapperInput(Object mapperInput);
+    }
+    
+    /**
+     * Participates in the batch cubing flow as the input side. Responsible for creating the
+     * intermediate flat table (Phase 1) and cleaning up any leftovers (Phase 4).
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary (with FlatTableInputFormat)
+     * - Phase 3: Build Cube (with FlatTableInputFormat)
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface IMRBatchCubingInputSide {
+        
+        /** Return an InputFormat that reads from the intermediate flat table */
+        public IMRTableInputFormat getFlatTableInputFormat();
+        
+        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+        
+        /** Add step that does necessary clean up, like delete the intermediate flat table */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+}

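For concreteness, a hypothetical input-side fragment: an IMRTableInputFormat that reads comma-separated text from HDFS. The path convention and delimiter are assumptions for illustration, not part of this commit:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
    import org.apache.kylin.metadata.model.TableDesc;

    public class CsvTableInputFormat implements IMRTableInputFormat {

        private final TableDesc table;

        public CsvTableInputFormat(TableDesc table) {
            this.table = table;
        }

        @Override
        public void configureJob(Job job) {
            job.setInputFormatClass(TextInputFormat.class);
            try {
                // hypothetical layout: one directory of CSV files per table
                FileInputFormat.addInputPath(job, new Path("/data/" + table.getIdentity()));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public String[] parseMapperInput(Object mapperInput) {
            // TextInputFormat hands each line to the mapper as a Text value
            return mapperInput.toString().split(",");
        }
    }
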
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
new file mode 100644
index 0000000..bc6ee1f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface IMROutput {
+
+    /** Return a helper to participate in batch cubing job flow. */
+    public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+    
+    /**
+     * Participates in the batch cubing flow as the output side. Responsible for saving
+     * the cuboid output to storage (Phase 3).
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface IMRBatchCubingOutputSide {
+        
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where the key takes the format "CUBOID,D1,D2,..,Dn"
+         * and the value takes the format "M1,M2,..,Mm". CUBOID is an 8-byte cuboid ID; Dx is a dictionary-encoded
+         * dimension value; Mx is a measure value in serialized form.
+         */
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+        
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+    
+    /** Return a helper to participate in batch merge job flow. */
+    public IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+    
+    /**
+     * Participates in the batch merge flow as the output side. Responsible for saving
+     * the merged cuboid output to storage (Phase 2).
+     * 
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    public interface IMRBatchMergeOutputSide {
+        
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where the key takes the format "CUBOID,D1,D2,..,Dn"
+         * and the value takes the format "M1,M2,..,Mm". CUBOID is an 8-byte cuboid ID; Dx is a dictionary-encoded
+         * dimension value; Mx is a measure value in serialized form.
+         */
+        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+        
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+}

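For orientation, a storage engine participates by appending its own executables to the chained job flow. Below is a minimal sketch of an implementation; SketchStorageOutput, createBulkLoadStep() and createGarbageCollectionStep() are hypothetical names invented for illustration (the real implementation is the HBase one in storage-hbase):

    public class SketchStorageOutput implements IMROutput {

        @Override
        public IMRBatchCubingOutputSide getBatchCubingOutputSide(final CubeSegment seg) {
            return new IMRBatchCubingOutputSide() {
                @Override
                public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
                    // load the cuboid sequence files under cuboidRootPath into storage
                    jobFlow.addTask(createBulkLoadStep(seg, cuboidRootPath, jobFlow.getId())); // hypothetical helper
                }

                @Override
                public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                    // drop whatever intermediate output the load step left behind
                    jobFlow.addTask(createGarbageCollectionStep(seg, jobFlow.getId())); // hypothetical helper
                }
            };
        }

        @Override
        public IMRBatchMergeOutputSide getBatchMergeOutputSide(final CubeSegment seg) {
            // symmetric to the cubing side: addStepPhase2_BuildCube + addStepPhase3_Cleanup
            throw new UnsupportedOperationException("omitted in this sketch");
        }
    }
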
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
new file mode 100644
index 0000000..974e2fc
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface IMROutput2 {
+
+    /** Returns a helper that participates in the batch cubing job flow. */
+    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
+
+    /**
+     * Participates in the batch cubing flow as the output side.
+     * 
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube (with StorageOutputFormat)
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    public interface IMRBatchCubingOutputSide2 {
+
+        public IMRStorageOutputFormat getStorageOutputFormat();
+
+        /** Adds a step that executes after dictionary building and before cube building. */
+        public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Adds a step that executes after cube building. */
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Adds a step that performs any necessary cleanup. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
+
+    public interface IMRBatchMergeInputSide2 {
+        public IMRStorageInputFormat getStorageInputFormat();
+    }
+
+    @SuppressWarnings("rawtypes")
+    public interface IMRStorageInputFormat {
+        
+        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+        
+        public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+        
+        public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
+    }
+
+    /** Returns a helper that participates in the batch merge job flow. */
+    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
+
+    /**
+     * Participates in the batch merge flow as the output side.
+     * 
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    public interface IMRBatchMergeOutputSide2 {
+
+        public IMRStorageOutputFormat getStorageOutputFormat();
+
+        /** Add step that executes after merge dictionary and before merge cube. */
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+        /** Adds a step that executes after cube merging. */
+        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Adds a step that performs any necessary cleanup. */
+        public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public interface IMRStorageOutputFormat {
+        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
+        
+        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
+    }
+}

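On the consuming side, a merge-phase mapper built against IMRStorageInputFormat would look roughly like the sketch below; SketchMergeMapper is a hypothetical name and resolving the CubeSegment from the job configuration is elided. The point is that the storage layer owns its on-disk key/value classes and decodes them itself through parseMapperInput():

    public class SketchMergeMapper extends Mapper<Object, Object, ByteArrayWritable, Object> {

        private IMRStorageInputFormat storageInput;

        @Override
        protected void setup(Context context) throws IOException {
            CubeSegment seg = null; // placeholder: resolve from context.getConfiguration() in real code
            storageInput = MRUtil.getBatchMergeInputSide2(seg).getStorageInputFormat();
        }

        @Override
        public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
            Pair<ByteArrayWritable, Object[]> rec = storageInput.parseMapperInput(inKey, inValue);
            ByteArrayWritable dimensions = rec.getFirst();
            Object[] measures = rec.getSecond();
            // re-encode against the merged dictionaries, then context.write(...)
        }
    }
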
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
new file mode 100644
index 0000000..5e23531
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
+import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Holds reusable steps shared by the cubing and merge job builders.
+ */
+public class JobBuilderSupport {
+
+    protected final JobEngineConfig config;
+    protected final CubeSegment seg;
+    protected final String submitter;
+
+    public JobBuilderSupport(CubeSegment seg, String submitter) {
+        Preconditions.checkNotNull(seg, "segment cannot be null");
+        this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+        this.seg = seg;
+        this.submitter = submitter;
+    }
+    
+    public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
+        return createFactDistinctColumnsStep(jobId, false);
+    }
+    
+    public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
+        return createFactDistinctColumnsStep(jobId, true);
+    }
+    
+    private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        result.setMapReduceJobClass(FactDistinctColumnsJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
+
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
+    public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
+        // shell job that builds dictionaries from the fact distinct columns output
+        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
+
+        buildDictionaryStep.setJobParams(cmd.toString());
+        buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
+        return buildDictionaryStep;
+    }
+
+    public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
+        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
+        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
+        updateCubeInfoStep.setSegmentId(seg.getUuid());
+        updateCubeInfoStep.setCubingJobId(jobId);
+        return updateCubeInfoStep;
+    }
+
+    public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
+        MergeDictionaryStep result = new MergeDictionaryStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        return result;
+    }
+    
+    public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
+        UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
+        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setCubingJobId(jobId);
+        return result;
+    }
+
+    // ============================================================================
+
+    public String getJobWorkingDir(String jobId) {
+        return getJobWorkingDir(config, jobId);
+    }
+    
+    public String getCuboidRootPath(String jobId) {
+        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
+    }
+    
+    public String getCuboidRootPath(CubeSegment seg) {
+        return getCuboidRootPath(seg.getLastBuildJobID());
+    }
+    
+    public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
+        try {
+            String jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
+            if (jobConf != null && jobConf.length() > 0) {
+                buf.append(" -conf ").append(jobConf);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    public String getFactDistinctColumnsPath(String jobId) {
+        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
+    }
+
+    public String getStatisticsPath(String jobId) {
+        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/statistics";
+    }
+
+    // ============================================================================
+    // static methods also shared by other job flow participants
+    // ----------------------------------------------------------------------------
+
+    public static String getJobWorkingDir(JobEngineConfig conf, String jobId) {
+        return conf.getHdfsWorkingDirectory() + "/" + "kylin-" + jobId;
+    }
+
+    public static StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
+        return buf.append(" -").append(paraName).append(" ").append(paraValue);
+    }
+
+}

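The two static helpers at the bottom are simple string glue, which a made-up example (values invented for illustration) shows most quickly:

    StringBuilder cmd = new StringBuilder();
    JobBuilderSupport.appendExecCmdParameters(cmd, "cubename", "test_kylin_cube");
    JobBuilderSupport.appendExecCmdParameters(cmd, "segmentname", "seg_19700101000000_20150101000000");
    // cmd now reads: " -cubename test_kylin_cube -segmentname seg_19700101000000_20150101000000"

    // and given a JobEngineConfig whose HDFS working directory is "/kylin",
    // getJobWorkingDir(conf, "f1ab42") returns "/kylin/kylin-f1ab42"
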
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
new file mode 100644
index 0000000..61328c9
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine implements IBatchCubingEngine {
+
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return new BatchCubingJobBuilder(newSegment, submitter).build();
+    }
+
+    @Override
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return new BatchMergeJobBuilder(mergeSegment, submitter).build();
+    }
+    
+    @Override
+    public Class<?> getSourceInterface() {
+        return IMRInput.class;
+    }
+
+    @Override
+    public Class<?> getStorageInterface() {
+        return IMROutput.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..57ec128
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return new BatchCubingJobBuilder2(newSegment, submitter).build();
+    }
+
+    @Override
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
+    }
+    
+    @Override
+    public Class<?> getSourceInterface() {
+        return IMRInput.class;
+    }
+
+    @Override
+    public Class<?> getStorageInterface() {
+        return IMROutput2.class;
+    }
+
+}

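Both engine classes are thin factories over the corresponding builders, so a caller needs only the IBatchCubingEngine interface. A minimal usage sketch, where newSegment and the submitter are assumed to come from the job service, and engine selection would normally sit behind a factory rather than a direct new:

    IBatchCubingEngine engine = new MRBatchCubingEngine2(); // or MRBatchCubingEngine for the v1 flow
    DefaultChainedExecutable job = engine.createBatchCubingJob(newSegment, "ADMIN");
    // the returned chain is then handed over to the job scheduler (not shown here)
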
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
new file mode 100644
index 0000000..dc0533e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.storage.StorageFactory2;
+
+public class MRUtil {
+
+    public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        return TableSourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
+    }
+
+    public static IMRTableInputFormat getTableInputFormat(String tableName) {
+        return getTableInputFormat(getTableDesc(tableName));
+    }
+
+    public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
+        return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
+    }
+
+    private static TableDesc getTableDesc(String tableName) {
+        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+    }
+
+    public static IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchCubingOutputSide(seg);
+    }
+
+    public static IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchMergeOutputSide(seg);
+    }
+
+    public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
+    }
+    
+    public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg);
+    }
+    
+    public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
+    }
+    
+}
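
Taken together, these accessors are what keep the MR job builders agnostic of the concrete source and storage. A cubing builder body reduces to roughly the sketch below; jobFlow and cuboidRootPath come from the surrounding builder, and addStepPhase1_CreateFlatTable is the input-side counterpart declared on IMRInput in this same refactor:

    IMRBatchCubingInputSide inputSide = MRUtil.getBatchCubingInputSide(seg);
    IMRBatchCubingOutputSide outputSide = MRUtil.getBatchCubingOutputSide(seg);

    inputSide.addStepPhase1_CreateFlatTable(jobFlow);            // e.g. a Hive intermediate table
    // ... fact distinct columns and dictionary steps in between ...
    outputSide.addStepPhase3_BuildCube(jobFlow, cuboidRootPath); // e.g. an HBase bulk load
    outputSide.addStepPhase4_Cleanup(jobFlow);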