You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/02 17:23:58 UTC

[05/19] kylin git commit: APACHE-KYLIN-2733: Introduce optimize job for adjusting cuboid set

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
new file mode 100644
index 0000000..0379f64
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MapReduceUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
+
+    /**
+     * @param cuboidScheduler specified can provide more flexibility
+     * */
+    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
+            double totalMapInputMB, int level)
+            throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        KylinConfig kylinConfig = cubeDesc.getConfig();
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
+                + level);
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
+
+        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+        if (level == -1) {
+            //merge case
+            double estimatedSize = cubeStatsReader.estimateCubeSize();
+            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+            logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
+                    totalMapInputMB, adjustedCurrentLayerSizeEst);
+        } else if (level == 0) {
+            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+            logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+        } else {
+            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+            logger.debug(
+                    "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
+                    totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+        }
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+        if (cubeDesc.hasMemoryHungryMeasures()) {
+            logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
+            numReduceTasks = numReduceTasks * 4;
+        }
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        return numReduceTasks;
+    }
+
+    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
+            throws IOException {
+        KylinConfig kylinConfig = cubeSeg.getConfig();
+
+        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        logger.info("Having total map input MB " + Math.round(totalSizeInM));
+        logger.info("Having per reduce MB " + perReduceInputMB);
+        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+        return numReduceTasks;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index 9c805a8..a5a1ba8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@ -38,7 +38,7 @@ public class StatisticsDecisionUtil {
     protected static final Logger logger = LoggerFactory.getLogger(StatisticsDecisionUtil.class);
 
     public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg) throws IOException {
-        CubeStatsReader cubeStats = new CubeStatsReader(seg, seg.getConfig());
+        CubeStatsReader cubeStats = new CubeStatsReader(seg, null, seg.getConfig());
         decideCubingAlgorithm(cubingJob, seg, cubeStats.getMapperOverlapRatioOfFirstBuild(),
                 cubeStats.getMapperNumberOfFirstBuild());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 1ae6cd0..0ad4b9e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -60,7 +60,7 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     @Override
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
         segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
         final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
         cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
new file mode 100644
index 0000000..b60076c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
+
+    private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+            options.addOption(OPTION_CUBOID_MODE);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+            String cuboidMode = getOptionValue(OPTION_CUBOID_MODE);
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidMode);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job, cube.getConfig());
+
+            setupMapper(input);
+            setupReducer(output, 1);
+
+            attachSegmentMetadataWithDict(cubeSegment, job.getConfiguration());
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            logger.error("error in CalculateStatsFromBaseCuboidJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    private void setupMapper(Path input) throws IOException {
+        FileInputFormat.setInputPaths(job, input);
+        job.setMapperClass(CalculateStatsFromBaseCuboidMapper.class);
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+    }
+
+    private void setupReducer(Path output, int numberOfReducers) throws IOException {
+        job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setNumReduceTasks(numberOfReducers);
+
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+        deletePath(job.getConfiguration(), output);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java
new file mode 100644
index 0000000..1b32944
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+public class CalculateStatsFromBaseCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
+    private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidMapper.class);
+
+    protected int nRowKey;
+    protected long baseCuboidId;
+
+    private int samplingPercentage;
+    private int rowCount = 0;
+    private long[] rowHashCodesLong = null;
+    //about details of the new algorithm, please see KYLIN-2518
+    private boolean isUsePutRowKeyToHllNewAlgorithm;
+
+    private HLLCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    private Integer[][] allCuboidsBitSet = null;
+    private HashFunction hf = null;
+
+    RowKeyDecoder rowKeyDecoder;
+
+    protected Text outputKey = new Text();
+    protected Text outputValue = new Text();
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        HadoopUtil.setCurrentConfiguration(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeDesc cubeDesc = cube.getDescriptor();
+        CubeSegment cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
+
+        baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+        nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+        String cuboidModeName = conf.get(BatchConstants.CFG_CUBOID_MODE);
+        Set<Long> cuboidIdSet = cube.getCuboidsByMode(cuboidModeName);
+
+        cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+        allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
+
+        samplingPercentage = Integer
+                .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+
+        allCuboidsHLL = new HLLCounter[cuboidIds.length];
+        for (int i = 0; i < cuboidIds.length; i++) {
+            allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+        }
+
+        //for KYLIN-2518 backward compatibility
+        if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
+            isUsePutRowKeyToHllNewAlgorithm = false;
+            hf = Hashing.murmur3_32();
+            logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+        } else {
+            isUsePutRowKeyToHllNewAlgorithm = true;
+            rowHashCodesLong = new long[nRowKey];
+            hf = Hashing.murmur3_128();
+            logger.info(
+                    "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+                    cubeDesc.getVersion());
+        }
+
+        rowKeyDecoder = new RowKeyDecoder(cubeSegment);
+    }
+
+    @Override
+    public void doMap(Text key, Text value, Context context) throws InterruptedException, IOException {
+        long cuboidID = rowKeyDecoder.decode(key.getBytes());
+        if (cuboidID != baseCuboidId) {
+            return; // Skip data from cuboids which are not the base cuboid
+        }
+
+        List<String> keyValues = rowKeyDecoder.getValues();
+
+        if (rowCount < samplingPercentage) {
+            Preconditions.checkArgument(nRowKey == keyValues.size());
+
+            String[] row = keyValues.toArray(new String[keyValues.size()]);
+            if (isUsePutRowKeyToHllNewAlgorithm) {
+                putRowKeyToHLLNew(row);
+            } else {
+                putRowKeyToHLLOld(row);
+            }
+        }
+
+        if (++rowCount == 100)
+            rowCount = 0;
+    }
+
+    public void putRowKeyToHLLOld(String[] row) {
+        //generate hash for each row key column
+        byte[][] rowHashCodes = new byte[nRowKey][];
+        for (int i = 0; i < nRowKey; i++) {
+            Hasher hc = hf.newHasher();
+            String colValue = row[i];
+            if (colValue != null) {
+                rowHashCodes[i] = hc.putString(colValue).hash().asBytes();
+            } else {
+                rowHashCodes[i] = hc.putInt(0).hash().asBytes();
+            }
+        }
+
+        // use the row key column hash to get a consolidated hash for each cuboid
+        for (int i = 0; i < cuboidIds.length; i++) {
+            Hasher hc = hf.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(rowHashCodes[allCuboidsBitSet[i][position]]);
+            }
+
+            allCuboidsHLL[i].add(hc.hash().asBytes());
+        }
+    }
+
+    private void putRowKeyToHLLNew(String[] row) {
+        //generate hash for each row key column
+        for (int i = 0; i < nRowKey; i++) {
+            Hasher hc = hf.newHasher();
+            String colValue = row[i];
+            if (colValue == null)
+                colValue = "0";
+            byte[] bytes = hc.putString(colValue).hash().asBytes();
+            rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+        }
+
+        // user the row key column hash to get a consolidated hash for each cuboid
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            long value = 0;
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                value += rowHashCodesLong[allCuboidsBitSet[i][position]];
+            }
+            allCuboidsHLL[i].addHashDirectly(value);
+        }
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+        HLLCounter hll;
+        for (int i = 0; i < cuboidIds.length; i++) {
+            hll = allCuboidsHLL[i];
+
+            outputKey.set(Bytes.toBytes(cuboidIds[i]));
+            hllBuf.clear();
+            hll.writeRegisters(hllBuf);
+            outputValue.set(hllBuf.array(), 0, hllBuf.position());
+            context.write(outputKey, outputValue);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
new file mode 100644
index 0000000..489dac4
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class CalculateStatsFromBaseCuboidReducer extends KylinReducer<Text, Text, NullWritable, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidReducer.class);
+
+    private KylinConfig cubeConfig;
+    protected long baseCuboidId;
+    protected Map<Long, HLLCounter> cuboidHLLMap = null;
+    private List<Long> baseCuboidRowCountInMappers;
+    private long totalRowsBeforeMerge = 0;
+
+    private String output = null;
+    private int samplingPercentage;
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeConfig = cube.getConfig();
+
+        baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+        baseCuboidRowCountInMappers = Lists.newLinkedList();
+
+        output = conf.get(BatchConstants.CFG_OUTPUT_PATH);
+        samplingPercentage = Integer
+                .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+
+        cuboidHLLMap = Maps.newHashMap();
+    }
+
+    @Override
+    public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        long cuboidId = Bytes.toLong(key.getBytes());
+        logger.info("Cuboid id to be processed: " + cuboidId);
+        for (Text value : values) {
+            HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
+            ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
+            hll.readRegisters(bf);
+
+            if (cuboidId == baseCuboidId) {
+                baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+            }
+
+            totalRowsBeforeMerge += hll.getCountEstimate();
+
+            if (cuboidHLLMap.get(cuboidId) != null) {
+                cuboidHLLMap.get(cuboidId).merge(hll);
+            } else {
+                cuboidHLLMap.put(cuboidId, hll);
+            }
+        }
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        long grandTotal = 0;
+        for (HLLCounter hll : cuboidHLLMap.values()) {
+            grandTotal += hll.getCountEstimate();
+        }
+        double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+
+        CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(output), //
+                cuboidHLLMap, samplingPercentage, baseCuboidRowCountInMappers.size(), mapperOverlapRatio);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
new file mode 100644
index 0000000..3341be9
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CopyDictionaryStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(CopyDictionaryStep.class);
+
+    public CopyDictionaryStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager mgr = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment);
+        Preconditions.checkNotNull(oldSegment,
+                "cannot find the original segment to be optimized by " + optimizeSegment);
+
+        // --- Copy dictionary
+        optimizeSegment.getDictionaries().putAll(oldSegment.getDictionaries());
+        optimizeSegment.getSnapshots().putAll(oldSegment.getSnapshots());
+        optimizeSegment.getRowkeyStats().addAll(oldSegment.getRowkeyStats());
+
+        try {
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToUpdateSegs(optimizeSegment);
+            mgr.updateCube(cubeBuilder);
+        } catch (IOException e) {
+            logger.error("fail to merge dictionary or lookup snapshots", e);
+            return ExecuteResult.createError(e);
+        }
+
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
index 65c5869..e06077a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
 public class CubingExecutableUtil {
 
     public static final String CUBE_NAME = "cubeName";
+    public static final String SEGMENT_NAME = "segmentName";
     public static final String SEGMENT_ID = "segmentId";
     public static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
     public static final String STATISTICS_PATH = "statisticsPath";
@@ -61,6 +62,14 @@ public class CubingExecutableUtil {
         return params.get(CUBE_NAME);
     }
 
+    public static void setSegmentName(String segmentName, Map<String, String> params) {
+        params.put(SEGMENT_NAME, segmentName);
+    }
+
+    public static String getSegmentName(Map<String, String> params) {
+        return params.get(SEGMENT_NAME);
+    }
+
     public static void setSegmentId(String segmentId, Map<String, String> params) {
         params.put(SEGMENT_ID, segmentId);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 6a8ba4c..b49b639 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -28,12 +28,15 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,18 +53,27 @@ public class CuboidJob extends AbstractHadoopJob {
 
     private boolean skipped = false;
 
+    private CuboidScheduler cuboidScheduler;
+
     @Override
     public boolean isSkipped() {
         return skipped;
     }
 
-    private boolean checkSkip(String cubingJobId) {
+    private boolean checkSkip(String cubingJobId, int level) {
         if (cubingJobId == null)
             return false;
 
         ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
         CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId);
         skipped = cubingJob.isLayerCubing() == false;
+        if (!skipped) {
+            skipped = (level > cuboidScheduler.getBuildLevel());
+            if (skipped) {
+                logger.info("Job level: " + level + " for " + cubingJobId + "[" + cubingJobId
+                        + "] exceeds real cuboid tree levels : " + cuboidScheduler.getBuildLevel());
+            }
+        }
         return skipped;
     }
 
@@ -80,6 +92,7 @@ public class CuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_NCUBOID_LEVEL);
             options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_CUBOID_MODE);
             parseOptions(options, args);
 
             String output = getOptionValue(OPTION_OUTPUT_PATH);
@@ -87,12 +100,18 @@ public class CuboidJob extends AbstractHadoopJob {
             int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
             String segmentID = getOptionValue(OPTION_SEGMENT_ID);
             String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
+            String cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+            if (cuboidModeName == null) {
+                cuboidModeName = CuboidModeEnum.CURRENT.toString();
+            }
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
             CubeSegment segment = cube.getSegmentById(segmentID);
 
-            if (checkSkip(cubingJobId)) {
+            cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(segment, cuboidModeName);
+
+            if (checkSkip(cubingJobId, nCuboidLevel)) {
                 logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]");
                 return 0;
             }
@@ -104,7 +123,7 @@ public class CuboidJob extends AbstractHadoopJob {
             setJobClasspath(job, cube.getConfig());
 
             // add metadata to distributed cache
-            attachSegmentMetadataWithDict(segment, job.getConfiguration());
+            attachSegmentMetadataWithAll(segment, job.getConfiguration());
 
             // Mapper
             job.setMapperClass(this.mapperClass);
@@ -122,12 +141,13 @@ public class CuboidJob extends AbstractHadoopJob {
 
             // set output
             IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
-            outputFormat.configureJobOutput(job, output, segment, nCuboidLevel);
+            outputFormat.configureJobOutput(job, output, segment, cuboidScheduler, nCuboidLevel);
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
+            job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidModeName);
 
             return waitForCompletion(job);
         } finally {

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 2965e2f..e9fd3bd 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -21,13 +21,13 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
@@ -37,7 +37,6 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
@@ -56,7 +55,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
 
     protected boolean collectStatistics = false;
-    protected CuboidScheduler cuboidScheduler = null;
     protected int nRowKey;
     private Integer[][] allCuboidsBitSet = null;
     private HLLCounter[] allCuboidsHLL = null;
@@ -86,15 +84,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
         if (collectStatistics) {
             samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
-            cuboidScheduler = cubeDesc.getInitialCuboidScheduler();
             nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
 
-            List<Long> cuboidIdList = Lists.newArrayList();
-            List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
-            addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
-
-            allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
-            cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
+            Set<Long> cuboidIdSet = cubeSeg.getCuboidScheduler().getAllCuboidIds();
+            cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+            allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
 
             allCuboidsHLL = new HLLCounter[cuboidIds.length];
             for (int i = 0; i < cuboidIds.length; i++) {
@@ -129,27 +123,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
     }
 
-    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
-        allCuboids.add(cuboidId);
-        Integer[] indice = new Integer[Long.bitCount(cuboidId)];
-
-        long mask = Long.highestOneBit(baseCuboidId);
-        int position = 0;
-        for (int i = 0; i < nRowKey; i++) {
-            if ((mask & cuboidId) > 0) {
-                indice[position] = i;
-                position++;
-            }
-            mask = mask >> 1;
-        }
-
-        allCuboidsBitSet.add(indice);
-        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
-        for (Long childId : children) {
-            addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
-        }
-    }
-
     @Override
     public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 74c8c2c..0f65a3e 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -110,6 +110,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         if (collectStatistics && (taskId == numberOfTasks - 1)) {
             // hll
             isStatistics = true;
+            baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
             baseCuboidRowCountInMappers = Lists.newArrayList();
             cuboidHLLMap = Maps.newHashMap();
             samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
new file mode 100644
index 0000000..97f9dc1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FilterRecommendCuboidDataJob extends AbstractHadoopJob {
+
+    private static final Logger logger = LoggerFactory.getLogger(FilterRecommendCuboidDataJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment optSegment = cube.getSegmentById(segmentID);
+            CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job, cube.getConfig());
+
+            // Mapper
+            job.setMapperClass(FilterRecommendCuboidDataMapper.class);
+
+            // Reducer
+            job.setNumReduceTasks(0);
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            // Input
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            FileInputFormat.setInputPaths(job, input);
+            // Output
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            FileOutputFormat.setOutputPath(job, output);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            // add metadata to distributed cache
+            attachSegmentMetadataWithDict(originalSegment, job.getConfiguration());
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in CuboidJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
new file mode 100644
index 0000000..d8ae3ab
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import com.google.common.base.Preconditions;
+
+public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Text, Text> {
+
+    private MultipleOutputs mos;
+
+    private RowKeySplitter rowKeySplitter;
+    private long baseCuboid;
+    private Set<Long> recommendCuboids;
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        mos = new MultipleOutputs(context);
+
+        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        CubeSegment optSegment = cube.getSegmentById(segmentID);
+        CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+
+        rowKeySplitter = new RowKeySplitter(originalSegment, 65, 255);
+        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+
+        recommendCuboids = cube.getCuboidsRecommend();
+        Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
+    }
+
+    @Override
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
+        long cuboidID = rowKeySplitter.split(key.getBytes());
+        if (cuboidID != baseCuboid && !recommendCuboids.contains(cuboidID)) {
+            return;
+        }
+
+        String baseOutputPath = PathNameCuboidOld;
+        if (cuboidID == baseCuboid) {
+            baseOutputPath = PathNameCuboidBase;
+        }
+        mos.write(key, value, generateFileName(baseOutputPath));
+    }
+
+    @Override
+    public void doCleanup(Context context) throws IOException, InterruptedException {
+        mos.close();
+
+        Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
+        FileSystem fs = FileSystem.get(context.getConfiguration());
+        if (!fs.exists(outputDirBase)) {
+            fs.mkdirs(outputDirBase);
+            SequenceFile
+                    .createWriter(context.getConfiguration(),
+                            SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
+                            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
+                    .close();
+        }
+    }
+
+    private String generateFileName(String subDir) {
+        return subDir + "/part";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
new file mode 100644
index 0000000..62109f4
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InMemCuboidFromBaseCuboidJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidFromBaseCuboidJob.class);
+
+    private boolean skipped = false;
+
+    @Override
+    public boolean isSkipped() {
+        return skipped;
+    }
+
+    private boolean checkSkip(String cubingJobId) {
+        if (cubingJobId == null)
+            return false;
+
+        ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId);
+        skipped = !cubingJob.isInMemCubing();
+        return skipped;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_CUBOID_MODE);
+            parseOptions(options, args);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            String output = getOptionValue(OPTION_OUTPUT_PATH);
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
+            String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
+
+            String cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+            if (cuboidModeName == null) {
+                cuboidModeName = CuboidModeEnum.CURRENT.toString();
+            }
+
+            CuboidScheduler cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSeg, cuboidModeName);
+
+            if (checkSkip(cubingJobId)) {
+                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeSeg);
+                return 0;
+            }
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job, cube.getConfig());
+
+            // add metadata to distributed cache
+            attachSegmentMetadataWithAll(cubeSeg, job.getConfiguration());
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidModeName);
+
+            String input = getOptionValue(OPTION_INPUT_PATH);
+            FileInputFormat.setInputPaths(job, new Path(input));
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+
+            // set mapper
+            job.setMapperClass(InMemCuboidFromBaseCuboidMapper.class);
+            job.setMapOutputKeyClass(ByteArrayWritable.class);
+            job.setMapOutputValueClass(ByteArrayWritable.class);
+
+            // set output
+            job.setReducerClass(InMemCuboidFromBaseCuboidReducer.class);
+            job.setNumReduceTasks(MapReduceUtil.getInmemCubingReduceTaskNum(cubeSeg, cuboidScheduler));
+
+            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            Path outputPath = new Path(output);
+            FileOutputFormat.setOutputPath(job, outputPath);
+
+            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in CuboidJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        InMemCuboidFromBaseCuboidJob job = new InMemCuboidFromBaseCuboidJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
new file mode 100644
index 0000000..73a39d6
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnitForBaseCuboid;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class InMemCuboidFromBaseCuboidMapper
+        extends InMemCuboidMapperBase<Text, Text, ByteArrayWritable, ByteArrayWritable, ByteArray> {
+    private static final Log logger = LogFactory.getLog(InMemCuboidFromBaseCuboidMapper.class);
+
+    private ByteBuffer keyValueBuffer;
+    private int keyOffset;
+
+    @Override
+    protected void doSetup(Mapper.Context context) throws IOException {
+        super.doSetup(context);
+
+        long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc);
+        GTInfo gtInfo = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, baseCuboid),
+                new CubeDimEncMap(cubeDesc, dictionaryMap));
+        keyValueBuffer = ByteBuffer.allocate(gtInfo.getMaxRecordLength());
+        keyOffset = cubeSegment.getRowKeyPreambleSize();
+    }
+
+    @Override
+    protected InputConverterUnit<ByteArray> getInputConverterUnit() {
+        return new InputConverterUnitForBaseCuboid();
+    }
+
+    @Override
+    protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
+            int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<ByteArray> inputConverterUnit) {
+        AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
+        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
+        cubeBuilder.setConcurrentThreads(taskThreadCount);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("inmemory-cube-building-from-base-cuboid-mapper-%d").build());
+        return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit,
+                new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+    }
+
+    @Override
+    protected ByteArray getRecordFromKeyValue(Text key, Text value) {
+        keyValueBuffer.clear();
+        keyValueBuffer.put(key.getBytes(), keyOffset, key.getBytes().length - keyOffset);
+        keyValueBuffer.put(value.getBytes());
+
+        byte[] keyValue = new byte[keyValueBuffer.position()];
+        System.arraycopy(keyValueBuffer.array(), 0, keyValue, 0, keyValueBuffer.position());
+
+        return new ByteArray(keyValue);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java
new file mode 100644
index 0000000..fbc45d9
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+public class InMemCuboidFromBaseCuboidReducer extends InMemCuboidReducer {
+    //pass
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 73a2eb9..b0ea7b7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -93,7 +93,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             setJobClasspath(job, cube.getConfig());
 
             // add metadata to distributed cache
-            attachSegmentMetadataWithDict(segment, job.getConfiguration());
+            attachSegmentMetadataWithAll(segment, job.getConfiguration());
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
@@ -116,7 +116,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
             // set output
             IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
-            outputFormat.configureJobOutput(job, output, segment, 0);
+            outputFormat.configureJobOutput(job, output, segment, segment.getCuboidScheduler(), 0);
 
             return waitForCompletion(job);
         } finally {

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
index 43f95e5..7b4738b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -33,8 +33,6 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler;
-import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager;
 import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -43,7 +41,7 @@ import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -114,11 +112,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
         }
 
         String cuboidModeName = conf.get(BatchConstants.CFG_CUBOID_MODE);
-        CuboidScheduler cuboidScheduler = TreeCuboidSchedulerManager.getTreeCuboidScheduler(cubeDesc, //
-                CuboidStatsReaderUtil.readCuboidStatsFromSegment(cube.getCuboidsByMode(cuboidModeName), cubeSegment));
-        if (cuboidScheduler == null) {
-            cuboidScheduler = new DefaultCuboidScheduler(cubeDesc);
-        }
+        CuboidScheduler cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSegment, cuboidModeName);
 
         taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
         reserveMemoryMB = calculateReserveMB(conf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
index 2058bc9..60d0870 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -95,7 +95,7 @@ public abstract class KVGTRecordWriter implements ICuboidWriter {
     protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException;
 
     private void initVariables(Long cuboidId) {
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeSegment, cuboidId));
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findForMandatory(cubeDesc, cuboidId));
         keyBuf = rowKeyEncoder.createBuf();
 
         dimensions = Long.bitCount(cuboidId);

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 5a72faf..4f68293 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -91,7 +91,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     protected void doSetup(Context context) throws IOException, InterruptedException {
         super.bindCurrentConfiguration(context.getConfiguration());
 
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
         segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
 
         config = AbstractHadoopJob.loadKylinPropsAndMetadata();

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
new file mode 100644
index 0000000..e97c6bb
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class MergeStatisticsWithOldStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeStatisticsWithOldStep.class);
+
+    protected Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+
+    public MergeStatisticsWithOldStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager mgr = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final String statsInputPath = CubingExecutableUtil.getStatisticsPath(this.getParams());
+
+        CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment);
+        Preconditions.checkNotNull(oldSegment,
+                "cannot find the original segment to be optimized by " + optimizeSegment);
+
+        KylinConfig kylinConf = cube.getConfig();
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        ResourceStore rs = ResourceStore.getStore(kylinConf);
+        int averageSamplingPercentage = 0;
+
+        try {
+            //1. Add statistics from optimized segment
+            Path statisticsFilePath = new Path(statsInputPath,
+                    BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            FileSystem hdfs = FileSystem.get(conf);
+            if (!hdfs.exists(statisticsFilePath))
+                throw new IOException("File " + statisticsFilePath + " does not exists");
+
+            CubeStatsReader optimizeSegmentStatsReader = new CubeStatsReader(optimizeSegment, null,
+                    optimizeSegment.getConfig(), statisticsFilePath);
+            averageSamplingPercentage += optimizeSegmentStatsReader.getSamplingPercentage();
+            addFromCubeStatsReader(optimizeSegmentStatsReader);
+
+            //2. Add statistics from old segment
+            CubeStatsReader oldSegmentStatsReader = new CubeStatsReader(oldSegment, null, oldSegment.getConfig());
+            averageSamplingPercentage += oldSegmentStatsReader.getSamplingPercentage();
+            addFromCubeStatsReader(oldSegmentStatsReader);
+
+            logger.info("Cuboid set with stats info: " + cuboidHLLMap.keySet().toString());
+            //3. Store merged statistics for recommend cuboids
+            averageSamplingPercentage = averageSamplingPercentage / 2;
+            Set<Long> cuboidsRecommend = cube.getCuboidsRecommend();
+
+            Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(cuboidsRecommend.size());
+            for (Long cuboid : cuboidsRecommend) {
+                HLLCounter hll = cuboidHLLMap.get(cuboid);
+                if (hll == null) {
+                    logger.warn("Cannot get the row count stats for cuboid " + cuboid);
+                } else {
+                    resultCuboidHLLMap.put(cuboid, hll);
+                }
+            }
+
+            String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams());
+            CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap,
+                    averageSamplingPercentage);
+
+            try (FSDataInputStream mergedStats = hdfs
+                    .open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) {
+                // put the statistics to metadata store
+                String statisticsFileName = optimizeSegment.getStatisticsResourcePath();
+                rs.putResource(statisticsFileName, mergedStats, System.currentTimeMillis());
+            }
+
+            //By default, the cube optimization will use in-memory cubing
+            CubingJob cubingJob = (CubingJob) getManager()
+                    .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+            StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, optimizeSegment);
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to merge cuboid statistics", e);
+            return ExecuteResult.createError(e);
+        }
+
+    }
+
+    private void addFromCubeStatsReader(CubeStatsReader cubeStatsReader) {
+        for (Map.Entry<Long, HLLCounter> entry : cubeStatsReader.getCuboidRowHLLCounters().entrySet()) {
+            if (cuboidHLLMap.get(entry.getKey()) != null) {
+                cuboidHLLMap.get(entry.getKey()).merge(entry.getValue());
+            } else {
+                cuboidHLLMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 9f2a6ad..359bd4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -35,6 +35,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
 import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +51,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private Text outputKey = new Text();
     private String cubeName;
     private String segmentID;
-    private CubeSegment cubeSegment;
     private CubeDesc cubeDesc;
+    private CubeSegment cubeSegment;
     private CuboidScheduler cuboidScheduler;
 
     private int handleCounter;
@@ -65,17 +66,18 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
 
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
         segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+        String cuboidModeName = context.getConfiguration().get(BatchConstants.CFG_CUBOID_MODE);
 
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeSegment = cube.getSegmentById(segmentID);
         cubeDesc = cube.getDescriptor();
+        cubeSegment = cube.getSegmentById(segmentID);
         ndCuboidBuilder = new NDCuboidBuilder(cubeSegment);
         // initialize CubiodScheduler
-        cuboidScheduler = cubeSegment.getCuboidScheduler();
+        cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSegment, cuboidModeName);
         rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
     }
 
@@ -104,7 +106,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         }
 
         for (Long child : myChildren) {
-            Cuboid childCuboid = Cuboid.findById(cuboidScheduler, child);
+            Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
             Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
             outputKey.set(result.getSecond().array(), 0, result.getFirst());
             context.write(outputKey, value);