Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/21 02:29:31 UTC

[6/7] incubator-kylin git commit: KYLIN-878 HBase storage abstraction for cubing flow

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 79430f6..3217e4b 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,120 +23,117 @@ import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
 import org.apache.kylin.job.common.HadoopShellExecutable;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
+import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
 
 import com.google.common.base.Preconditions;
 
 /**
- * Hold reusable methods for real builders.
+ * Hold reusable steps for builders.
  */
-abstract public class JobBuilderSupport {
+public class JobBuilderSupport {
 
-    protected final JobEngineConfig config;
-    protected final CubeSegment seg;
-    protected final String submitter;
+    final protected JobEngineConfig config;
+    final protected CubeSegment seg;
+    final protected String submitter;
 
-    protected JobBuilderSupport(CubeSegment seg, String submitter) {
+    public JobBuilderSupport(CubeSegment seg, String submitter) {
         Preconditions.checkNotNull(seg, "segment cannot be null");
         this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
         this.seg = seg;
         this.submitter = submitter;
     }
     
-    protected MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
-        MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
-        rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
+    public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
+        return createFactDistinctColumnsStep(jobId, false);
+    }
+    
+    public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
+        return createFactDistinctColumnsStep(jobId, true);
+    }
+    
+    private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        result.setMapReduceJobClass(FactDistinctColumnsJob.class);
         StringBuilder cmd = new StringBuilder();
-
         appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
 
-        rowkeyDistributionStep.setMapReduceParams(cmd.toString());
-        rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
-        return rowkeyDistributionStep;
+        result.setMapReduceParams(cmd.toString());
+        return result;
     }
 
-    protected HadoopShellExecutable createCreateHTableStep(String jobId) {
-        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+    public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
+        // build dictionaries from the fact-distinct-columns output
+        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(config.isInMemCubing()));
-
-        createHtableStep.setJobParams(cmd.toString());
-        createHtableStep.setJobClass(CreateHTableJob.class);
+        appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
 
-        return createHtableStep;
+        buildDictionaryStep.setJobParams(cmd.toString());
+        buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
+        return buildDictionaryStep;
     }
 
-    protected MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
-        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
-
-        createHFilesStep.setMapReduceParams(cmd.toString());
-        createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
-        createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-
-        return createHFilesStep;
+    public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
+        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
+        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
+        updateCubeInfoStep.setSegmentId(seg.getUuid());
+        updateCubeInfoStep.setCubingJobId(jobId);
+        return updateCubeInfoStep;
     }
 
-    protected HadoopShellExecutable createBulkLoadStep(String jobId) {
-        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-
-        bulkLoadStep.setJobParams(cmd.toString());
-        bulkLoadStep.setJobClass(BulkLoadJob.class);
-
-        return bulkLoadStep;
+    public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
+        MergeDictionaryStep result = new MergeDictionaryStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        return result;
     }
-
-    protected GarbageCollectionStep createGarbageCollectionStep(List<String> oldHtables, String interimTable) {
-        GarbageCollectionStep result = new GarbageCollectionStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        result.setOldHTables(oldHtables);
-        result.setOldHiveTable(interimTable);
+    
+    public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
+        UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
+        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setCubingJobId(jobId);
         return result;
     }
 
-    protected String getJobWorkingDir(String jobId) {
+    // ============================================================================
+
+    public String getJobWorkingDir(String jobId) {
         return getJobWorkingDir(config, jobId);
     }
     
-    protected String getCuboidRootPath(String jobId) {
+    public String getCuboidRootPath(String jobId) {
         return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
     }
     
-    protected String getCuboidRootPath(CubeSegment seg) {
+    public String getCuboidRootPath(CubeSegment seg) {
         return getCuboidRootPath(seg.getLastBuildJobID());
     }
     
-    protected void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
+    public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
         try {
             String jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
             if (jobConf != null && jobConf.length() > 0) {
@@ -146,25 +143,16 @@ abstract public class JobBuilderSupport {
             throw new RuntimeException(e);
         }
     }
-
-    protected String getFactDistinctColumnsPath(String jobId) {
+    
+    public String getFactDistinctColumnsPath(String jobId) {
         return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
     }
 
 
-    protected String getStatisticsPath(String jobId) {
+    public String getStatisticsPath(String jobId) {
         return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/statistics";
     }
 
-
-    protected String getHFilePath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
-    }
-
-    protected String getRowkeyDistributionOutputPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
-    }
-
     // ============================================================================
     // static methods also shared by other job flow participant
     // ----------------------------------------------------------------------------
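
With the HBase-specific steps (rowkey distribution, HTable creation, HFile conversion, bulk load) moved out to the storage side, JobBuilderSupport stops being abstract and becomes a plain factory of engine-side steps. A minimal sketch of how a concrete builder might chain them into one executable; the addTask() calls and the step order are illustrative assumptions, not code from this commit:

package org.apache.kylin.engine.mr;

import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;

public class ExampleCubingFlowBuilder extends JobBuilderSupport {

    public ExampleCubingFlowBuilder(CubeSegment seg, String submitter) {
        super(seg, submitter);
    }

    public DefaultChainedExecutable buildSketch(String jobId) {
        DefaultChainedExecutable job = new DefaultChainedExecutable();
        job.addTask(createFactDistinctColumnsStepWithStats(jobId)); // scan flat table, collect cuboid statistics
        job.addTask(createBuildDictionaryStep(jobId));              // build dictionaries from the distinct values
        // ... cuboid building plus all storage-side steps (HTable, HFile,
        // bulk load) are contributed through the storage abstraction ...
        job.addTask(createUpdateCubeInfoAfterBuildStep(jobId));     // record build metadata on the segment
        return job;
    }
}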

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index b374a99..61328c9 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -18,15 +18,9 @@
 
 package org.apache.kylin.engine.mr;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.TableSourceFactory;
 
 public class MRBatchCubingEngine implements IBatchCubingEngine {
 
@@ -40,25 +34,14 @@ public class MRBatchCubingEngine implements IBatchCubingEngine {
         return new BatchMergeJobBuilder(mergeSegment, submitter).build();
     }
     
-    public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
-        TableDesc tableDesc = getTableDesc(seg.getCubeDesc().getFactTable());
-        return getMRInput(tableDesc).getBatchCubingInputSide(seg);
-    }
-
-    public static IMRTableInputFormat getTableInputFormat(String tableName) {
-        return getTableInputFormat(getTableDesc(tableName));
-    }
-
-    public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
-        return getMRInput(tableDesc).getTableInputFormat(tableDesc);
-    }
-
-    private static IMRInput getMRInput(TableDesc tableDesc) {
-        return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class);
+    @Override
+    public Class<?> getSourceInterface() {
+        return IMRInput.class;
     }
 
-    private static TableDesc getTableDesc(String tableName) {
-        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+    @Override
+    public Class<?> getStorageInterface() {
+        return IMROutput.class;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..57ec128
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return new BatchCubingJobBuilder2(newSegment, submitter).build();
+    }
+
+    @Override
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
+    }
+    
+    @Override
+    public Class<?> getSourceInterface() {
+        return IMRInput.class;
+    }
+
+    @Override
+    public Class<?> getStorageInterface() {
+        return IMROutput2.class;
+    }
+
+}
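
Both engine implementations now follow the same contract: instead of resolving source and storage adapters themselves (those static helpers move to MRUtil below), an engine only declares which input and output interfaces it can talk to, and the factories do the matching. A small illustration of the declared pairings; the printout is just for demonstration:

import org.apache.kylin.engine.IBatchCubingEngine;
import org.apache.kylin.engine.mr.MRBatchCubingEngine;
import org.apache.kylin.engine.mr.MRBatchCubingEngine2;

public class EngineContractDemo {
    public static void main(String[] args) {
        IBatchCubingEngine v1 = new MRBatchCubingEngine();
        IBatchCubingEngine v2 = new MRBatchCubingEngine2();
        // v1 pairs IMRInput with IMROutput; v2 keeps the same input contract
        // but targets the richer IMROutput2 storage contract.
        System.out.println(v1.getSourceInterface() + " / " + v1.getStorageInterface());
        System.out.println(v2.getSourceInterface() + " / " + v2.getStorageInterface());
    }
}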

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
new file mode 100644
index 0000000..dc0533e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -0,0 +1,55 @@
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.storage.StorageFactory2;
+
+public class MRUtil {
+
+    public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        return TableSourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
+    }
+
+    public static IMRTableInputFormat getTableInputFormat(String tableName) {
+        return getTableInputFormat(getTableDesc(tableName));
+    }
+
+    public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
+        return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
+    }
+
+    private static TableDesc getTableDesc(String tableName) {
+        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+    }
+
+    public static IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchCubingOutputSide(seg);
+    }
+
+    public static IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchMergeOutputSide(seg);
+    }
+
+    public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
+    }
+    
+    public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg);
+    }
+    
+    public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
+        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
+    }
+    
+}
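
MRUtil concentrates the adapter lookups in one place: input sides come from TableSourceFactory, output sides from StorageFactory2. A wiring sketch that mirrors what InMemCuboidJob does further down; it assumes a NEW segment whose source and storage are already configured:

import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.steps.InMemCuboidReducer;

public class MRUtilWiringSketch {
    static void wire(CubeSegment seg, Job job, String jobFlowId) throws Exception {
        // source side: the table adapter decides how the flat table reaches the mapper
        IMRTableInputFormat input = MRUtil.getBatchCubingInputSide(seg).getFlatTableInputFormat();
        input.configureJob(job);
        // storage side: the storage adapter decides where the reducer output lands
        IMRStorageOutputFormat output = MRUtil.getBatchCubingOutputSide2(seg).getStorageOutputFormat();
        output.configureOutput(InMemCuboidReducer.class, jobFlowId, job);
    }
}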

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/engine/mr/MergeDictionaryStep.java
deleted file mode 100644
index 0010174..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/MergeDictionaryStep.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.io.IOException;
-import java.util.*;
-
-public class MergeDictionaryStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-
-    public MergeDictionaryStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        KylinConfig conf = context.getConfig();
-        final CubeManager mgr = CubeManager.getInstance(conf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
-        
-        Collections.sort(mergingSegments);
-        
-        try {
-            checkLookupSnapshotsMustIncremental(mergingSegments);
-            
-            makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
-            makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
-            CubeBuilder cubeBuilder = new CubeBuilder(cube);
-            cubeBuilder.setToUpdateSegs(newSegment);
-            mgr.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to merge dictionary or lookup snapshots", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-    
-    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
-        for (String id : mergingSegmentIds) {
-            result.add(cube.getSegmentById(id));
-        }
-        return result;
-    }
-
-    private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
-
-        // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
-    }
-
-    /**
-     * For the new segment, we need to create dictionaries for it, too. For
-     * those dictionaries on fact table, create it by merging underlying
-     * dictionaries For those dictionaries on lookup table, just copy it from
-     * any one of the merging segments, it's guaranteed to be consistent(checked
-     * in CubeSegmentValidator)
-     *
-     * @param cube
-     * @param newSeg
-     * @throws IOException
-     */
-    private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
-        HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>();
-        HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
-
-        CubeDesc cubeDesc = cube.getDescriptor();
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                    String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                    if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
-                        colsNeedMeringDict.add(col);
-                    } else {
-                        colsNeedCopyDict.add(col);
-                    }
-                }
-            }
-        }
-
-        for (TblColRef col : colsNeedMeringDict) {
-            logger.info("Merging fact table dictionary on : " + col);
-            List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
-            for (CubeSegment segment : mergingSegments) {
-                logger.info("Including fact table dictionary of segment : " + segment);
-                if (segment.getDictResPath(col) != null) {
-                    DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
-                    dictInfos.add(dictInfo);
-                }
-            }
-            mergeDictionaries(dictMgr, newSeg, dictInfos, col);
-        }
-
-        for (TblColRef col : colsNeedCopyDict) {
-            String path = mergingSegments.get(0).getDictResPath(col);
-            newSeg.putDictResPath(col, path);
-        }
-    }
-
-    private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {
-        DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
-        if (dictInfo != null)
-            cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-
-        return dictInfo;
-    }
-
-    /**
-     * make snapshots for the new segment by copying from one of the underlying
-     * merging segments. it's guaranteed to be consistent(checked in
-     * CubeSegmentValidator)
-     *
-     * @param cube
-     * @param newSeg
-     */
-    private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
-        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
-        for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
-            newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id: splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-}
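
MergeDictionaryStep is deleted here only because it moves to org.apache.kylin.engine.mr.steps (see the new imports in JobBuilderSupport above). One detail worth noting while reading it: executable parameters are flat strings, so the merging-segment IDs travel through setParam/getParam as a single comma-joined value. A stand-alone, plain-Java model of that round trip, with a Map standing in for the executable's parameter store:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParamRoundTripDemo {
    private final Map<String, String> params = new HashMap<String, String>();

    void setMergingSegmentIds(List<String> ids) {
        params.put("mergingSegmentIds", String.join(",", ids)); // join on set
    }

    List<String> getMergingSegmentIds() {
        String ids = params.get("mergingSegmentIds");
        return ids == null ? Collections.<String>emptyList() : Arrays.asList(ids.split(",")); // split on get
    }

    public static void main(String[] args) {
        ParamRoundTripDemo demo = new ParamRoundTripDemo();
        demo.setMergingSegmentIds(Arrays.asList("seg-1", "seg-2"));
        System.out.println(demo.getMergingSegmentIds()); // [seg-1, seg-2]
    }
}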

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterBuildStep.java
deleted file mode 100644
index 8361f7c..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterBuildStep.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- */
-public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
-
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String CUBE_NAME = "cubeName";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    public UpdateCubeInfoAfterBuildStep() {
-        super();
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-        final CubeSegment segment = cube.getSegmentById(getSegmentId());
-        
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
-        long sourceCount = cubingJob.findSourceRecordCount();
-        long sourceSizeBytes = cubingJob.findSourceSizeBytes();
-        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-        boolean segmentReady = cubeSizeBytes > 0; // for build+merge scenario, convert HFile not happen yet, so cube size is 0
-
-        segment.setLastBuildJobID(getCubingJobId());
-        segment.setLastBuildTime(System.currentTimeMillis());
-        segment.setSizeKB(cubeSizeBytes / 1024);
-        segment.setInputRecords(sourceCount);
-        segment.setInputRecordsSize(sourceSizeBytes);
-
-        try {
-            if (segmentReady) {
-                cubeManager.promoteNewlyBuiltSegments(cube, segment);
-            } else {
-                CubeBuilder cubeBuilder = new CubeBuilder(cube);
-                cubeBuilder.setToUpdateSegs(segment);
-                cubeManager.updateCube(cubeBuilder);
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube after build", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterMergeStep.java
deleted file mode 100644
index 7defec4..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/UpdateCubeInfoAfterMergeStep.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    public UpdateCubeInfoAfterMergeStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-
-        CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
-        if (mergedSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
-        }
-        
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
-        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-
-        // collect source statistics
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        if (mergingSegmentIds.isEmpty()) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
-        }
-        long sourceCount = 0L;
-        long sourceSize = 0L;
-        for (String id : mergingSegmentIds) {
-            CubeSegment segment = cube.getSegmentById(id);
-            sourceCount += segment.getInputRecords();
-            sourceSize += segment.getInputRecordsSize();
-        }
-
-        // update segment info
-        mergedSegment.setSizeKB(cubeSizeBytes / 1024);
-        mergedSegment.setInputRecords(sourceCount);
-        mergedSegment.setInputRecordsSize(sourceSize);
-        mergedSegment.setLastBuildJobID(getCubingJobId());
-        mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
-        try {
-            cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED);
-        } catch (IOException e) {
-            logger.error("fail to update cube after merge", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
new file mode 100644
index 0000000..13175ec
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class InMemCuboidJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_JOB_FLOW_ID);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            parseOptions(options, args);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            logger.info("Starting: " + job.getJobName());
+            
+            setJobClasspath(job);
+            
+            // add metadata to distributed cache
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            long timeout = 1000*60*60L; // 1 hour
+            job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout));
+            
+            // set input
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
+            
+            // set mapper
+            job.setMapperClass(InMemCuboidMapper.class);
+            job.setMapOutputKeyClass(ByteArrayWritable.class);
+            job.setMapOutputValueClass(ByteArrayWritable.class);
+            
+            // set output
+            IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
+            storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in CuboidJob", e);
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        InMemCuboidJob job = new InMemCuboidJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}
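
This job is where the storage abstraction shows: nothing in it names HBase. The mapper input comes from IMRTableInputFormat, and everything after the reducer is delegated to whatever IMRStorageOutputFormat the storage engine returns. As a thought experiment, a hypothetical sequence-file back end would only need the two calls this patch makes; both signatures below are inferred from the call sites here and in InMemCuboidReducer, and the real interface may declare more:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.engine.mr.ByteArrayWritable;

public class SequenceFileStorageOutputFormat /* would implement IMRStorageOutputFormat */ {

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public void configureOutput(Class reducer, String jobFlowId, Job job) throws IOException {
        job.setReducerClass(reducer);
        job.setOutputKeyClass(ByteArrayWritable.class);
        job.setOutputValueClass(ByteArrayWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path("/tmp/cuboids/" + jobFlowId)); // made-up path
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public void doReducerOutput(ByteArrayWritable key, Object[] measures, Reducer.Context context)
            throws IOException, InterruptedException {
        byte[] encoded = encode(measures); // a real store would run MeasureCodec here
        ByteArrayWritable value = new ByteArrayWritable();
        value.set(encoded, 0, encoded.length);
        context.write(key, value);
    }

    private byte[] encode(Object[] measures) {
        return new byte[0]; // placeholder; measure encoding elided
    }
}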

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
new file mode 100644
index 0000000..5ecd26a
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -0,0 +1,122 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
+
+    private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private CubeSegment cubeSegment;
+    private IMRTableInputFormat flatTableInputFormat;
+
+    private int counter;
+    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
+    private Future<?> future;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+
+        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (cubeDesc.getRowkey().isUseDictionary(col)) {
+                    Dictionary<?> dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        logger.warn("Dictionary for " + col + " was not found.");
+                    }
+
+                    dictionaryMap.put(col, cubeSegment.getDictionary(col));
+                }
+            }
+        }
+
+        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+
+    }
+
+    @Override
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        // put each row to the queue
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+        List<String> rowAsList = Arrays.asList(row);
+        
+        while (!future.isDone()) {
+            if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
+                counter++;
+                if (counter % BatchConstants.COUNTER_MAX == 0) {
+                    logger.info("Handled " + counter + " records!");
+                }
+                break;
+            }
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        logger.info("Totally handled " + counter + " records!");
+
+        while (!future.isDone()) {
+            if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
+                break;
+            }
+        }
+        
+        try {
+            future.get();
+        } catch (Exception e) {
+            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+        }
+        queue.clear();
+    }
+
+}
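
The mapper itself does no cubing: it parses each record via the input-format abstraction and pushes the row into a bounded queue that a single DoggedCubeBuilder thread drains, with an empty list marking end of input. The offer-with-timeout loop is deliberate: re-checking future.isDone() every second means a dead builder thread cannot leave the mapper blocked forever. A stand-alone, plain-Java model of the same hand-off:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class QueueHandOffDemo {
    public static void main(String[] args) throws Exception {
        final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> builder = pool.submit(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        List<String> row = queue.take();
                        if (row.isEmpty())
                            break; // empty list = end of input
                        // ... feed the row into the in-memory cube builder ...
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        String[][] rows = { { "2015-07-21", "CN" }, { "2015-07-22", "US" } };
        for (String[] row : rows) {
            while (!builder.isDone()) {
                if (queue.offer(Arrays.asList(row), 1, TimeUnit.SECONDS))
                    break; // delivered; otherwise re-check that the builder is alive
            }
        }
        while (!builder.isDone()) {
            if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS))
                break; // send the end-of-input marker
        }
        builder.get(); // propagate any builder failure, as cleanup() does above
        pool.shutdown();
    }
}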

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
new file mode 100644
index 0000000..1b3e2b2
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -0,0 +1,95 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.mr.KylinReducer;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class);
+
+    private IMRStorageOutputFormat storageOutputFormat;
+    private MeasureCodec codec;
+    private MeasureAggregators aggs;
+
+    private int counter;
+    private Object[] input;
+    private Object[] result;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        
+        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        boolean isMerge = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_IS_MERGE));
+
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeDesc cubeDesc = cube.getDescriptor();
+        CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        if (isMerge)
+            storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
+        else
+            storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
+
+        List<MeasureDesc> measuresDescs = Lists.newArrayList();
+        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                for (MeasureDesc measure : colDesc.getMeasures()) {
+                    measuresDescs.add(measure);
+                }
+            }
+        }
+
+        codec = new MeasureCodec(measuresDescs);
+        aggs = new MeasureAggregators(measuresDescs);
+
+        input = new Object[measuresDescs.size()];
+        result = new Object[measuresDescs.size()];
+    }
+
+    @Override
+    public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
+
+        aggs.reset();
+
+        for (ByteArrayWritable value : values) {
+            codec.decode(value.asBuffer(), input);
+            aggs.aggregate(input);
+        }
+        aggs.collectStates(result);
+        
+        storageOutputFormat.doReducerOutput(key, result, context);
+        
+        counter++;
+        if (counter % BatchConstants.COUNTER_MAX == 0) {
+            logger.info("Handled " + counter + " records!");
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
new file mode 100644
index 0000000..f2a5fcf
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -0,0 +1,96 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+
+/**
+ */
+public class MapContextGTRecordWriter implements ICuboidWriter {
+
+    private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
+    protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
+    private Long lastCuboidId;
+    protected CubeSegment cubeSegment;
+    protected CubeDesc cubeDesc;
+
+    private int bytesLength;
+    private int dimensions;
+    private int measureCount;
+    private byte[] keyBuf;
+    private int[] measureColumnsIndex;
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private ByteArrayWritable outputKey = new ByteArrayWritable();
+    private ByteArrayWritable outputValue = new ByteArrayWritable();
+    private long cuboidRowCount = 0;
+
+    public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        this.mapContext = mapContext;
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.measureCount = cubeDesc.getMeasures().size();
+
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+
+        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
+            // output another cuboid
+            initVariables(cuboidId);
+            if (lastCuboidId != null) {
+                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
+                cuboidRowCount = 0;
+            }
+        }
+
+        cuboidRowCount++;
+        int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
+        for (int x = 0; x < dimensions; x++) {
+            System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
+            offSet += record.get(x).length();
+        }
+
+        //output measures
+        valueBuf.clear();
+        record.exportColumns(measureColumnsIndex, valueBuf);
+
+        outputKey.set(keyBuf, 0, offSet);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        try {
+            mapContext.write(outputKey, outputValue);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void initVariables(Long cuboidId) {
+        bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
+        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+        for (TblColRef column : cuboid.getColumns()) {
+            bytesLength += cubeSegment.getColumnLength(column);
+        }
+
+        keyBuf = new byte[bytesLength];
+        dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+        measureColumnsIndex = new int[measureCount];
+        for (int i = 0; i < measureCount; i++) {
+            measureColumnsIndex[i] = dimensions + i;
+        }
+
+        System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+    }
+}
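
For orientation, a minimal sketch of how such a writer is driven (hypothetical caller;
the in-memory cubing engine that feeds it is outside this diff):

    // hypothetical: mapContext, records and cuboidId are supplied by the cubing engine
    ICuboidWriter writer = new MapContextGTRecordWriter(mapContext, cubeDesc, cubeSegment);
    for (GTRecord record : records) {
        writer.write(cuboidId, record); // emits one (rowkey, measures) pair per record
    }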

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
new file mode 100644
index 0000000..3d3b1f4
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.CuboidJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * MR job that merges the cuboid data of the merging segments, reading input and
+ * writing output through the storage abstraction (IMRStorageInputFormat /
+ * IMRStorageOutputFormat) rather than intermediate HDFS files.
+ *
+ * @author shaoshi
+ */
+public class MergeCuboidFromStorageJob extends CuboidJob {
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_JOB_FLOW_ID);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            parseOptions(options, args);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+            Configuration conf = this.getConf();
+            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+            // start job
+            String jobName = getOptionValue(OPTION_JOB_NAME);
+            System.out.println("Starting: " + jobName);
+            job = Job.getInstance(conf, jobName);
+
+            setJobClasspath(job);
+            
+            // add metadata to distributed cache
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true");
+
+            // configure mapper input
+            IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat();
+            storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job);
+
+            // configure reducer output
+            IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
+            storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+            
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in MergeCuboidFromStorageJob", e);
+            printUsage(options);
+            throw e;
+        }
+    }
+
+}
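
A sketch of how this job could be chained into a job flow (the flag spellings for
OPTION_CUBE_NAME, OPTION_SEGMENT_NAME and OPTION_JOB_FLOW_ID are assumed here; verify
them against the option constants before relying on them):

    MapReduceExecutable merge = new MapReduceExecutable();
    merge.setName("Merge Cuboid Data From Storage");
    merge.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
    // assumed flag spellings; jobId identifies the enclosing job flow
    merge.setMapReduceParams("-cubename " + cubeName + " -segmentname " + segmentName + " -jobflowid " + jobId);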

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
new file mode 100644
index 0000000..6535777
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Mapper that reads cuboid rows from the storage of the segments being merged and
+ * re-encodes dictionary-based rowkey columns against the merged segment's dictionaries.
+ *
+ * @author shaoshi
+ */
+public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
+    
+    private KylinConfig config;
+    private String cubeName;
+    private String segmentName;
+    private CubeManager cubeManager;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private CubeSegment mergedCubeSegment;
+    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
+    private IMRStorageInputFormat storageInputFormat;
+
+    private ByteArrayWritable outputKey = new ByteArrayWritable();
+    private byte[] newKeyBuf;
+    private RowKeySplitter rowKeySplitter;
+
+    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private MeasureCodec codec;
+    private ByteArrayWritable outputValue = new ByteArrayWritable();
+
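+    // a dictionary needs ID remapping only if it was built on a fact table column;
+    // lookup table dictionaries are consistent across segments and are reused as-is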
+    private Boolean checkNeedMerging(TblColRef col) throws IOException {
+        Boolean ret = dictsNeedMerging.get(col);
+        if (ret != null)
+            return ret;
+        else {
+            ret = cubeDesc.getRowkey().isUseDictionary(col);
+            if (ret) {
+                String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+            }
+            dictsNeedMerging.put(col, ret);
+            return ret;
+        }
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+
+        cubeManager = CubeManager.getInstance(config);
+        cube = cubeManager.getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
+
+        newKeyBuf = new byte[256]; // size will auto-grow on demand
+
+        sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
+        logger.info(sourceCubeSegment.toString());
+
+        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+
+        List<MeasureDesc> measuresDescs = Lists.newArrayList();
+        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                for (MeasureDesc measure : colDesc.getMeasures()) {
+                    measuresDescs.add(measure);
+                }
+            }
+        }
+        codec = new MeasureCodec(measuresDescs);
+    }
+
+    @Override
+    public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
+        Pair<ByteArrayWritable, Object[]> pair = storageInputFormat.parseMapperInput(inKey, inValue);
+        ByteArrayWritable key = pair.getFirst();
+        Object[] value = pair.getSecond();
+        
+        Preconditions.checkState(key.offset() == 0);
+        
+        long cuboidID = rowKeySplitter.split(key.array(), key.length());
+        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
+
+        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
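+        // split buffer 0 holds the cuboid id, so dimension i lives at index i + 1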
+        int bufOffset = 0;
+        BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
+        bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+        for (int i = 0; i < cuboid.getColumns().size(); ++i) {
+            TblColRef col = cuboid.getColumns().get(i);
+
+            if (this.checkNeedMerging(col)) {
+                // if dictionary on fact table column, needs rewrite
+                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+
+                while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
+                    byte[] oldBuf = newKeyBuf;
+                    newKeyBuf = new byte[2 * newKeyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+                }
+
+                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
+
+                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
+                int idInMergedDict;
+                if (size < 0) {
+                    idInMergedDict = mergedDict.nullId();
+                } else {
+                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+                }
+                BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
+
+                bufOffset += mergedDict.getSizeOfId();
+            } else {
+                // keep as it is
+                while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
+                    byte[] oldBuf = newKeyBuf;
+                    newKeyBuf = new byte[2 * newKeyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+                }
+
+                System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
+                bufOffset += splittedByteses[i + 1].length;
+            }
+        }
+        byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
+        outputKey.set(newKey, 0, newKey.length);
+
+        valueBuf.clear();
+        codec.encode(value, valueBuf);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        
+        context.write(outputKey, outputValue);
+    }
+
+}
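
The per-column rewrite in the loop above, isolated as a sketch (same Dictionary calls
as the mapper; buf is a scratch buffer large enough for the longest value):

    // translate an ID from a source segment's dictionary into the merged dictionary
    static int translateId(Dictionary<?> sourceDict, Dictionary<?> mergedDict, int sourceId, byte[] buf) {
        int size = sourceDict.getValueBytesFromId(sourceId, buf, 0); // decode to value bytes
        return (size < 0) ? mergedDict.nullId()                      // null maps to null
                : mergedDict.getIdFromValueBytes(buf, 0, size);      // re-encode in merged dict
    }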

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
new file mode 100644
index 0000000..d99cb03
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+
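+/**
+ * Merge the dictionaries and lookup table snapshots of the merging segments into the
+ * new (merged) segment.
+ */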
+public class MergeDictionaryStep extends AbstractExecutable {
+
+    private static final String CUBE_NAME = "cubeName";
+    private static final String SEGMENT_ID = "segmentId";
+    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
+
+    public MergeDictionaryStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig conf = context.getConfig();
+        final CubeManager mgr = CubeManager.getInstance(conf);
+        final CubeInstance cube = mgr.getCube(getCubeName());
+        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
+        
+        Collections.sort(mergingSegments);
+        
+        try {
+            checkLookupSnapshotsMustIncremental(mergingSegments);
+            
+            makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
+            makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
+
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToUpdateSegs(newSegment);
+            mgr.updateCube(cubeBuilder);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to merge dictionary or lookup snapshots", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+    
+    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
+        List<String> mergingSegmentIds = getMergingSegmentIds();
+        List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
+        for (String id : mergingSegmentIds) {
+            result.add(cube.getSegmentById(id));
+        }
+        return result;
+    }
+
+    private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
+
+        // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
+    }
+
+    /**
+     * Create the dictionaries for the new segment. Dictionaries on fact table
+     * columns are built by merging the underlying dictionaries of the merging
+     * segments; dictionaries on lookup table columns are copied from any one of
+     * the merging segments, which is guaranteed to be consistent (checked in
+     * CubeSegmentValidator).
+     *
+     * @param cube
+     * @param newSeg
+     * @throws IOException
+     */
+    private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
+        HashSet<TblColRef> colsNeedMergingDict = new HashSet<TblColRef>();
+        HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
+        DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
+
+        CubeDesc cubeDesc = cube.getDescriptor();
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
+                    String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+                    if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
+                        colsNeedMergingDict.add(col);
+                    } else {
+                        colsNeedCopyDict.add(col);
+                    }
+                }
+            }
+        }
+
+        for (TblColRef col : colsNeedMergingDict) {
+            logger.info("Merging fact table dictionary on : " + col);
+            List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
+            for (CubeSegment segment : mergingSegments) {
+                logger.info("Including fact table dictionary of segment : " + segment);
+                if (segment.getDictResPath(col) != null) {
+                    DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+                    dictInfos.add(dictInfo);
+                }
+            }
+            mergeDictionaries(dictMgr, newSeg, dictInfos, col);
+        }
+
+        for (TblColRef col : colsNeedCopyDict) {
+            String path = mergingSegments.get(0).getDictResPath(col);
+            newSeg.putDictResPath(col, path);
+        }
+    }
+
+    private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {
+        DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
+        if (dictInfo != null)
+            cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
+
+        return dictInfo;
+    }
+
+    /**
+     * Make snapshots for the new segment by copying from one of the underlying
+     * merging segments; this is guaranteed to be consistent (checked in
+     * CubeSegmentValidator).
+     *
+     * @param cube
+     * @param newSeg
+     */
+    private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
+        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
+        for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
+            newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
+        }
+    }
+
+    public void setCubeName(String cubeName) {
+        this.setParam(CUBE_NAME, cubeName);
+    }
+
+    private String getCubeName() {
+        return getParam(CUBE_NAME);
+    }
+
+    public void setSegmentId(String segmentId) {
+        this.setParam(SEGMENT_ID, segmentId);
+    }
+
+    private String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+    public void setMergingSegmentIds(List<String> ids) {
+        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
+    }
+
+    private List<String> getMergingSegmentIds() {
+        final String ids = getParam(MERGING_SEGMENT_IDS);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id: splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+}
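
Wiring this step into a merge flow uses its public setters (a sketch; names such as
getUuid() on the new segment are assumptions outside this diff):

    MergeDictionaryStep step = new MergeDictionaryStep();
    step.setName("Merge Dictionaries");            // setName inherited from AbstractExecutable
    step.setCubeName(cubeName);
    step.setSegmentId(newSegment.getUuid());       // assumed segment id accessor
    step.setMergingSegmentIds(mergingSegmentIds);  // ids of the segments being merged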

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
new file mode 100644
index 0000000..9a7f761
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class MergeStatisticsStep extends AbstractExecutable {
+
+    private static final String CUBE_NAME = "cubeName";
+    private static final String SEGMENT_ID = "segmentId";
+    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
+    private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
+    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
+
+    public MergeStatisticsStep() {
+        super();
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig kylinConf = context.getConfig();
+        final CubeManager mgr = CubeManager.getInstance(kylinConf);
+        final CubeInstance cube = mgr.getCube(getCubeName());
+        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+
+        Configuration conf = new Configuration();
+        ResourceStore rs = ResourceStore.getStore(kylinConf);
+        try {
+
+            int averageSamplingPercentage = 0;
+            for (String segmentId : this.getMergingSegmentIds()) {
+                String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
+                InputStream is = rs.getResource(fileKey);
+                File tempFile = null;
+                FileOutputStream tempFileStream = null;
+                try {
+                    tempFile = File.createTempFile(segmentId, ".seq");
+                    tempFileStream = new FileOutputStream(tempFile);
+                    org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                } finally {
+                    IOUtils.closeStream(is);
+                    IOUtils.closeStream(tempFileStream);
+                }
+
+                FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                SequenceFile.Reader reader = null;
+                try {
+                    reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+                    LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    while (reader.next(key, value)) {
+                        if (key.get() == 0L) {
+                            // the entry with key 0 stores the segment's sampling percentage
+                            averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                        } else {
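+                            // precision must match the HLL counters written at build time
+                            // (14 here); merging registers of different precision is invalid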
+                            HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+                            ByteArray byteArray = new ByteArray(value.getBytes());
+                            hll.readRegisters(byteArray.asBuffer());
+
+                            if (cuboidHLLMap.get(key.get()) != null) {
+                                cuboidHLLMap.get(key.get()).merge(hll);
+                            } else {
+                                cuboidHLLMap.put(key.get(), hll);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error("failed to read statistics of segment " + segmentId, e);
+                    throw e;
+                } finally {
+                    IOUtils.closeStream(reader);
+                    if (!tempFile.delete()) {
+                        logger.warn("failed to delete temp file " + tempFile.getAbsolutePath());
+                    }
+                }
+            }
+            averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
+            FactDistinctColumnsReducer.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
+            Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+            FileSystem fs = statisticsFilePath.getFileSystem(conf);
+            FSDataInputStream is = fs.open(statisticsFilePath);
+            try {
+                // put the statistics to metadata store
+                String statisticsFileName = newSegment.getStatisticsResourcePath();
+                rs.putResource(statisticsFileName, is, System.currentTimeMillis());
+            } finally {
+                IOUtils.closeStream(is);
+            }
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to merge cuboid statistics", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+    public void setCubeName(String cubeName) {
+        this.setParam(CUBE_NAME, cubeName);
+    }
+
+    private String getCubeName() {
+        return getParam(CUBE_NAME);
+    }
+
+    public void setSegmentId(String segmentId) {
+        this.setParam(SEGMENT_ID, segmentId);
+    }
+
+    private String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+    public void setMergingSegmentIds(List<String> ids) {
+        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
+    }
+
+    private List<String> getMergingSegmentIds() {
+        final String ids = getParam(MERGING_SEGMENT_IDS);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    public void setMergedStatisticsPath(String path) {
+        setParam(MERGED_STATISTICS_PATH, path);
+    }
+
+    private String getMergedStatisticsPath() {
+        return getParam(MERGED_STATISTICS_PATH);
+    }
+}
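
Configuration of this step follows the same pattern as MergeDictionaryStep (a sketch;
the working-directory layout for the merged statistics path is an assumption):

    MergeStatisticsStep statsStep = new MergeStatisticsStep();
    statsStep.setName("Merge Cuboid Statistics");
    statsStep.setCubeName(cubeName);
    statsStep.setSegmentId(newSegment.getUuid());      // assumed segment id accessor
    statsStep.setMergingSegmentIds(mergingSegmentIds);
    statsStep.setMergedStatisticsPath(jobWorkingDir + "/statistics"); // assumed path layout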