You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/21 12:53:15 UTC

[1/2] kylin git commit: KYLIN-1245 Put layer cubing and in-mem cubing side by side, random switch between them

Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 2ef13d08e -> 3fc3883a4


http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index ac304f5..4c2737d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -18,53 +18,12 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.steps.MergeCuboidMapper;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 /**
  * This "Transition" impl generates cuboid files and then convert to HFile.
  * The additional step slows down build process, but the gains is merge
@@ -77,6 +36,7 @@ import com.google.common.collect.Lists;
  */
 public class HBaseMROutput2Transition implements IMROutput2 {
 
+    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HBaseMROutput2Transition.class);
 
     @Override
@@ -85,19 +45,12 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             HBaseMRSteps steps = new HBaseMRSteps(seg);
 
             @Override
-            public IMRStorageOutputFormat getStorageOutputFormat() {
-                return new HBaseOutputFormat(seg);
-            }
-
-            @Override
             public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
                 jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
             }
 
             @Override
-            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
-                String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -110,34 +63,17 @@ public class HBaseMROutput2Transition implements IMROutput2 {
     }
 
     @Override
-    public IMRBatchMergeInputSide2 getBatchMergeInputSide(final CubeSegment seg) {
-        return new IMRBatchMergeInputSide2() {
-            @Override
-            public IMRStorageInputFormat getStorageInputFormat() {
-                return new HBaseInputFormat(seg);
-            }
-        };
-    }
-
-    @Override
     public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
         return new IMRBatchMergeOutputSide2() {
             HBaseMRSteps steps = new HBaseMRSteps(seg);
 
             @Override
-            public IMRStorageOutputFormat getStorageOutputFormat() {
-                return new HBaseOutputFormat(seg);
-            }
-
-            @Override
             public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
                 jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
             }
 
             @Override
-            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
-                String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
-
+            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
@@ -148,263 +84,4 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             }
         };
     }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private static class HBaseInputFormat implements IMRStorageInputFormat {
-        final CubeSegment seg;
-
-        public HBaseInputFormat(CubeSegment seg) {
-            this.seg = seg;
-        }
-
-        @Override
-        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
-            // merge by cuboid files
-            if (isMergeFromCuboidFiles(job.getConfiguration())) {
-                logger.info("Merge from cuboid files for " + seg);
-                
-                job.setInputFormatClass(SequenceFileInputFormat.class);
-                addCuboidInputDirs(job);
-                
-                job.setMapperClass(mapperClz);
-                job.setMapOutputKeyClass(outputKeyClz);
-                job.setMapOutputValueClass(outputValueClz);
-            }
-            // merge from HTable scan
-            else {
-                logger.info("Merge from HTables for " + seg);
-                
-                Configuration conf = job.getConfiguration();
-                HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
-                List<Scan> scans = new ArrayList<Scan>();
-                for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
-                    Scan scan = new Scan();
-                    scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs
-                    scan.setCacheBlocks(false); // don't set to true for MR jobs
-                    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
-                    scans.add(scan);
-                }
-                TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
-                TableMapReduceUtil.initCredentials(job);
-            }
-        }
-
-        private void addCuboidInputDirs(Job job) throws IOException {
-            List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-            HBaseMRSteps steps = new HBaseMRSteps(seg);
-            
-            String[] inputs = new String[mergingSegments.size()];
-            for (int i = 0; i < mergingSegments.size(); i++) {
-                CubeSegment mergingSeg = mergingSegments.get(i);
-                String cuboidPath = steps.getCuboidRootPath(mergingSeg);
-                inputs[i] = cuboidPath + (cuboidPath.endsWith("/") ? "" : "/") + "*";
-            }
-            
-            AbstractHadoopJob.addInputDirs(inputs, job);
-        }
-
-        @Override
-        public CubeSegment findSourceSegment(Context context) throws IOException {
-            // merge by cuboid files
-            if (isMergeFromCuboidFiles(context.getConfiguration())) {
-                FileSplit fileSplit = (FileSplit) context.getInputSplit();
-                return MergeCuboidMapper.findSourceSegment(fileSplit, seg.getCubeInstance());
-            }
-            // merge from HTable scan
-            else {
-                TableSplit currentSplit = (TableSplit) context.getInputSplit();
-                byte[] tableName = currentSplit.getTableName();
-                String htableName = Bytes.toString(tableName);
-
-                // decide which source segment
-                for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
-                    String segmentHtable = segment.getStorageLocationIdentifier();
-                    if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
-                        return segment;
-                    }
-                }
-                throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
-            }
-        }
-
-        transient ByteArrayWritable parsedKey;
-        transient Object[] parsedValue;
-        transient Pair<ByteArrayWritable, Object[]> parsedPair;
-        
-        transient MeasureCodec measureCodec;
-        transient RowValueDecoder[] rowValueDecoders;
-
-        @Override
-        public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
-            // lazy init
-            if (parsedPair == null) {
-                parsedKey = new ByteArrayWritable();
-                parsedValue = new Object[seg.getCubeDesc().getMeasures().size()];
-                parsedPair = Pair.newPair(parsedKey, parsedValue);
-            }
-            
-            // merge by cuboid files
-            if (isMergeFromCuboidFiles(null)) {
-                return parseMapperInputFromCuboidFile(inKey, inValue);
-            }
-            // merge from HTable scan
-            else {
-                return parseMapperInputFromHTable(inKey, inValue);
-            }
-        }
-
-        private Pair<ByteArrayWritable, Object[]> parseMapperInputFromCuboidFile(Object inKey, Object inValue) {
-            // lazy init
-            if (measureCodec == null) {
-                measureCodec = new MeasureCodec(seg.getCubeDesc().getMeasures());
-            }
-            
-            Text key = (Text) inKey;
-            parsedKey.set(key.getBytes(), 0, key.getLength());
-            
-            Text value = (Text) inValue;
-            measureCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), parsedValue);
-            
-            return parsedPair;
-        }
-
-        private Pair<ByteArrayWritable, Object[]> parseMapperInputFromHTable(Object inKey, Object inValue) {
-            // lazy init
-            if (rowValueDecoders == null) {
-                List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
-                List<MeasureDesc> measuresDescs = Lists.newArrayList();
-                for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHbaseMapping().getColumnFamily()) {
-                    for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                        valueDecoderList.add(new RowValueDecoder(colDesc));
-                        for (MeasureDesc measure : colDesc.getMeasures()) {
-                            measuresDescs.add(measure);
-                        }
-                    }
-                }
-                rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
-            }
-
-            ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
-            parsedKey.set(key.get(), key.getOffset(), key.getLength());
-
-            Result hbaseRow = (Result) inValue;
-            for (int i = 0; i < rowValueDecoders.length; i++) {
-                rowValueDecoders[i].decode(hbaseRow);
-                rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
-            }
-
-            return parsedPair;
-        }
-
-        transient Boolean isMergeFromCuboidFiles;
-
-        // merge from cuboid files is better than merge from HTable, because no workload on HBase region server
-        private boolean isMergeFromCuboidFiles(Configuration jobConf) {
-            // cache in this object?
-            if (isMergeFromCuboidFiles != null)
-                return isMergeFromCuboidFiles.booleanValue();
-
-            final String confKey = "kylin.isMergeFromCuboidFiles";
-
-            // cache in job configuration?
-            if (jobConf != null) {
-                String result = jobConf.get(confKey);
-                if (result != null) {
-                    isMergeFromCuboidFiles = Boolean.valueOf(result);
-                    return isMergeFromCuboidFiles.booleanValue();
-                }
-            }
-
-            boolean result = true;
-
-            try {
-                List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-                HBaseMRSteps steps = new HBaseMRSteps(seg);
-                for (CubeSegment mergingSeg : mergingSegments) {
-                    String cuboidRootPath = steps.getCuboidRootPath(mergingSeg);
-                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRootPath);
-
-                    boolean cuboidFileExist = fs.exists(new Path(cuboidRootPath));
-                    if (cuboidFileExist == false) {
-                        logger.info("Merge from HTable because " + cuboidRootPath + " does not exist");
-                        result = false;
-                        break;
-                    }
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-
-            // put in cache
-            isMergeFromCuboidFiles = Boolean.valueOf(result);
-            if (jobConf != null) {
-                jobConf.set(confKey, "" + result);
-            }
-            return result;
-        }
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private static class HBaseOutputFormat implements IMRStorageOutputFormat {
-        final CubeSegment seg;
-
-        Text outputKey;
-        Text outputValue;
-        ByteBuffer valueBuf;
-        MeasureCodec codec;
-
-        public HBaseOutputFormat(CubeSegment seg) {
-            this.seg = seg;
-        }
-
-        @Override
-        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
-            job.setReducerClass(reducer);
-
-            // estimate the reducer number
-            String htableName = seg.getStorageLocationIdentifier();
-            Configuration conf = HBaseConfiguration.create(job.getConfiguration());
-            HTable htable = new HTable(conf, htableName);
-            int regions = htable.getStartKeys().length + 1;
-            htable.close();
-            
-            int reducerNum = regions * 3;
-            reducerNum = Math.max(1, reducerNum);
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            reducerNum = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), reducerNum);
-
-            job.setNumReduceTasks(reducerNum);
-
-            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
-
-            Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
-            FileOutputFormat.setOutputPath(job, output);
-
-            HadoopUtil.deletePath(job.getConfiguration(), output);
-        }
-
-        @Override
-        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
-            // lazy init
-            if (outputKey == null) {
-                outputKey = new Text();
-                outputValue = new Text();
-                valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-                codec = new MeasureCodec(seg.getCubeDesc().getMeasures());
-            }
-
-            outputKey.set(key.array(), key.offset(), key.length());
-
-            valueBuf.clear();
-            codec.encode(value, valueBuf);
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
-            context.write(outputKey, outputValue);
-        }
-    }
-
-}
+}
\ No newline at end of file


[2/2] kylin git commit: KYLIN-1245 Put layer cubing and in-mem cubing side by side, random switch between them

Posted by li...@apache.org.
KYLIN-1245 Put layer cubing and in-mem cubing side by side, random switch between them


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3fc3883a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3fc3883a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3fc3883a

Branch: refs/heads/2.x-staging
Commit: 3fc3883a41bfab899c31b300db4c0757bd5589d3
Parents: 2ef13d0
Author: Yang Li <li...@apache.org>
Authored: Mon Dec 21 19:52:31 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Dec 21 19:52:31 2015 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  97 +++++-
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  |  35 +-
 .../org/apache/kylin/engine/mr/IMROutput2.java  | 104 +++---
 .../java/org/apache/kylin/engine/mr/MRUtil.java |   6 -
 .../engine/mr/common/AbstractHadoopJob.java     |   1 -
 .../kylin/engine/mr/steps/InMemCuboidJob.java   | 201 ++++++++++-
 .../engine/mr/steps/InMemCuboidReducer.java     |  28 +-
 .../mr/steps/MergeCuboidFromStorageJob.java     |  94 ------
 .../mr/steps/MergeCuboidFromStorageMapper.java  | 239 -------------
 .../apache/kylin/engine/spark/SparkCubing.java  |   4 +-
 .../storage/hbase/steps/CreateHTableJob.java    | 143 +-------
 .../storage/hbase/steps/HBaseMROutput2.java     | 290 ----------------
 .../hbase/steps/HBaseMROutput2Transition.java   | 331 +------------------
 13 files changed, 378 insertions(+), 1195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index f8fbc33..476a763 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -18,11 +18,16 @@
 
 package org.apache.kylin.engine.mr;
 
+import java.util.Random;
+
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
@@ -37,14 +42,15 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
     public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
         this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment)seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment) seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V2 new job to BUILD segment " + seg);
-        
-        final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
+
+        final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
         final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
 
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
@@ -56,8 +62,14 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         outputSide.addStepPhase2_BuildDictionary(result);
 
         // Phase 3: Build Cube
-        result.addTask(createInMemCubingStep(jobId));
-        outputSide.addStepPhase3_BuildCube(result);
+        if (new Random().nextBoolean()) {
+            // layer cubing
+            addLayerCubingSteps(result, jobId, cuboidRootPath);
+        } else {
+            // or in-mem cubing
+            result.addTask(createInMemCubingStep(jobId, cuboidRootPath));
+        }
+        outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
 
         // Phase 4: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
@@ -67,6 +79,20 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
+    private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+        RowKeyDesc rowKeyDesc = ((CubeSegment) seg).getCubeDesc().getRowkey();
+        final int groupRowkeyColumnsCount = ((CubeSegment) seg).getCubeDesc().getBuildLevel();
+        final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
+        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
+        // base cuboid step
+        result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
+        // n dim cuboid steps
+        for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
+            int dimNum = totalRowkeyColumnsCount - i;
+            result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
+        }
+    }
+
     private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
         SaveStatisticsStep result = new SaveStatisticsStep();
         result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
@@ -76,19 +102,19 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private MapReduceExecutable createInMemCubingStep(String jobId) {
+    private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
         // base cuboid job
         MapReduceExecutable cubeStep = new MapReduceExecutable();
 
         StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+        appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
 
         cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
 
         appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "output", cuboidRootPath);
         appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "jobflowid", jobId);
 
         cubeStep.setMapReduceParams(cmd.toString());
         cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
@@ -96,4 +122,59 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return cubeStep;
     }
 
+    private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
+        // base cuboid job
+        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+
+        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
+
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
+        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "level", "0");
+
+        baseCuboidStep.setMapReduceParams(cmd.toString());
+        baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
+        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+        return baseCuboidStep;
+    }
+
+    private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
+        // ND cuboid job
+        MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
+
+        ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
+        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
+
+        ndCuboidStep.setMapReduceParams(cmd.toString());
+        ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
+        return ndCuboidStep;
+    }
+
+    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+        String[] paths = new String[groupRowkeyColumnsCount + 1];
+        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
+            int dimNum = totalRowkeyColumnCount - i;
+            if (dimNum == totalRowkeyColumnCount) {
+                paths[i] = cuboidRootPath + "base_cuboid";
+            } else {
+                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
+            }
+        }
+        return paths;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 48a717f..008d489 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
@@ -39,23 +39,24 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
 
     public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment)seg);
+        this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment) seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V2 new job to MERGE segment " + seg);
-        
-        final CubeSegment cubeSegment = (CubeSegment)seg;
+
+        final CubeSegment cubeSegment = (CubeSegment) seg;
         final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
         final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
 
         final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingSegmentIds = Lists.newArrayList();
-        final List<String> mergingHTables = Lists.newArrayList();
+        final List<String> mergingCuboidPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
-            mergingHTables.add(merging.getStorageLocationIdentifier());
+            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
         }
 
         // Phase 1: Merge Dictionary
@@ -63,10 +64,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
         outputSide.addStepPhase1_MergeDictionary(result);
 
-        // Phase 2: Merge Cube
-        String formattedTables = StringUtil.join(mergingHTables, ",");
-        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
-        outputSide.addStepPhase2_BuildCube(result);
+        // Phase 2: Merge Cube Files
+        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+        result.addTask(createMergeCuboidDataStep(cubeSegment, formattedPath, cuboidRootPath));
+        outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
 
         // Phase 3: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
@@ -85,20 +86,20 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
         MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
         mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
         StringBuilder cmd = new StringBuilder();
 
-        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getRealization().getName() + "_Step");
-        appendExecCmdParameters(cmd, "jobflowid", jobId);
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", outputPath);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
-        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
         return mergeCuboidDataStep;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 3ad51c5..844eb07 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -1,13 +1,23 @@
-package org.apache.kylin.engine.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
 
-import java.io.IOException;
+package org.apache.kylin.engine.mr;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
@@ -17,93 +27,59 @@ public interface IMROutput2 {
     public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
 
     /**
-     * Participate the batch cubing flow as the output side.
+     * Participate the batch cubing flow as the output side. Responsible for saving
+     * the cuboid output to storage at the end of Phase 3.
      * 
      * - Phase 1: Create Flat Table
      * - Phase 2: Build Dictionary
-     * - Phase 3: Build Cube (with StorageOutputFormat)
+     * - Phase 3: Build Cube
      * - Phase 4: Update Metadata & Cleanup
      */
     public interface IMRBatchCubingOutputSide2 {
 
-        /** Return an output format for Phase 3: Build Cube MR */
-        public IMRStorageOutputFormat getStorageOutputFormat();
-
         /** Add step that executes after build dictionary and before build cube. */
         public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
 
-        /** Add step that executes after build cube. */
-        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
-
-        /** Add step that does any necessary clean up. */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
-    }
-
-    public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
-
-    public interface IMRBatchMergeInputSide2 {
-        public IMRStorageInputFormat getStorageInputFormat();
-    }
-
-    /** Read in a cube as input of merge. Configure the input file format of mapper. */
-    @SuppressWarnings("rawtypes")
-    public interface IMRStorageInputFormat {
-
-        /** Configure MR mapper class and input file format. */
-        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException;
-
-        /** Given a mapper context, figure out which segment the mapper reads from. */
-        public CubeSegment findSourceSegment(Mapper.Context context) throws IOException;
-
         /**
-         * Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures.
+         * Add step that saves cuboids from HDFS to storage.
          * 
-         * @return <code>ByteArrayWritable</code> is the cuboid ID (8 bytes) + dimension values in dictionary encoding
-         *         <code>Object[]</code> is the measure values in order of <code>CubeDesc.getMeasures()</code>
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, 
+         * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
          */
-        public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
     /** Return a helper to participate in batch merge job flow. */
     public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
 
     /**
-     * Participate the batch merge flow as the output side.
+     * Participate the batch cubing flow as the output side. Responsible for saving
+     * the cuboid output to storage at the end of Phase 2.
      * 
      * - Phase 1: Merge Dictionary
-     * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
+     * - Phase 2: Merge Cube
      * - Phase 3: Update Metadata & Cleanup
      */
     public interface IMRBatchMergeOutputSide2 {
 
-        /** Return an input format for Phase 2: Merge Cube MR */
-        public IMRStorageOutputFormat getStorageOutputFormat();
-
         /** Add step that executes after merge dictionary and before merge cube. */
         public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
 
-        /** Add step that executes after merge cube. */
-        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, 
+         * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
+         */
+        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
 
         /** Add step that does any necessary clean up. */
         public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    /** Write out a cube. Configure the output file format of reducer and do the actual K-V output. */
-    @SuppressWarnings("rawtypes")
-    public interface IMRStorageOutputFormat {
-        
-        /** Configure MR reducer class and output file format. */
-        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
-
-        /**
-         * Write out a row of cuboid. Given the cuboid ID, dimensions, and measures, serialize in whatever
-         * way and output to reducer context.
-         * 
-         * @param key     The cuboid ID (8 bytes) + dimension values in dictionary encoding
-         * @param value   The measure values in order of <code>CubeDesc.getMeasures()</code>
-         * @param context The reducer context output goes to
-         */
-        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 55fa9e2..41c8b6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -7,12 +7,10 @@ import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationSegment;
 import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.storage.StorageFactory;
@@ -47,10 +45,6 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
     }
 
-    public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
-        return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg);
-    }
-
     public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index f031f76..21ceff7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -72,7 +72,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
 
     protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
-    protected static final Option OPTION_JOB_FLOW_ID = OptionBuilder.withArgName("job flow ID").hasArg().isRequired(true).withDescription("job flow ID").create("jobflowid");
     protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
     protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
     protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 8f2b810..95eb725 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -18,23 +18,57 @@
 
 package org.apache.kylin.engine.mr.steps;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  */
 public class InMemCuboidJob extends AbstractHadoopJob {
@@ -47,13 +81,14 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
         try {
             options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_JOB_FLOW_ID);
             options.addOption(OPTION_CUBE_NAME);
             options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
             parseOptions(options, args);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+            String output = getOptionValue(OPTION_OUTPUT_PATH);
 
             KylinConfig config = KylinConfig.getInstanceFromEnv();
             CubeManager cubeMgr = CubeManager.getInstance(config);
@@ -84,9 +119,19 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             job.setMapOutputValueClass(ByteArrayWritable.class);
 
             // set output
-            IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
-            storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+            job.setReducerClass(InMemCuboidReducer.class);
+            job.setNumReduceTasks(calculateReducerNum(cubeSeg));
+
+            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            Path outputPath = new Path(output);
+            FileOutputFormat.setOutputPath(job, outputPath);
 
+            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+            
             return waitForCompletion(job);
         } catch (Exception e) {
             logger.error("error in CuboidJob", e);
@@ -98,6 +143,154 @@ public class InMemCuboidJob extends AbstractHadoopJob {
         }
     }
 
+    private int calculateReducerNum(CubeSegment cubeSeg) throws IOException {
+        Configuration jobConf = job.getConfiguration();
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        
+        Map<Long, Double> cubeSizeMap = getCubeSizeMapFromCuboidStatistics(cubeSeg, kylinConfig, jobConf);
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+        
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB);
+
+        // at least 1 reducer
+        numReduceTasks = Math.max(1, numReduceTasks);
+        // no more than 5000 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        logger.info("Having total map input MB " + Math.round(totalSizeInM));
+        logger.info("Having per reduce MB " + perReduceInputMB);
+        logger.info("Setting " + "mapred.reduce.tasks" + "=" + numReduceTasks);
+        return numReduceTasks;
+    }
+
+    public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
+        ResourceStore rs = ResourceStore.getStore(kylinConfig);
+        String fileKey = cubeSegment.getStatisticsResourcePath();
+        InputStream is = rs.getResource(fileKey).inputStream;
+        File tempFile = null;
+        FileOutputStream tempFileStream = null;
+        try {
+            tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq");
+            tempFileStream = new FileOutputStream(tempFile);
+            org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+        } finally {
+            IOUtils.closeStream(is);
+            IOUtils.closeStream(tempFileStream);
+        }
+        Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap();
+
+        FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+        int samplingPercentage = 25;
+        SequenceFile.Reader reader = null;
+        try {
+            reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+            LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+            while (reader.next(key, value)) {
+                if (key.get() == 0L) {
+                    samplingPercentage = Bytes.toInt(value.getBytes());
+                } else {
+                    HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+                    ByteArray byteArray = new ByteArray(value.getBytes());
+                    hll.readRegisters(byteArray.asBuffer());
+                    counterMap.put(key.get(), hll);
+                }
+
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        } finally {
+            IOUtils.closeStream(reader);
+            tempFile.delete();
+        }
+        return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage);
+    }
+    
+    public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException {
+        Preconditions.checkArgument(samplingPercentage > 0);
+        return Maps.transformValues(counterMap, new Function<HyperLogLogPlusCounter, Long>() {
+            @Nullable
+            @Override
+            public Long apply(HyperLogLogPlusCounter input) {
+                return input.getCountEstimate() * 100 / samplingPercentage;
+            }
+        });
+    }
+
+    // return map of Cuboid ID => MB
+    public static Map<Long, Double> getCubeSizeMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
+        Map<Long, Long> rowCountMap = getCubeRowCountMapFromCuboidStatistics(cubeSegment, kylinConfig, conf);
+        Map<Long, Double> sizeMap = getCubeSizeMapFromRowCount(cubeSegment, rowCountMap);
+        return sizeMap;
+    }
+
+    public static Map<Long, Double> getCubeSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
+        final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        final List<Integer> rowkeyColumnSize = Lists.newArrayList();
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+        final List<TblColRef> columnList = baseCuboid.getColumns();
+
+        for (int i = 0; i < columnList.size(); i++) {
+            rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i)));
+        }
+
+        Map<Long, Double> sizeMap = Maps.newHashMap();
+        for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) {
+            sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize));
+        }
+        return sizeMap;
+    }
+
+    /**
+     * Estimate the cuboid's size
+     *
+     * @return the cuboid size in M bytes
+     */
+    private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
+
+        int bytesLength = cubeSegment.getRowKeyPreambleSize();
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
+        for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & cuboidId) > 0) {
+                bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i));
+            }
+            mask = mask >> 1;
+        }
+
+        // add the measure length
+        int space = 0;
+        boolean isMemoryHungry = false;
+        for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
+            if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) {
+                isMemoryHungry = true;
+            }
+            DataType returnType = measureDesc.getFunction().getReturnDataType();
+            space += returnType.getStorageBytesEstimate();
+        }
+        bytesLength += space;
+
+        double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L);
+        if (isMemoryHungry) {
+            logger.info("Cube is memory hungry, storage size estimation multiply 0.05");
+            ret *= 0.05;
+        } else {
+            logger.info("Cube is not memory hungry, storage size estimation multiply 0.25");
+            ret *= 0.25;
+        }
+        logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M.");
+        return ret;
+    }
+
     public static void main(String[] args) throws Exception {
         InMemCuboidJob job = new InMemCuboidJob();
         int exitCode = ToolRunner.run(job, args);

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index c35e77f..9beacbb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -1,17 +1,18 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
 import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.MeasureAggregators;
@@ -27,13 +28,16 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
 
     private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class);
 
-    private IMRStorageOutputFormat storageOutputFormat;
     private MeasureCodec codec;
     private MeasureAggregators aggs;
 
     private int counter;
     private Object[] input;
     private Object[] result;
+    
+    private Text outputKey;
+    private Text outputValue;
+    private ByteBuffer valueBuf;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -47,16 +51,16 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         CubeDesc cubeDesc = cube.getDescriptor();
         CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        if (isMerge)
-            storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
-        else
-            storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
 
         List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
         codec = new MeasureCodec(measuresDescs);
         aggs = new MeasureAggregators(measuresDescs);
         input = new Object[measuresDescs.size()];
         result = new Object[measuresDescs.size()];
+        
+        outputKey = new Text();
+        outputValue = new Text();
+        valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
     }
 
     @Override
@@ -70,8 +74,16 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
         }
         aggs.collectStates(result);
 
-        storageOutputFormat.doReducerOutput(key, result, context);
+        // output key
+        outputKey.set(key.array(), key.offset(), key.length());
 
+        // output value
+        valueBuf.clear();
+        codec.encode(result, valueBuf);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+
+        context.write(outputKey, outputValue);
+        
         counter++;
         if (counter % BatchConstants.COUNTER_MAX == 0) {
             logger.info("Handled " + counter + " records!");

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
deleted file mode 100644
index 4485d17..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- */
-public class MergeCuboidFromStorageJob extends CuboidJob {
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_JOB_FLOW_ID);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            parseOptions(options, args);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
-            Configuration conf = this.getConf();
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            System.out.println("Starting: " + jobName);
-            job = Job.getInstance(conf, jobName);
-
-            setJobClasspath(job);
-
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-            job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true");
-
-            // configure mapper input
-            IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat();
-            storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job);
-
-            // configure reducer output
-            IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
-            storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            logger.error("error in MergeCuboidFromHBaseJob", e);
-            printUsage(options);
-            throw e;
-        } finally {
-            if (job != null)
-                cleanupTempConfFile(job.getConfiguration());
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
deleted file mode 100644
index 18bce34..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * @author shaoshi
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
-
-    private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
-
-    private KylinConfig config;
-    private String cubeName;
-    private String segmentName;
-    private CubeManager cubeManager;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment mergedCubeSegment;
-    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
-    private IMRStorageInputFormat storageInputFormat;
-
-    private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private byte[] newKeyBodyBuf;
-    private ByteArray newKeyBuf;
-    private RowKeySplitter rowKeySplitter;
-    private RowKeyEncoderProvider rowKeyEncoderProvider;
-
-    private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
-
-    private List<MeasureDesc> measureDescs;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private MeasureCodec codec;
-    private ByteArrayWritable outputValue = new ByteArrayWritable();
-
-    private List<Pair<Integer, MeasureIngester>> dictMeasures;
-    private Map<TblColRef, Dictionary<String>> oldDicts;
-    private Map<TblColRef, Dictionary<String>> newDicts;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
-        cubeManager = CubeManager.getInstance(config);
-        cube = cubeManager.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
-
-        newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; // size will auto-grow
-        newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
-
-        sourceCubeSegment = storageInputFormat.findSourceSegment(context);
-        logger.info("Source cube segment: " + sourceCubeSegment);
-
-        rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
-        rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
-
-        measureDescs = cubeDesc.getMeasures();
-        codec = new MeasureCodec(measureDescs);
-
-        dictMeasures = Lists.newArrayList();
-        for (int i = 0; i < measureDescs.size(); i++) {
-            MeasureDesc measureDesc = measureDescs.get(i);
-            MeasureType measureType = measureDesc.getFunction().getMeasureType();
-            if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
-                dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
-            }
-        }
-        if (dictMeasures.size() > 0) {
-            oldDicts = sourceCubeSegment.buildDictionaryMap();
-            newDicts = mergedCubeSegment.buildDictionaryMap();
-        }
-    }
-
-    @Override
-    public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
-        Pair<ByteArrayWritable, Object[]> pair = storageInputFormat.parseMapperInput(inKey, inValue);
-        ByteArrayWritable key = pair.getFirst();
-        Object[] value = pair.getSecond();
-
-        Preconditions.checkState(key.offset() == 0);
-
-        long cuboidID = rowKeySplitter.split(key.array());
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
-        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
-
-        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
-        int bufOffset = 0;
-        int bodySplitOffset = rowKeySplitter.getBodySplitOffset();
-
-        for (int i = 0; i < cuboid.getColumns().size(); ++i) {
-            int useSplit = i + bodySplitOffset;
-            TblColRef col = cuboid.getColumns().get(i);
-
-            if (this.checkNeedMerging(col)) {
-                // if dictionary on fact table column, needs rewrite
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
-                while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfId() > newKeyBodyBuf.length - bufOffset) {
-                    //also use this buf to hold value before translating
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-                }
-
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length);
-                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-
-                int idInMergedDict;
-                if (size < 0) {
-                    idInMergedDict = mergedDict.nullId();
-                } else {
-                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
-                }
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-
-                bufOffset += mergedDict.getSizeOfId();
-            } else {
-                // keep as it is
-                while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-                }
-
-                System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
-                bufOffset += splittedByteses[useSplit].length;
-            }
-        }
-
-        int fullKeySize = rowkeyEncoder.getBytesLength();
-        while (newKeyBuf.array().length < fullKeySize) {
-            newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
-        }
-        newKeyBuf.set(0, fullKeySize);
-
-        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
-        outputKey.set(newKeyBuf.array(), 0, fullKeySize);
-
-        // re-encode measures if dictionary is used
-        if (dictMeasures.size() > 0) {
-            reEncodeMeasure(value);
-        }
-
-        valueBuf.clear();
-        codec.encode(value, valueBuf);
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
-        context.write(outputKey, outputValue);
-    }
-
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dimensionsNeedDict.get(col);
-        if (ret != null)
-            return ret;
-        else {
-            ret = cubeDesc.getRowkey().isUseDictionary(col);
-            if (ret) {
-                String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().isUseDictionary(col), col).getTable();
-                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
-            }
-            dimensionsNeedDict.put(col, ret);
-            return ret;
-        }
-    }
-
-    private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
-        for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
-            int i = pair.getFirst();
-            MeasureIngester ingester = pair.getSecond();
-            measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index be81c2b..59a19d3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -56,6 +56,7 @@ import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.*;
 import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.*;
+import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
 import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
 import org.apache.kylin.engine.spark.util.IteratorUtils;
@@ -453,7 +454,8 @@ public class SparkCubing extends AbstractApplication {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final Map<Long, Long> cubeSizeMap = CreateHTableJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100);
+        final Map<Long, Long> rowCountMap = InMemCuboidJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100);
+        final Map<Long, Double> cubeSizeMap = InMemCuboidJob.getCubeSizeMapFromRowCount(cubeSegment, rowCountMap);
         System.out.println("cube size estimation:" + cubeSizeMap);
         final byte[][] splitKeys = CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment);
         CubeHTableUtil.createHTable(cubeSegment, splitKeys);

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 4fac4fc..85c9200 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -18,26 +18,19 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nullable;
-
 import org.apache.commons.cli.Options;
 import org.apache.commons.math3.primes.Primes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -45,31 +38,22 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.CuboidShardUtil;
-import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -110,7 +94,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         try {
             byte[][] splitKeys;
             if (statsEnabled) {
-                final Map<Long, Long> cuboidSizeMap = getCubeRowCountMapFromCuboidStatistics(cubeSegment, kylinConfig, conf);
+                final Map<Long, Double> cuboidSizeMap = InMemCuboidJob.getCubeSizeMapFromCuboidStatistics(cubeSegment, kylinConfig, conf);
                 splitKeys = getSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment);
             } else {
                 splitKeys = getSplits(conf, partitionFilePath);
@@ -160,50 +144,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
         return retValue.length == 0 ? null : retValue;
     }
 
-    public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(CubeSegment cubeSegment, KylinConfig kylinConfig, Configuration conf) throws IOException {
-        ResourceStore rs = ResourceStore.getStore(kylinConfig);
-        String fileKey = cubeSegment.getStatisticsResourcePath();
-        InputStream is = rs.getResource(fileKey).inputStream;
-        File tempFile = null;
-        FileOutputStream tempFileStream = null;
-        try {
-            tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq");
-            tempFileStream = new FileOutputStream(tempFile);
-            org.apache.commons.io.IOUtils.copy(is, tempFileStream);
-        } finally {
-            IOUtils.closeStream(is);
-            IOUtils.closeStream(tempFileStream);
-        }
-        Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap();
-
-        FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
-        int samplingPercentage = 25;
-        SequenceFile.Reader reader = null;
-        try {
-            reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
-            LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            while (reader.next(key, value)) {
-                if (key.get() == 0L) {
-                    samplingPercentage = Bytes.toInt(value.getBytes());
-                } else {
-                    HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
-                    ByteArray byteArray = new ByteArray(value.getBytes());
-                    hll.readRegisters(byteArray.asBuffer());
-                    counterMap.put(key.get(), hll);
-                }
-
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        } finally {
-            IOUtils.closeStream(reader);
-            tempFile.delete();
-        }
-        return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage);
-    }
-
     //one region for one shard
     private static byte[][] getSplitsByRegionCount(int regionCount) {
         byte[][] result = new byte[regionCount - 1][];
@@ -215,51 +155,24 @@ public class CreateHTableJob extends AbstractHadoopJob {
         return result;
     }
 
-    public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException {
-        Preconditions.checkArgument(samplingPercentage > 0);
-        return Maps.transformValues(counterMap, new Function<HyperLogLogPlusCounter, Long>() {
-            @Nullable
-            @Override
-            public Long apply(HyperLogLogPlusCounter input) {
-                return input.getCountEstimate() * 100 / samplingPercentage;
-            }
-        });
-    }
-
-    public static byte[][] getSplitsFromCuboidStatistics(final Map<Long, Long> cubeRowCountMap, KylinConfig kylinConfig, CubeSegment cubeSegment) throws IOException {
+    public static byte[][] getSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, KylinConfig kylinConfig, CubeSegment cubeSegment) throws IOException {
 
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
 
-        final List<Integer> rowkeyColumnSize = Lists.newArrayList();
-        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-        List<TblColRef> columnList = baseCuboid.getColumns();
-
-        for (int i = 0; i < columnList.size(); i++) {
-            logger.info("Rowkey column " + i + " length " + cubeSegment.getColumnLength(columnList.get(i)));
-            rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i)));
-        }
-
         DataModelDesc.RealizationCapacity cubeCapacity = cubeDesc.getModel().getCapacity();
         int cut = kylinConfig.getHBaseRegionCut(cubeCapacity.toString());
 
         logger.info("Cube capacity " + cubeCapacity.toString() + ", chosen cut for HTable is " + cut + "GB");
 
         double totalSizeInM = 0;
-
-        List<Long> allCuboids = Lists.newArrayList();
-        allCuboids.addAll(cubeRowCountMap.keySet());
-        Collections.sort(allCuboids);
-
-        Map<Long, Double> cubeSizeMap = Maps.newHashMap();
-        for (Map.Entry<Long, Long> entry : cubeRowCountMap.entrySet()) {
-            cubeSizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), baseCuboidId, rowkeyColumnSize));
-        }
-
         for (Double cuboidSize : cubeSizeMap.values()) {
             totalSizeInM += cuboidSize;
         }
 
+        List<Long> allCuboids = Lists.newArrayList();
+        allCuboids.addAll(cubeSizeMap.keySet());
+        Collections.sort(allCuboids);
+
         int nRegion = Math.round((float) (totalSizeInM / (cut * 1024L)));
         nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
         nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
@@ -350,48 +263,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
         }
     }
 
-    /**
-     * Estimate the cuboid's size
-     *
-     * @return the cuboid size in M bytes
-     */
-    private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
-
-        int bytesLength = cubeSegment.getRowKeyPreambleSize();
-
-        long mask = Long.highestOneBit(baseCuboidId);
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & cuboidId) > 0) {
-                bytesLength += rowKeyColumnLength.get(i); //colIO.getColumnLength(columnList.get(i));
-            }
-            mask = mask >> 1;
-        }
-
-        // add the measure length
-        int space = 0;
-        boolean isMemoryHungry = false;
-        for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
-            if (measureDesc.getFunction().getMeasureType().isMemoryHungry()) {
-                isMemoryHungry = true;
-            }
-            DataType returnType = measureDesc.getFunction().getReturnDataType();
-            space += returnType.getStorageBytesEstimate();
-        }
-        bytesLength += space;
-
-        double ret = 1.0 * bytesLength * rowCount / (1024L * 1024L);
-        if (isMemoryHungry) {
-            logger.info("Cube is memory hungry, storage size estimation multiply 0.05");
-            ret *= 0.05;
-        } else {
-            logger.info("Cube is not memory hungry, storage size estimation multiply 0.25");
-            ret *= 0.25;
-        }
-        logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes." + " Total size is " + ret + "M.");
-        return ret;
-    }
-
     public static void main(String[] args) throws Exception {
         int exitCode = ToolRunner.run(new CreateHTableJob(), args);
         System.exit(exitCode);

http://git-wip-us.apache.org/repos/asf/kylin/blob/3fc3883a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
deleted file mode 100644
index 397f4fe..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import com.google.common.collect.Lists;
-
-public class HBaseMROutput2 implements IMROutput2 {
-
-    @Override
-    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
-        return new IMRBatchCubingOutputSide2() {
-            HBaseMRSteps steps = new HBaseMRSteps(seg);
-
-            @Override
-            public IMRStorageOutputFormat getStorageOutputFormat() {
-                return new HBaseOutputFormat(seg);
-            }
-
-            @Override
-            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
-            }
-
-            @Override
-            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
-            }
-
-            @Override
-            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-                // nothing to do
-            }
-        };
-    }
-
-    @Override
-    public IMRBatchMergeInputSide2 getBatchMergeInputSide(final CubeSegment seg) {
-        return new IMRBatchMergeInputSide2() {
-            @Override
-            public IMRStorageInputFormat getStorageInputFormat() {
-                return new HBaseInputFormat(seg);
-            }
-        };
-    }
-
-    @Override
-    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
-        return new IMRBatchMergeOutputSide2() {
-            HBaseMRSteps steps = new HBaseMRSteps(seg);
-
-            @Override
-            public IMRStorageOutputFormat getStorageOutputFormat() {
-                return new HBaseOutputFormat(seg);
-            }
-
-            @Override
-            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
-            }
-
-            @Override
-            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
-            }
-
-            @Override
-            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createMergeGCStep());
-            }
-        };
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private static class HBaseInputFormat implements IMRStorageInputFormat {
-        final CubeSegment seg;
-
-        final RowValueDecoder[] rowValueDecoders;
-        final ByteArrayWritable parsedKey;
-        final Object[] parsedValue;
-        final Pair<ByteArrayWritable, Object[]> parsedPair;
-
-        public HBaseInputFormat(CubeSegment seg) {
-            this.seg = seg;
-
-            List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
-            List<MeasureDesc> measuresDescs = Lists.newArrayList();
-            for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHbaseMapping().getColumnFamily()) {
-                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                    valueDecoderList.add(new RowValueDecoder(colDesc));
-                    for (MeasureDesc measure : colDesc.getMeasures()) {
-                        measuresDescs.add(measure);
-                    }
-                }
-            }
-            this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
-
-            this.parsedKey = new ByteArrayWritable();
-            this.parsedValue = new Object[measuresDescs.size()];
-            this.parsedPair = Pair.newPair(parsedKey, parsedValue);
-        }
-
-        @Override
-        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException {
-            Configuration conf = job.getConfiguration();
-            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
-            List<Scan> scans = new ArrayList<Scan>();
-            for (String htable : new HBaseMRSteps(seg).getMergingHTables()) {
-                Scan scan = new Scan();
-                scan.setCaching(512); // 1 is the default in Scan, which will be bad for MapReduce jobs
-                scan.setCacheBlocks(false); // don't set to true for MR jobs
-                scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
-                scans.add(scan);
-            }
-
-            TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapperClz, outputKeyClz, outputValueClz, job);
-            TableMapReduceUtil.initCredentials(job);
-
-        }
-
-        @Override
-        public CubeSegment findSourceSegment(Context context) throws IOException {
-            TableSplit currentSplit = (TableSplit) context.getInputSplit();
-            byte[] tableName = currentSplit.getTableName();
-            String htableName = Bytes.toString(tableName);
-
-            // decide which source segment
-            for (CubeSegment segment : seg.getCubeInstance().getSegments()) {
-                String segmentHtable = segment.getStorageLocationIdentifier();
-                if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
-                    return segment;
-                }
-            }
-            throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
-        }
-
-        @Override
-        public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
-            ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
-            parsedKey.set(key.get(), key.getOffset(), key.getLength());
-
-            Result hbaseRow = (Result) inValue;
-            for (int i = 0; i < rowValueDecoders.length; i++) {
-                rowValueDecoders[i].decode(hbaseRow);
-                rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
-            }
-
-            return parsedPair;
-        }
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private static class HBaseOutputFormat implements IMRStorageOutputFormat {
-        final CubeSegment seg;
-
-        final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
-        final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-
-        public HBaseOutputFormat(CubeSegment seg) {
-            this.seg = seg;
-        }
-
-        @Override
-        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
-            Path hfilePath = new Path(new HBaseMRSteps(seg).getHFilePath(jobFlowId));
-            FileOutputFormat.setOutputPath(job, hfilePath);
-
-            String htableName = seg.getStorageLocationIdentifier();
-            Configuration conf = HBaseConfiguration.create(job.getConfiguration());
-            HTable htable = new HTable(conf, htableName);
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
-
-            // set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class
-            job.setReducerClass(reducer);
-
-            // kylin uses ByteArrayWritable instead of ImmutableBytesWritable as mapper output key
-            rewriteTotalOrderPartitionerFile(job);
-
-            HadoopUtil.deletePath(job.getConfiguration(), hfilePath);
-        }
-
-        @Override
-        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
-            if (keyValueCreators.size() == 0) {
-                for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHbaseMapping().getColumnFamily()) {
-                    for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                        keyValueCreators.add(new KeyValueCreator(seg.getCubeDesc(), colDesc));
-                    }
-                }
-            }
-
-            outputKey.set(key.array(), key.offset(), key.length());
-
-            KeyValue outputValue;
-            for (int i = 0; i < keyValueCreators.size(); i++) {
-                outputValue = keyValueCreators.get(i).create(key.array(), key.offset(), key.length(), value);
-                context.write(outputKey, outputValue);
-            }
-        }
-
-        private void rewriteTotalOrderPartitionerFile(Job job) throws IOException {
-            Configuration conf = job.getConfiguration();
-            String partitionsFile = TotalOrderPartitioner.getPartitionFile(conf);
-            if (StringUtils.isBlank(partitionsFile))
-                throw new IllegalStateException("HFileOutputFormat.configureIncrementalLoad don't configure TotalOrderPartitioner any more?");
-
-            Path partitionsPath = new Path(partitionsFile);
-
-            // read in partition file in ImmutableBytesWritable
-            List<ByteArrayWritable> keys = Lists.newArrayList();
-            Reader reader = new SequenceFile.Reader(conf, Reader.file(partitionsPath));
-            try {
-                ImmutableBytesWritable key = new ImmutableBytesWritable();
-                while (reader.next(key, NullWritable.get())) {
-                    keys.add(new ByteArrayWritable(key.copyBytes()));
-                }
-            } finally {
-                reader.close();
-            }
-
-            // write out again in ByteArrayWritable
-            Writer writer = SequenceFile.createWriter(conf, Writer.file(partitionsPath), Writer.keyClass(ByteArrayWritable.class), Writer.valueClass(NullWritable.class));
-            try {
-                for (ByteArrayWritable key : keys) {
-                    writer.append(key, NullWritable.get());
-                }
-            } finally {
-                writer.close();
-            }
-        }
-
-    }
-
-}