You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:39 UTC

[37/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
new file mode 100644
index 0000000..e7dfb9a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.engine.mr.KylinReducer;
+
+/**
+ * Map-side combiner of the fact-distinct-columns step.
+ *
+ * <p>Keys {@code >= 0} carry column values, which are de-duplicated locally before being
+ * forwarded. Negative keys carry HLL data (see the "for hll" branch) and are forwarded
+ * without any local combine.
+ *
+ * @author yangli9
+ */
+public class FactDistinctColumnsCombiner extends KylinReducer<LongWritable, Text, LongWritable, Text> {
+
+    private Text outputValue = new Text();
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+    }
+
+    /**
+     * Emits each distinct value once for non-negative keys; passes every value through
+     * unchanged for negative (hll) keys.
+     *
+     * @param key     non-negative for column values, negative for hll data
+     * @param values  the values grouped under {@code key}
+     * @param context the combiner context the output is written to
+     * @throws IOException          if writing to the context fails
+     * @throws InterruptedException if writing to the context is interrupted
+     */
+    @Override
+    public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+        if (key.get() >= 0) {
+            // Copy only the first getLength() bytes: the array returned by Text.getBytes()
+            // may be longer than the valid content.
+            HashSet<ByteArray> distinctValues = new HashSet<ByteArray>();
+            for (Text textValue : values) {
+                distinctValues.add(new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())));
+            }
+
+            for (ByteArray value : distinctValues) {
+                outputValue.set(value.array(), value.offset(), value.length());
+                context.write(key, outputValue);
+            }
+        } else {
+            // for hll, no local combine is needed. Every value is forwarded as-is so that
+            // nothing is dropped and no bytes beyond the valid length are copied
+            // (Text.getBytes() alone would expose the whole backing array).
+            for (Text textValue : values) {
+                context.write(key, textValue);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
new file mode 100644
index 0000000..b51eb15
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class FactDistinctColumnsJob extends AbstractHadoopJob {
+    protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_STATISTICS_ENABLED);
+            options.addOption(OPTION_STATISTICS_OUTPUT);
+            options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+            String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
+            String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
+            String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+
+            // ----------------------------------------------------------------------------
+            // add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+            log.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job);
+
+            setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+            setupReducer(output);
+
+            // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            logger.error("error in FactDistinctColumnsJob", e);
+            printUsage(options);
+            throw e;
+        }
+
+    }
+
+    private void setupMapper(CubeSegment cubeSeg) throws IOException {
+        IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+        flatTableInputFormat.configureJob(job);
+
+        job.setMapperClass(FactDistinctHiveColumnsMapper.class);
+        job.setCombinerClass(FactDistinctColumnsCombiner.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(Text.class);
+    }
+
+    private void setupReducer(Path output) throws IOException {
+        job.setReducerClass(FactDistinctColumnsReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+        job.setNumReduceTasks(1);
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+
+    public static void main(String[] args) throws Exception {
+        FactDistinctColumnsJob job = new FactDistinctColumnsJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
new file mode 100644
index 0000000..592c45d
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -0,0 +1,89 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ */
+public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, LongWritable, Text> {
+
+    protected String cubeName;
+    protected CubeInstance cube;
+    protected CubeSegment cubeSeg;
+    protected CubeDesc cubeDesc;
+    protected long baseCuboidId;
+    protected List<TblColRef> columns;
+    protected ArrayList<Integer> factDictCols;
+    protected IMRTableInputFormat flatTableInputFormat;
+
+    protected LongWritable outputKey = new LongWritable();
+    protected Text outputValue = new Text();
+    protected int errorRecordCounter = 0;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        bindCurrentConfiguration(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeSeg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), SegmentStatusEnum.NEW);
+        cubeDesc = cube.getDescriptor();
+        baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        columns = Cuboid.findById(cubeDesc, baseCuboidId).getColumns();
+
+        factDictCols = new ArrayList<Integer>();
+        RowKeyDesc rowKey = cubeDesc.getRowkey();
+        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+        for (int i = 0; i < columns.size(); i++) {
+            TblColRef col = columns.get(i);
+            if (!rowKey.isUseDictionary(col))
+                continue;
+
+            String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+            if (cubeDesc.getModel().isFactTable(scanTable)) {
+                factDictCols.add(i);
+            }
+        }
+        
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+    }
+
+    protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
+
+        System.err.println("Insane record: " + Arrays.toString(record));
+        ex.printStackTrace(System.err);
+
+        errorRecordCounter++;
+        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+            if (ex instanceof IOException)
+                throw (IOException) ex;
+            else if (ex instanceof RuntimeException)
+                throw (RuntimeException) ex;
+            else
+                throw new RuntimeException("", ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
new file mode 100644
index 0000000..08140e5
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Reducer of the fact-distinct-columns step.
+ *
+ * <p>Keys {@code >= 0} are indexes into the base cuboid's column list; the distinct values of
+ * such a column are written, one per line, to a file named after the column under the job
+ * output path. Negative keys are negated cuboid ids whose values are HLL registers; these are
+ * merged per cuboid and written out in {@code cleanup} when statistics collection is enabled.
+ *
+ * @author yangli9
+ */
+public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text, NullWritable, Text> {
+
+    private List<TblColRef> columnList = new ArrayList<TblColRef>();
+    private boolean collectStatistics = false;
+    private String statisticsOutput = null;
+    // one estimate per base-cuboid HLL value received
+    private List<Long> baseCuboidRowCountInMappers;
+    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
+    protected long baseCuboidId;
+    protected CubeDesc cubeDesc;
+    // sum of the estimates of all HLL values received, before merging per cuboid
+    private long totalRowsBeforeMerge = 0;
+    private int samplingPercentage = 5;
+
+    /**
+     * Loads the cube descriptor and the statistics settings from the job configuration.
+     *
+     * @param context the reducer context providing the job configuration
+     * @throws IOException declared for metadata loading
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+
+        baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+        columnList = baseCuboid.getColumns();
+        collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
+        statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
+
+        if (collectStatistics) {
+            baseCuboidRowCountInMappers = Lists.newArrayList();
+            cuboidHLLMap = Maps.newHashMap();
+            samplingPercentage = Integer.parseInt(conf.get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
+        }
+    }
+
+    /**
+     * Writes the distinct values of a column (key {@code >= 0}) or accumulates HLL counters of
+     * a cuboid (key {@code < 0}).
+     *
+     * @param key     column index, or negated cuboid id
+     * @param values  column values, or serialized HLL registers
+     * @param context the reducer context
+     * @throws IOException          if writing the column file fails
+     * @throws InterruptedException declared by the reducer contract
+     */
+    @Override
+    public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+        if (key.get() >= 0) {
+            TblColRef col = columnList.get((int) key.get());
+
+            HashSet<ByteArray> set = new HashSet<ByteArray>();
+            for (Text textValue : values) {
+                ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+                set.add(value);
+            }
+
+            Configuration conf = context.getConfiguration();
+            FileSystem fs = FileSystem.get(conf);
+            String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+            FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
+
+            try {
+                for (ByteArray value : set) {
+                    out.write(value.array(), value.offset(), value.length());
+                    out.write('\n');
+                }
+            } finally {
+                out.close();
+            }
+        } else {
+            // for hll
+            long cuboidId = 0 - key.get();
+
+            for (Text value : values) {
+                HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+                // Only the first getLength() bytes of Text.getBytes() are valid, so copy
+                // exactly that range instead of wrapping the whole backing array.
+                ByteArray byteArray = new ByteArray(Bytes.copy(value.getBytes(), 0, value.getLength()));
+                hll.readRegisters(byteArray.asBuffer());
+
+                long estimate = hll.getCountEstimate();
+                totalRowsBeforeMerge += estimate;
+
+                if (cuboidId == baseCuboidId) {
+                    baseCuboidRowCountInMappers.add(estimate);
+                }
+
+                HyperLogLogPlusCounter merged = cuboidHLLMap.get(cuboidId);
+                if (merged != null) {
+                    merged.merge(hll);
+                } else {
+                    cuboidHLLMap.put(cuboidId, hll);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Writes the collected statistics, if statistics collection is enabled.
+     *
+     * @param context the reducer context
+     * @throws IOException          if writing either statistics file fails
+     * @throws InterruptedException declared by the reducer contract
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+
+        //output the hll info;
+        if (collectStatistics) {
+            writeMapperAndCuboidStatistics(context); // for human check
+            writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), cuboidHLLMap, samplingPercentage); // for CreateHTableJob
+        }
+    }
+
+    /**
+     * Writes a plain-text summary of the collected estimates to the file
+     * {@code BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION} under the statistics output path.
+     */
+    private void writeMapperAndCuboidStatistics(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        FileSystem fs = FileSystem.get(conf);
+        FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION));
+
+        try {
+            String msg;
+
+            List<Long> allCuboids = new ArrayList<Long>();
+            allCuboids.addAll(cuboidHLLMap.keySet());
+            Collections.sort(allCuboids);
+
+            msg = "Total cuboid number: \t" + allCuboids.size();
+            writeLine(out, msg);
+            msg = "Samping percentage: \t" + samplingPercentage;
+            writeLine(out, msg);
+
+            writeLine(out, "The following statistics are collected based sampling data.");
+            for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+                long rowCount = baseCuboidRowCountInMappers.get(i);
+                if (rowCount > 0) {
+                    msg = "Base Cuboid in Mapper " + i + " row count: \t " + rowCount;
+                    writeLine(out, msg);
+                }
+            }
+
+            long grantTotal = 0;
+            for (long i : allCuboids) {
+                long estimate = cuboidHLLMap.get(i).getCountEstimate();
+                grantTotal += estimate;
+                msg = "Cuboid " + i + " row count is: \t " + estimate;
+                writeLine(out, msg);
+            }
+
+            msg = "Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge;
+            writeLine(out, msg);
+
+            msg = "After merge, the cube has row count: \t " + grantTotal;
+            writeLine(out, msg);
+
+            if (grantTotal > 0) {
+                // integer division: the factor is reported without a fractional part
+                msg = "The compaction factor is: \t" + totalRowsBeforeMerge / grantTotal;
+                writeLine(out, msg);
+            }
+
+        } finally {
+            out.close();
+        }
+    }
+
+    /** Writes {@code msg} followed by a newline to {@code out}. */
+    private void writeLine(FSDataOutputStream out, String msg) throws IOException {
+        out.write(msg.getBytes());
+        out.write('\n');
+
+    }
+
+    /**
+     * Writes the per-cuboid HLL registers to the sequence file
+     * {@code BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION} under {@code outputPath}.
+     * The first record has key 0 and holds the sampling percentage; the following records
+     * are keyed by cuboid id in ascending order.
+     *
+     * @param conf               the Hadoop configuration used to create the writer
+     * @param outputPath         directory the sequence file is created in
+     * @param cuboidHLLMap       HLL counter per cuboid id
+     * @param samplingPercentage the sampling percentage to persist under key 0
+     * @throws IOException if the file cannot be created or written
+     */
+    public static void writeCuboidStatistics(Configuration conf, Path outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
+        Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
+                SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class),
+                SequenceFile.Writer.valueClass(BytesWritable.class));
+
+        // every append happens inside the try so the writer is closed even if one fails
+        try {
+            List<Long> allCuboids = new ArrayList<Long>();
+            allCuboids.addAll(cuboidHLLMap.keySet());
+            Collections.sort(allCuboids);
+
+            // persist the sample percentage with key 0
+            writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)));
+            ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+            for (long i : allCuboids) {
+                valueBuf.clear();
+                cuboidHLLMap.get(i).writeRegisters(valueBuf);
+                valueBuf.flip();
+                writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()));
+            }
+        } finally {
+            writer.close();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
new file mode 100644
index 0000000..8ea7fd3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+/**
+ * Mapper of the fact-distinct-columns step. For every input row it emits the values of the
+ * fact-table dictionary columns keyed by column index. When statistics collection is enabled
+ * it also feeds a sample of the rows into one HLL counter per cuboid and emits those counters,
+ * keyed by the negated cuboid id, in {@code cleanup}.
+ *
+ * @author yangli9
+ */
+public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
+
+    private CubeJoinedFlatTableDesc intermediateTableDesc;
+
+    protected boolean collectStatistics = false;
+    protected CuboidScheduler cuboidScheduler = null;
+    protected int nRowKey;
+    // per cuboid: the row key column positions that belong to it
+    private Integer[][] allCuboidsBitSet = null;
+    // per cuboid: its HLL counter, parallel to cuboidIds
+    private HyperLogLogPlusCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    private HashFunction hf = null;
+    // position of the current row inside the current window of 100 rows
+    private int rowCount = 0;
+    private int samplingPercentage = 5;
+    // scratch holders for the per-column hashes of the current row
+    private ByteArray[] rowHashCodes = null;
+
+    /**
+     * Runs the base setup and, when statistics are enabled, prepares the cuboid list, the
+     * per-cuboid column positions and the HLL counters.
+     *
+     * @param context the mapper context providing the job configuration
+     * @throws IOException declared by the base setup
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
+        if (collectStatistics) {
+            samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
+            cuboidScheduler = new CuboidScheduler(cubeDesc);
+            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+            List<Long> cuboidIdList = Lists.newArrayList();
+            List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+            addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
+
+            allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
+            cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
+
+            allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length];
+            for (int i = 0; i < cuboidIds.length; i++) {
+                allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
+            }
+
+            hf = Hashing.murmur3_32();
+            rowHashCodes = new ByteArray[nRowKey];
+            for (int i = 0; i < nRowKey; i++) {
+                rowHashCodes[i] = new ByteArray();
+            }
+        }
+    }
+
+    /**
+     * Records {@code cuboidId} and the row key column positions whose bit is set in it, then
+     * recurses into the cuboids returned by {@code cuboidScheduler.getSpanningCuboid}.
+     * Position 0 corresponds to the highest bit of the base cuboid id.
+     */
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
+        allCuboids.add(cuboidId);
+        Integer[] indice = new Integer[Long.bitCount(cuboidId)];
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        int position = 0;
+        for (int i = 0; i < nRowKey; i++) {
+            if ((mask & cuboidId) != 0) {
+                indice[position] = i;
+                position++;
+            }
+            mask = mask >>> 1;
+        }
+
+        allCuboidsBitSet.add(indice);
+        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        for (Long childId : children) {
+            addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
+        }
+    }
+
+    /**
+     * Emits the non-null values of the fact-table dictionary columns of one row and, when
+     * statistics are enabled, adds the row to the HLL counters if it falls into the sample.
+     *
+     * @param key     the input key (unused)
+     * @param record  the raw input record, parsed by the flat table input format
+     * @param context the mapper context the output is written to
+     * @throws IOException          if writing fails or too many records are in error
+     * @throws InterruptedException if writing to the context is interrupted
+     */
+    @Override
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+        try {
+            for (int i : factDictCols) {
+                outputKey.set((long) i);
+                String fieldValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+                if (fieldValue == null)
+                    continue;
+                byte[] bytes = Bytes.toBytes(fieldValue);
+                outputValue.set(bytes, 0, bytes.length);
+                context.write(outputKey, outputValue);
+            }
+        } catch (Exception ex) {
+            handleErrorRecord(row, ex);
+        }
+
+        // the first samplingPercentage rows of every window of 100 rows are sampled
+        if (collectStatistics && rowCount < samplingPercentage) {
+            putRowKeyToHLL(row);
+        }
+
+        // Pre-increment keeps rowCount within 0..99, i.e. a window of exactly 100 rows;
+        // resetting only after the value 100 had been seen would make the window 101 rows.
+        if (++rowCount == 100) {
+            rowCount = 0;
+        }
+    }
+
+    /**
+     * Hashes every row key column of {@code row}, then combines the column hashes belonging
+     * to each cuboid into one hash and adds it to that cuboid's HLL counter.
+     */
+    private void putRowKeyToHLL(String[] row) {
+
+        //generate hash for each row key column
+        for (int i = 0; i < nRowKey; i++) {
+            Hasher hc = hf.newHasher();
+            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+            if (colValue != null) {
+                rowHashCodes[i].set(hc.putString(colValue).hash().asBytes());
+            } else {
+                // a null column value is hashed as the int 0
+                rowHashCodes[i].set(hc.putInt(0).hash().asBytes());
+            }
+        }
+
+        // use the row key column hash to get a consolidated hash for each cuboid
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            Hasher hc = hf.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]].array());
+            }
+
+            allCuboidsHLL[i].add(hc.hash().asBytes());
+        }
+    }
+
+    /**
+     * Emits the HLL registers of every cuboid when statistics collection is enabled.
+     *
+     * @param context the mapper context the output is written to
+     * @throws IOException          if writing to the context fails
+     * @throws InterruptedException if writing to the context is interrupted
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        if (collectStatistics) {
+            ByteBuffer hllBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            HyperLogLogPlusCounter hll;
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = allCuboidsHLL[i];
+                outputKey.set(0 - cuboidIds[i]);
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                context.write(outputKey, outputValue);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
new file mode 100644
index 0000000..266debe
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.MRUtil;
+
+/**
+ * Mapper that parses each input record with the flat table input format of the
+ * cube segment, loads the resulting columns into the shared bytes splitter as
+ * UTF-8 bytes and lets the base class emit the key/value pair.
+ *
+ * @author George Song (ysong1)
+ */
+public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Object> {
+
+    private IMRTableInputFormat flatTableInputFormat;
+
+    /**
+     * Runs the base class setup, then resolves the flat table input format for
+     * the cube segment.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+    }
+
+    /**
+     * Handles one input record. Any exception raised while parsing, converting or
+     * emitting the record is passed to {@code handleErrorRecord} instead of being
+     * thrown from here.
+     */
+    @Override
+    public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
+        counter++;
+        if (counter % BatchConstants.COUNTER_MAX == 0) {
+            logger.info("Handled " + counter + " records!");
+        }
+
+        try {
+            // load the parsed record into the shared bytesSplitter ...
+            final String[] columns = flatTableInputFormat.parseMapperInput(value);
+            final byte[][] columnBytes = convertUTF8Bytes(columns);
+            bytesSplitter.setBuffers(columnBytes);
+
+            // ... and emit whatever bytesSplitter now holds
+            outputKV(context);
+        } catch (Exception ex) {
+            handleErrorRecord(bytesSplitter, ex);
+        }
+    }
+
+    /**
+     * Encodes every cell of the row as UTF-8. A {@code null} cell raises a
+     * NullPointerException, which {@link #map} catches like any other failure.
+     */
+    private byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException {
+        final byte[][] encoded = new byte[row.length][];
+        int pos = 0;
+        for (String cell : row) {
+            encoded[pos] = cell.getBytes("UTF-8");
+            pos++;
+        }
+        return encoded;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
new file mode 100644
index 0000000..685243c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class InMemCuboidJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_JOB_FLOW_ID);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            parseOptions(options, args);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            logger.info("Starting: " + job.getJobName());
+            
+            setJobClasspath(job);
+            
+            // add metadata to distributed cache
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            long timeout = 1000*60*60L; // 1 hour
+            job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout));
+            
+            // set input
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
+            
+            // set mapper
+            job.setMapperClass(InMemCuboidMapper.class);
+            job.setMapOutputKeyClass(ByteArrayWritable.class);
+            job.setMapOutputValueClass(ByteArrayWritable.class);
+            
+            // set output
+            IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
+            storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+            
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in CuboidJob", e);
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        InMemCuboidJob job = new InMemCuboidJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
new file mode 100644
index 0000000..e9340de
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -0,0 +1,122 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
+
+    private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private CubeSegment cubeSegment;
+    private IMRTableInputFormat flatTableInputFormat;
+
+    private int counter;
+    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
+    private Future<?> future;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+
+        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (cubeDesc.getRowkey().isUseDictionary(col)) {
+                    Dictionary<?> dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        logger.warn("Dictionary for " + col + " was not found.");
+                    }
+
+                    dictionaryMap.put(col, cubeSegment.getDictionary(col));
+                }
+            }
+        }
+
+        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+
+    }
+
+    @Override
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        // put each row to the queue
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+        List<String> rowAsList = Arrays.asList(row);
+        
+        while (!future.isDone()) {
+            if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
+                counter++;
+                if (counter % BatchConstants.COUNTER_MAX == 0) {
+                    logger.info("Handled " + counter + " records!");
+                }
+                break;
+            }
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        logger.info("Totally handled " + counter + " records!");
+
+        while (!future.isDone()) {
+            if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
+                break;
+            }
+        }
+        
+        try {
+            future.get();
+        } catch (Exception e) {
+            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+        }
+        queue.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
new file mode 100644
index 0000000..004eb17
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -0,0 +1,95 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Reducer of the in-memory cubing step: decodes all measure values that share a
+ * key, aggregates them and passes the aggregated result to the storage output
+ * format.
+ */
+public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class);
+
+    private IMRStorageOutputFormat storageOutputFormat;
+    private MeasureCodec codec;
+    private MeasureAggregators aggs;
+
+    private int counter;
+    // scratch arrays reused for every value / key, one slot per measure
+    private Object[] input;
+    private Object[] result;
+
+    /**
+     * Loads the cube metadata, selects the merge or cubing storage output format
+     * according to the job configuration, and prepares the measure codec,
+     * aggregators and scratch arrays.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        final String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        final String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        final boolean isMerge = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_IS_MERGE));
+
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = cube.getDescriptor();
+        final CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        if (isMerge) {
+            storageOutputFormat = MRUtil.getBatchMergeOutputSide2(segment).getStorageOutputFormat();
+        } else {
+            storageOutputFormat = MRUtil.getBatchCubingOutputSide2(segment).getStorageOutputFormat();
+        }
+
+        final List<MeasureDesc> measures = collectMeasures(cubeDesc);
+        codec = new MeasureCodec(measures);
+        aggs = new MeasureAggregators(measures);
+
+        final int measureCount = measures.size();
+        input = new Object[measureCount];
+        result = new Object[measureCount];
+    }
+
+    /**
+     * Lists the measures of the cube by walking its HBase mapping: column family
+     * by column family, column by column, in iteration order.
+     */
+    private List<MeasureDesc> collectMeasures(CubeDesc cubeDesc) {
+        final List<MeasureDesc> collected = Lists.newArrayList();
+        for (HBaseColumnFamilyDesc family : cubeDesc.getHBaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc column : family.getColumns()) {
+                for (MeasureDesc measure : column.getMeasures()) {
+                    collected.add(measure);
+                }
+            }
+        }
+        return collected;
+    }
+
+    /**
+     * Aggregates all values of one key and forwards the aggregated measures to the
+     * storage output format.
+     */
+    @Override
+    public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
+        // start from a clean aggregation state for every key
+        aggs.reset();
+        for (ByteArrayWritable encoded : values) {
+            codec.decode(encoded.asBuffer(), input);
+            aggs.aggregate(input);
+        }
+        aggs.collectStates(result);
+
+        storageOutputFormat.doReducerOutput(key, result, context);
+
+        counter++;
+        if (counter % BatchConstants.COUNTER_MAX == 0) {
+            logger.info("Handled " + counter + " records!");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionJob.java
new file mode 100644
index 0000000..686f2b2
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionJob.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+///*
+// * Copyright 2013-2014 eBay Software Foundation
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.kylin.index.cube;
+//
+//import org.apache.commons.cli.Options;
+//import org.apache.hadoop.fs.FileSystem;
+//import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.io.LongWritable;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapreduce.Job;
+//import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+//import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+//import org.apache.hadoop.util.ToolRunner;
+//
+//import org.apache.kylin.cube.CubeInstance;
+//import org.apache.kylin.cube.CubeManager;
+//import org.apache.kylin.cube.cuboid.Cuboid;
+//import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+//import org.apache.kylin.cube.kv.RowKeyEncoder;
+//import org.apache.kylin.index.AbstractHadoopJob;
+//import org.apache.kylin.metadata.model.cube.CubeDesc;
+//
+///**
+// * @author xjiang
+// *
+// */
+//
+//public class KeyDistributionJob extends AbstractHadoopJob {
+//
+//    public static final String JOB_TITLE = "Kylin Row Key Distribution Job";
+//    public static final String KEY_HEADER_LENGTH = "key_header_length";
+//    public static final String KEY_COLUMN_PERCENTAGE = "key_column_percentage";
+//    public static final String KEY_SPLIT_NUMBER = "key_split_number";
+//
+//    /* (non-Javadoc)
+//     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
+//     */
+//    @Override
+//    public int run(String[] args) throws Exception {
+//        Options options = new Options();
+//
+//        try {
+//            options.addOption(OPTION_INPUT_PATH);
+//            options.addOption(OPTION_OUTPUT_PATH);
+//            options.addOption(OPTION_METADATA_URL);
+//            options.addOption(OPTION_CUBE_NAME);
+//            options.addOption(OPTION_KEY_COLUMN_PERCENTAGE);
+//            options.addOption(OPTION_KEY_SPLIT_NUMBER);
+//            parseOptions(options, args);
+//
+//            // start job
+//            String jobName = JOB_TITLE + getOptionsAsString();
+//            System.out.println("Starting: " + jobName);
+//            Job job = Job.getInstanceFromEnv(getConf(), jobName);
+//
+//            // set job configuration - basic 
+//            setJobClasspath(job);
+//            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+//
+//            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+//            FileOutputFormat.setOutputPath(job, output);
+//            //job.getConfiguration().set("dfs.block.size", "67108864");
+//
+//            // set job configuration - key prefix size & key split number
+//            String keyColumnPercentage = getOptionValue(OPTION_KEY_COLUMN_PERCENTAGE);
+//            job.getConfiguration().set(KEY_COLUMN_PERCENTAGE, keyColumnPercentage);
+//            String metadataUrl = validateMetadataUrl(getOptionValue(OPTION_METADATA_URL));
+//            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+//            int keyHeaderLen = getKeyHeaderLength(metadataUrl, cubeName);
+//            job.getConfiguration().set(KEY_HEADER_LENGTH, String.valueOf(keyHeaderLen));
+//            job.getConfiguration().set(KEY_SPLIT_NUMBER, getOptionValue(OPTION_KEY_SPLIT_NUMBER));
+//
+//            // Mapper
+//            job.setInputFormatClass(SequenceFileInputFormat.class);
+//            job.setMapperClass(KeyDistributionMapper.class);
+//            job.setMapOutputKeyClass(Text.class);
+//            job.setMapOutputValueClass(LongWritable.class);
+//
+//            // Combiner, not needed any more as mapper now does the grouping
+//            //job.setCombinerClass(KeyDistributionCombiner.class);
+//
+//            // Reducer - only one
+//            job.setReducerClass(KeyDistributionReducer.class);
+//            // use sequence file as output
+//            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+//            // key is text
+//            job.setOutputKeyClass(Text.class);
+//            // value is long
+//            job.setOutputValueClass(LongWritable.class);
+//            job.setNumReduceTasks(1);
+//
+//            FileSystem fs = FileSystem.get(job.getConfiguration());
+//            if (fs.exists(output))
+//                fs.delete(output, true);
+//
+//            return waitForCompletion(job);
+//        } catch (Exception e) {
+//            printUsage(options);
+//            e.printStackTrace(System.err);
+//            return 2;
+//        }
+//    }
+//
+//    private int getKeyHeaderLength(String metadataUrl, String cubeName) {
+//        CubeManager cubeMgr = CubeManager.getInstanceFromEnv(metadataUrl);
+//        CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
+//        CubeDesc cubeDesc = cubeInstance.getDescriptor();
+//        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+//        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+//        RowKeyEncoder rowKeyEncoder =
+//                (RowKeyEncoder) AbstractRowKeyEncoder.createInstance(cubeInstance.getTheOnlySegment(),
+//                        baseCuboid);
+//
+//        return rowKeyEncoder.getHeaderLength();
+//
+//    }
+//
+//    public static void main(String[] args) throws Exception {
+//        int exitCode = ToolRunner.run(new KeyDistributionJob(), args);
+//        System.exit(exitCode);
+//    }
+// }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionMapper.java
new file mode 100644
index 0000000..e08a56b
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionMapper.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+///*
+// * Copyright 2013-2014 eBay Software Foundation
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.kylin.index.cube;
+//
+//import java.io.IOException;
+//
+//import org.apache.hadoop.io.LongWritable;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapreduce.Mapper;
+//
+///**
+// * @author xjiang
+// *
+// */
+//public class KeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
+//
+//    private int headerLength;
+//
+//    private Text currentKey;
+//    private long outputLong;
+//    private Text outputKey;
+//    private LongWritable outputValue;
+//    private int columnPercentage;
+//    private int allRowCount;
+//
+//    @Override
+//    protected void setup(Context context) throws IOException {
+//super.publishConfiguration(context.getConfiguration());
+
+//        String percentStr = context.getConfiguration().get(KeyDistributionJob.KEY_COLUMN_PERCENTAGE);
+//        this.columnPercentage = Integer.valueOf(percentStr).intValue();
+//        if (this.columnPercentage <= 0 || this.columnPercentage >= 100) {
+//            this.columnPercentage = 20;
+//        }
+//        String headerLenStr = context.getConfiguration().get(KeyDistributionJob.KEY_HEADER_LENGTH);
+//        this.headerLength = Integer.valueOf(headerLenStr).intValue();
+//
+//        currentKey = new Text();
+//        outputLong = 0;
+//        outputKey = new Text();
+//        outputValue = new LongWritable(1);
+//        allRowCount = 0;
+//    }
+//
+//    @Override
+//    protected void cleanup(Context context) throws IOException, InterruptedException {
+//        emit(context); // emit the last holding record
+//
+//        byte[] zerokey = new byte[] { 0 };
+//        outputKey.set(zerokey);
+//        outputValue.set(allRowCount);
+//        context.write(outputKey, outputValue);
+//    }
+//
+//    @Override
+//    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+//        byte[] bytes = key.getBytes();
+//        int columnLength = bytes.length - this.headerLength;
+//        int columnPrefixLen = columnLength * this.columnPercentage / 100;
+//        if (columnPrefixLen == 0 && columnLength > 0) {
+//            columnPrefixLen = 1;
+//        }
+//        if (columnPrefixLen > 0) {
+//            currentKey.set(bytes, 0, this.headerLength + columnPrefixLen);
+//        } else {
+//            currentKey.set(bytes);
+//        }
+//
+//        allRowCount++;
+//
+//        if (outputKey.getLength() == 0) { // first record
+//            outputKey.set(currentKey);
+//            outputLong = 1;
+//        } else if (outputKey.equals(currentKey)) { // same key, note input is sorted
+//            outputLong++;
+//        } else { // the next key
+//            emit(context);
+//            outputKey.set(currentKey);
+//            outputLong = 1;
+//        }
+//    }
+//
+//    private void emit(Context context) throws IOException, InterruptedException {
+//        if (outputLong == 0)
+//            return;
+//
+//        outputValue.set(outputLong);
+//        context.write(outputKey, outputValue);
+//    }
+// }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionReducer.java
new file mode 100644
index 0000000..ed3f966
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KeyDistributionReducer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+///*
+// * Copyright 2013-2014 eBay Software Foundation
+// *
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.kylin.index.cube;
+//
+//import java.io.IOException;
+//
+//import org.apache.hadoop.io.LongWritable;
+//import org.apache.hadoop.io.Text;
+//import org.apache.hadoop.mapreduce.Reducer;
+//import org.apache.hadoop.util.StringUtils;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+///**
+// * @author xjiang
+// *
+// */
+//public class KeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
+//
+//    private static final Logger logger = LoggerFactory.getLogger(KeyDistributionReducer.class);
+//
+//    private LongWritable outputValue;
+//    private boolean isTotalCount;
+//    private long totalCount;
+//    private int splitNumber;
+//    private long splitQuota;
+//    private long splitRemain;
+//
+//    @Override
+//    protected void setup(Context context) throws IOException, InterruptedException {
+//        super.publishConfiguration(context.getConfiguration());
+
+//        String splitStr = context.getConfiguration().get(KeyDistributionJob.KEY_SPLIT_NUMBER);
+//        splitNumber = Integer.valueOf(splitStr).intValue();
+//        outputValue = new LongWritable();
+//        isTotalCount = true;
+//        totalCount = 0;
+//        splitQuota = 0;
+//        splitRemain = 0;
+//    }
+//
+//    @Override
+//    protected void cleanup(Context context) throws IOException, InterruptedException {
+//        logger.info("---------------");
+//        long splitCount = splitQuota - splitRemain;
+//        logger.info("Total Count = " + totalCount + ", Left Count = " + splitCount);
+//    }
+//
+//    @Override
+//    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
+//            InterruptedException {
+//
+//        // calculate split quota
+//        if (isTotalCount) {
+//            for (LongWritable count : values) {
+//                totalCount += count.get();
+//            }
+//            splitQuota = totalCount / splitNumber;
+//            splitRemain = splitQuota;
+//            isTotalCount = false;
+//            return;
+//        }
+//
+//        // output key when split quota is used up 
+//        for (LongWritable count : values) {
+//            splitRemain -= count.get();
+//        }
+//        if (splitRemain <= 0) {
+//            long splitCount = splitQuota - splitRemain;
+//            String hexKey = StringUtils.byteToHexString(key.getBytes());
+//            logger.info(hexKey + "\t\t" + splitCount);
+//
+//            outputValue.set(splitCount);
+//            context.write(key, outputValue);
+//            splitRemain = splitQuota;
+//        }
+//
+//    }
+// }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
new file mode 100644
index 0000000..4585dd1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -0,0 +1,96 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * {@link ICuboidWriter} that emits every cuboid record as a key/value pair on the
+ * output of the enclosing map task.
+ * <p>
+ * The output key is the cuboid id followed by the bytes of the record's dimension
+ * columns; the output value is the export of the record's measure columns.
+ * Per-cuboid state (key buffer, dimension count, measure column indexes) is
+ * rebuilt only when the cuboid id changes between two consecutive calls to
+ * {@link #write(long, GTRecord)}.
+ */
+public class MapContextGTRecordWriter implements ICuboidWriter {
+
+    private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
+    protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
+    // id of the cuboid the per-cuboid state below was built for; null until the first record
+    private Long lastCuboidId;
+    protected CubeSegment cubeSegment;
+    protected CubeDesc cubeDesc;
+
+    private int bytesLength;
+    private int dimensions;
+    private int measureCount;
+    private byte[] keyBuf;
+    private int[] measureColumnsIndex;
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private ByteArrayWritable outputKey = new ByteArrayWritable();
+    private ByteArrayWritable outputValue = new ByteArrayWritable();
+    private long cuboidRowCount = 0;
+
+    /**
+     * @param mapContext  map context the key/value pairs are written to
+     * @param cubeDesc    cube descriptor, used to look up cuboids and to count measures
+     * @param cubeSegment segment used to look up the length of each row key column
+     */
+    public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        this.mapContext = mapContext;
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.measureCount = cubeDesc.getMeasures().size();
+
+    }
+
+    /**
+     * Writes one record of the given cuboid to the map output.
+     *
+     * @param cuboidId id of the cuboid the record belongs to
+     * @param record   record whose first columns are the dimensions and whose
+     *                 following columns are the measures
+     * @throws IOException      if the map context fails to write
+     * @throws RuntimeException wrapping an {@link InterruptedException} raised by the
+     *                          map context; the thread's interrupt flag is restored first
+     */
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+
+        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
+            if (lastCuboidId != null) {
+                // report the cuboid that just finished before switching
+                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
+                cuboidRowCount = 0;
+            }
+            // output another cuboid
+            initVariables(cuboidId);
+            // remember the cuboid so the per-cuboid state is reused for its following rows;
+            // assigned only after initVariables() succeeded
+            lastCuboidId = cuboidId;
+        }
+
+        cuboidRowCount++;
+        // dimension bytes are appended right after the cuboid id already stored in keyBuf
+        int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
+        for (int x = 0; x < dimensions; x++) {
+            System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
+            offSet += record.get(x).length();
+        }
+
+        //output measures
+        valueBuf.clear();
+        record.exportColumns(measureColumnsIndex, valueBuf);
+
+        outputKey.set(keyBuf, 0, offSet);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        try {
+            mapContext.write(outputKey, outputValue);
+        } catch (InterruptedException e) {
+            // keep the interruption visible to the caller's thread before wrapping
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Rebuilds the per-cuboid state: a key buffer sized for the cuboid id plus the
+     * length of every column of the cuboid, the dimension count (number of bits set
+     * in the cuboid id) and the indexes of the measure columns, which are taken to
+     * follow the dimension columns directly.
+     */
+    private void initVariables(Long cuboidId) {
+        bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
+        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+        for (TblColRef column : cuboid.getColumns()) {
+            bytesLength += cubeSegment.getColumnLength(column);
+        }
+
+        keyBuf = new byte[bytesLength];
+        dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+        measureColumnsIndex = new int[measureCount];
+        for (int i = 0; i < measureCount; i++) {
+            measureColumnsIndex[i] = dimensions + i;
+        }
+
+        // the cuboid id prefix is constant for all rows of this cuboid
+        System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
new file mode 100644
index 0000000..6525d93
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * MapReduce job that feeds {@link MergeCuboidFromStorageMapper} from the storage
+ * input format of the segment's batch-merge input side and writes the result of
+ * {@link InMemCuboidReducer} through the storage output format of its
+ * batch-merge output side.
+ */
+public class MergeCuboidFromStorageJob extends CuboidJob {
+
+    /**
+     * Parses the job name, job flow id, cube name and segment name options,
+     * configures the job and waits for it to complete.
+     *
+     * @param args command line arguments carrying the options above
+     * @return the result of {@code waitForCompletion(job)}
+     * @throws Exception any failure; it is logged and the usage is printed before
+     *                   it is rethrown
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_JOB_FLOW_ID);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            parseOptions(options, args);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            // the segment being produced by the merge is looked up in NEW status
+            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+            Configuration conf = this.getConf();
+
+            // start job
+            String jobName = getOptionValue(OPTION_JOB_NAME);
+            System.out.println("Starting: " + jobName);
+            job = Job.getInstance(conf, jobName);
+
+            setJobClasspath(job);
+
+            // add metadata to distributed cache
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true");
+
+            // configure mapper input
+            IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat();
+            storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job);
+
+            // configure reducer output
+            IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
+            storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            // name this class in the log so the failure can be traced to the right job
+            logger.error("error in MergeCuboidFromStorageJob", e);
+            printUsage(options);
+            throw e;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
new file mode 100644
index 0000000..2281432
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Mapper of the merge-from-storage job. For every input row it rebuilds the row
+ * key for the merged segment: columns that need dictionary merging are re-encoded
+ * from the source segment's dictionary into the merged segment's dictionary, all
+ * other columns are copied unchanged. The measure values are re-encoded with a
+ * {@link MeasureCodec} and emitted as the value.
+ *
+ * @author shaoshi
+ */
+public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
+
+    private KylinConfig config;
+    private String cubeName;
+    private String segmentName;
+    private CubeManager cubeManager;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private CubeSegment mergedCubeSegment;
+    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
+    private IMRStorageInputFormat storageInputFormat;
+
+    private ByteArrayWritable outputKey = new ByteArrayWritable();
+    private byte[] newKeyBuf;
+    private RowKeySplitter rowKeySplitter;
+
+    // per-column cache of the checkNeedMerging() decision
+    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private MeasureCodec codec;
+    private ByteArrayWritable outputValue = new ByteArrayWritable();
+
+    /**
+     * Tells whether the dictionary ids of the given column have to be rewritten.
+     * That is the case when the row key uses a dictionary for the column and the
+     * first element returned by {@code DictionaryManager.decideSourceData} equals
+     * the cube's fact table name, ignoring case. The answer is cached per column.
+     */
+    private Boolean checkNeedMerging(TblColRef col) throws IOException {
+        Boolean cached = dictsNeedMerging.get(col);
+        if (cached != null) {
+            return cached;
+        }
+
+        Boolean needMerging = cubeDesc.getRowkey().isUseDictionary(col);
+        if (needMerging) {
+            DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+            String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
+            needMerging = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+        }
+        dictsNeedMerging.put(col, needMerging);
+        return needMerging;
+    }
+
+    /**
+     * Doubles {@code newKeyBuf}, keeping its content, until at least
+     * {@code required} bytes are free starting at {@code offset}.
+     */
+    private void ensureKeyBufCapacity(int required, int offset) {
+        while (required > newKeyBuf.length - offset) {
+            byte[] previous = newKeyBuf;
+            newKeyBuf = new byte[2 * previous.length];
+            System.arraycopy(previous, 0, newKeyBuf, 0, previous.length);
+        }
+    }
+
+    /**
+     * Loads the Kylin configuration and metadata, resolves the cube, the merged
+     * (NEW) segment and the source segment of this mapper, and prepares the row
+     * key splitter and the measure codec.
+     */
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+
+        cubeManager = CubeManager.getInstance(config);
+        cube = cubeManager.getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
+
+        // initial size only, the buffer grows on demand
+        newKeyBuf = new byte[256];
+
+        sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
+        logger.info(sourceCubeSegment.toString());
+
+        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+
+        // collect the measures of every HBase column, in mapping order
+        List<MeasureDesc> allMeasures = Lists.newArrayList();
+        for (HBaseColumnFamilyDesc family : cubeDesc.getHBaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc hbaseColumn : family.getColumns()) {
+                for (MeasureDesc measureDesc : hbaseColumn.getMeasures()) {
+                    allMeasures.add(measureDesc);
+                }
+            }
+        }
+        codec = new MeasureCodec(allMeasures);
+    }
+
+    /**
+     * Converts one input row into the row key and value of the merged segment and
+     * writes it to the context.
+     */
+    @Override
+    public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
+        Pair<ByteArrayWritable, Object[]> parsed = storageInputFormat.parseMapperInput(inKey, inValue);
+        ByteArrayWritable rowKey = parsed.getFirst();
+        Object[] measureValues = parsed.getSecond();
+
+        Preconditions.checkState(rowKey.offset() == 0);
+
+        long cuboidID = rowKeySplitter.split(rowKey.array(), rowKey.length());
+        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
+        SplittedBytes[] parts = rowKeySplitter.getSplitBuffers();
+
+        // the new key starts with the cuboid id
+        BytesUtil.writeLong(cuboidID, newKeyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+        int bufOffset = RowConstants.ROWKEY_CUBOIDID_LEN;
+
+        List<TblColRef> columns = cuboid.getColumns();
+        for (int i = 0; i < columns.size(); ++i) {
+            TblColRef col = columns.get(i);
+
+            if (!this.checkNeedMerging(col)) {
+                // keep as it is; parts[0] is skipped, column i lives in parts[i + 1]
+                SplittedBytes part = parts[i + 1];
+                ensureKeyBufCapacity(part.length, bufOffset);
+
+                System.arraycopy(part.value, 0, newKeyBuf, bufOffset, part.length);
+                bufOffset += part.length;
+                continue;
+            }
+
+            // if dictionary on fact table column, needs rewrite
+            DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+            Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+            Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+
+            // room for the larger of the two dictionaries' value sizes
+            ensureKeyBufCapacity(Math.max(sourceDict.getSizeOfValue(), mergedDict.getSizeOfValue()), bufOffset);
+
+            SplittedBytes part = parts[i + 1];
+            int idInSourceDict = BytesUtil.readUnsigned(part.value, 0, part.length);
+
+            // the key buffer temporarily holds the decoded value, then the new id overwrites it
+            int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
+            int idInMergedDict = (size < 0) ? mergedDict.nullId() : mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+            BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
+
+            bufOffset += mergedDict.getSizeOfId();
+        }
+
+        byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
+        outputKey.set(newKey, 0, newKey.length);
+
+        valueBuf.clear();
+        codec.encode(measureValues, valueBuf);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+
+        context.write(outputKey, outputValue);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
new file mode 100644
index 0000000..ceebfc1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * MapReduce job that runs {@link MergeCuboidMapper} over sequence-file input and
+ * {@link CuboidReducer} on its output, writing sequence files to the given
+ * output path.
+ *
+ * @author ysong1
+ */
+public class MergeCuboidJob extends CuboidJob {
+
+    /**
+     * Parses the job name, cube name, segment name, input path and output path
+     * options, configures the job and waits for it to complete.
+     *
+     * @param args command line arguments carrying the options above
+     * @return the result of {@code waitForCompletion(job)}
+     * @throws Exception any failure; it is logged and the usage is printed before
+     *                   it is rethrown
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            String upperCubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String upperSegmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(upperCubeName);
+
+            // create the job
+            String mrJobName = getOptionValue(OPTION_JOB_NAME);
+            System.out.println("Starting: " + mrJobName);
+            job = Job.getInstance(getConf(), mrJobName);
+
+            setJobClasspath(job);
+
+            // input and output locations
+            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+            Path outputDir = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            FileOutputFormat.setOutputPath(job, outputDir);
+
+            configureMergeMapper(job);
+            configureMergeReducer(job);
+
+            // tell the tasks which cube and segment they work on
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, upperCubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, upperSegmentName);
+
+            // ship the metadata through the distributed cache
+            attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
+
+            setReduceTaskNum(job, kylinConfig, upperCubeName, 0);
+
+            // remove the output path before the job runs
+            this.deletePath(job.getConfiguration(), outputDir);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in MergeCuboidJob", e);
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    /** Map side: sequence-file input, {@link MergeCuboidMapper}, Text key and value. */
+    private static void configureMergeMapper(Job mrJob) {
+        mrJob.setInputFormatClass(SequenceFileInputFormat.class);
+        mrJob.setMapperClass(MergeCuboidMapper.class);
+        mrJob.setMapOutputKeyClass(Text.class);
+        mrJob.setMapOutputValueClass(Text.class);
+    }
+
+    /** Reduce side: {@link CuboidReducer}, sequence-file output, Text key and value. */
+    private static void configureMergeReducer(Job mrJob) {
+        mrJob.setReducerClass(CuboidReducer.class);
+        mrJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+        mrJob.setOutputKeyClass(Text.class);
+        mrJob.setOutputValueClass(Text.class);
+    }
+
+}