Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/02 17:23:57 UTC

[04/19] kylin git commit: APACHE-KYLIN-2733: Introduce optimize job for adjusting cuboid set

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
deleted file mode 100644
index 5c0555a..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *  
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReducerNumSizing {
-
-    private static final Logger logger = LoggerFactory.getLogger(ReducerNumSizing.class);
-
-    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-        KylinConfig kylinConfig = cubeDesc.getConfig();
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
-
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
-
-        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-
-        if (level == -1) {
-            //merge case
-            double estimatedSize = cubeStatsReader.estimateCubeSize();
-            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
-            logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst);
-        } else if (level == 0) {
-            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
-            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
-            logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
-        } else {
-            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
-            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
-            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
-            logger.debug("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
-        }
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
-
-        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
-        if (cubeDesc.hasMemoryHungryMeasures()) {
-            logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
-            numReduceTasks = numReduceTasks * 4;
-        }
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        return numReduceTasks;
-    }
-
-    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg) throws IOException {
-        KylinConfig kylinConfig = cubeSeg.getConfig();
-
-        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, kylinConfig).getCuboidSizeMap();
-        double totalSizeInM = 0;
-        for (Double cuboidSize : cubeSizeMap.values()) {
-            totalSizeInM += cuboidSize;
-        }
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB);
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        logger.info("Having total map input MB " + Math.round(totalSizeInM));
-        logger.info("Having per reduce MB " + perReduceInputMB);
-        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
-        return numReduceTasks;
-    }
-}
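For reference, the sizing logic being removed here (its callers switch to MapReduceUtil in the HBaseMROutput2Transition change further down) effectively rounds the estimated layer size divided by the per-reducer input up to the next integer, multiplies by 4 when the cube has memory-hungry measures, and clamps the result between the configured minimum and maximum reducer counts. A minimal stand-alone sketch of that arithmetic, with hypothetical numbers standing in for the KylinConfig values:

    // Sketch only: mirrors the clamp-and-round logic of getLayeredCubingReduceTaskNum;
    // real values for per-reducer MB, count ratio and min/max come from KylinConfig.
    public class ReducerSizingSketch {
        static int reduceTaskNum(double layerSizeMB, double perReduceInputMB, double countRatio,
                boolean memoryHungry, int minReducers, int maxReducers) {
            int num = (int) Math.round(layerSizeMB / perReduceInputMB * countRatio + 0.99);
            if (memoryHungry) {
                num = num * 4; // boost for cubes with COUNT DISTINCT style measures
            }
            return Math.min(maxReducers, Math.max(minReducers, num));
        }

        public static void main(String[] args) {
            // a 2000 MB layer with 500 MB per reducer and ratio 1.0 gives 5 reducers
            System.out.println(reduceTaskNum(2000, 500, 1.0, false, 1, 500));
        }
    }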

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
new file mode 100644
index 0000000..ed61b4a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class UpdateCubeInfoAfterCheckpointStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterCheckpointStep.class);
+
+    public UpdateCubeInfoAfterCheckpointStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+
+        Set<Long> recommendCuboids = cube.getCuboidsRecommend();
+        try {
+            List<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING);
+            Map<Long, Long> recommendCuboidsWithStats = CuboidStatsReaderUtil
+                    .readCuboidStatsFromSegments(recommendCuboids, newSegments);
+            if (recommendCuboidsWithStats == null) {
+                throw new RuntimeException("Fail to get statistics info for recommended cuboids after optimization!!!");
+            }
+            cubeManager.promoteCheckpointOptimizeSegments(cube, recommendCuboidsWithStats,
+                    newSegments.toArray(new CubeSegment[newSegments.size()]));
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (Exception e) {
+            logger.error("fail to update cube after build", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
new file mode 100644
index 0000000..13c4f40
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterOptimizeStep.class);
+
+    public UpdateCubeInfoAfterOptimizeStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(segment);
+        long sourceCount = originalSegment.getInputRecords();
+        long sourceSizeBytes = originalSegment.getInputRecordsSize();
+
+        CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
+
+        segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
+        segment.setLastBuildTime(System.currentTimeMillis());
+        segment.setSizeKB(cubeSizeBytes / 1024);
+        segment.setInputRecords(sourceCount);
+        segment.setInputRecordsSize(sourceSizeBytes);
+
+        try {
+            cubeManager.promoteNewlyOptimizeSegments(cube, segment);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to update cube after build", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
new file mode 100644
index 0000000..0cd7264
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class UpdateOldCuboidShardJob extends AbstractHadoopJob {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment optSegment = cube.getSegmentById(segmentID);
+            CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job, cube.getConfig());
+
+            // Mapper
+            job.setMapperClass(UpdateOldCuboidShardMapper.class);
+
+            // Reducer
+            job.setNumReduceTasks(0);
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            // Input
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            FileInputFormat.setInputPaths(job, input);
+            // Output
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            FileOutputFormat.setOutputPath(job, output);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            // add metadata to distributed cache
+            attachSegmentsMetadataWithDict(Lists.newArrayList(optSegment, originalSegment), job.getConfiguration());
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in CuboidJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
new file mode 100644
index 0000000..58b553e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardMapper.class);
+
+    private MultipleOutputs mos;
+    private long baseCuboid;
+
+    private CubeDesc cubeDesc;
+    private RowKeySplitter rowKeySplitter;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
+
+    private Text outputKey = new Text();
+    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
+    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        mos = new MultipleOutputs(context);
+
+        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+        CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);
+
+        cubeDesc = cube.getDescriptor();
+        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+
+        rowKeySplitter = new RowKeySplitter(oldSegment, 65, 256);
+        rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
+    }
+
+    @Override
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
+        long cuboidID = rowKeySplitter.split(key.getBytes());
+
+        Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
+        int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers());
+        outputKey.set(newKeyBuf.array(), 0, fullKeySize);
+
+        String baseOutputPath = PathNameCuboidOld;
+        if (cuboidID == baseCuboid) {
+            baseOutputPath = PathNameCuboidBase;
+        }
+        mos.write(outputKey, value, generateFileName(baseOutputPath));
+    }
+
+    private int buildKey(Cuboid cuboid, SplittedBytes[] splitBuffers) {
+        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
+
+        int startIdx = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+        int endIdx = startIdx + Long.bitCount(cuboid.getId());
+        int offset = 0;
+        for (int i = startIdx; i < endIdx; i++) {
+            System.arraycopy(splitBuffers[i].value, 0, newKeyBodyBuf, offset, splitBuffers[i].length);
+            offset += splitBuffers[i].length;
+        }
+
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        while (newKeyBuf.array().length < fullKeySize) {
+            newKeyBuf = new ByteArray(newKeyBuf.length() * 2);
+        }
+        newKeyBuf.setLength(fullKeySize);
+
+        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
+
+        return fullKeySize;
+    }
+
+    @Override
+    public void doCleanup(Context context) throws IOException, InterruptedException {
+        mos.close();
+
+        Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
+        FileSystem fs = FileSystem.get(context.getConfiguration());
+        if (!fs.exists(outputDirBase)) {
+            fs.mkdirs(outputDirBase);
+            SequenceFile
+                    .createWriter(context.getConfiguration(),
+                            SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
+                            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
+                    .close();
+        }
+    }
+
+    private String generateFileName(String subDir) {
+        return subDir + "/part";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 06cc988..bdf0633 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -233,7 +233,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
 
         IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
-        outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, level);
+        outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, cubeSeg.getCuboidScheduler(), level);
 
         prepareOutput(rdd, kylinConfig, cubeSeg, level).mapToPair(
                 new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index ad00706..77bd498 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -38,6 +38,7 @@ import org.apache.kylin.dimension.DimensionEncodingFactory;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.SegmentRange;
@@ -51,6 +52,7 @@ import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.CubeRequest;
 import org.apache.kylin.rest.request.JobBuildRequest;
 import org.apache.kylin.rest.request.JobBuildRequest2;
+import org.apache.kylin.rest.request.JobOptimizeRequest;
 import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.GeneralResponse;
 import org.apache.kylin.rest.response.HBaseResponse;
@@ -362,6 +364,67 @@ public class CubeController extends BasicController {
         }
     }
 
+    /**
+     * Send an optimize cube job
+     *
+     * @param cubeName name of the cube to be optimized
+     * @return JobInstance of CheckpointExecutable
+     */
+    @RequestMapping(value = "/{cubeName}/optimize", method = { RequestMethod.PUT })
+    @ResponseBody
+    public JobInstance optimize(@PathVariable String cubeName, @RequestBody JobOptimizeRequest jobOptimizeRequest) {
+        try {
+            String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+            CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
+
+            if (cube == null) {
+                throw new InternalErrorException("Cannot find cube " + cubeName);
+            }
+            logger.info("cuboid recommend:" + jobOptimizeRequest.getCuboidsRecommend());
+            return jobService.submitOptimizeJob(cube, jobOptimizeRequest.getCuboidsRecommend(), submitter).getFirst();
+        } catch (BadRequestException e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw e;
+        } catch (JobException e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new BadRequestException(e.getLocalizedMessage());
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Resubmit the optimize job for a cube segment
+     *
+     * @param cubeName name of the cube
+     * @param segmentID ID of the segment to be optimized
+     */
+    @RequestMapping(value = "/{cubeName}/recover_segment_optimize/{segmentID}", method = { RequestMethod.PUT })
+    @ResponseBody
+    public JobInstance recoverSegmentOptimize(@PathVariable String cubeName, @PathVariable String segmentID) {
+        try {
+            String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+            CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
+            if (cube == null) {
+                throw new InternalErrorException("Cannot find cube " + cubeName);
+            }
+
+            CubeSegment segment = cube.getSegmentById(segmentID);
+            if (segment == null) {
+                throw new InternalErrorException("Cannot find segment '" + segmentID + "'");
+            }
+
+            return jobService.submitRecoverSegmentOptimizeJob(segment, submitter);
+        } catch (JobException e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new BadRequestException(e.getLocalizedMessage());
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
+    }
+
     @RequestMapping(value = "/{cubeName}/disable", method = { RequestMethod.PUT }, produces = { "application/json" })
     @ResponseBody
     public CubeInstance disableCube(@PathVariable String cubeName) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java
new file mode 100644
index 0000000..51e8e7c
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+import java.util.Set;
+
+public class JobOptimizeRequest {
+
+    private Set<Long> cuboidsRecommend;
+
+    public Set<Long> getCuboidsRecommend() {
+        return cuboidsRecommend;
+    }
+
+    public void setCuboidsRecommend(Set<Long> cuboidsRecommend) {
+        this.cuboidsRecommend = cuboidsRecommend;
+    }
+}
\ No newline at end of file
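For readers wiring a client against the new PUT /{cubeName}/optimize endpoint in CubeController above, the request body binds to this JobOptimizeRequest. A hedged illustration of the JSON shape, assuming Jackson does the binding (as elsewhere in the Kylin REST layer) and using made-up cuboid ids; per checkAllowOptimization() in JobService the set must contain the cube's base cuboid and must differ from the cube's current cuboid set:

    // Illustration only: serialize a JobOptimizeRequest to show the expected JSON body.
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.Sets;
    import org.apache.kylin.rest.request.JobOptimizeRequest;

    public class OptimizeRequestExample {
        public static void main(String[] args) throws Exception {
            JobOptimizeRequest request = new JobOptimizeRequest();
            request.setCuboidsRecommend(Sets.newHashSet(255L, 239L, 175L)); // hypothetical cuboid ids
            // prints something like {"cuboidsRecommend":[255,239,175]} (set order may vary)
            System.out.println(new ObjectMapper().writeValueAsString(request));
        }
    }

The recover_segment_optimize endpoint added in the same controller takes no body; it resubmits the optimize job for a single segment of an existing checkpoint job.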

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index a18aaf3..d5805a1 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -72,6 +72,8 @@ import org.springframework.stereotype.Component;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * Stateless & lightweight service facade of cube management functions.
@@ -525,6 +527,8 @@ public class CubeService extends BasicService implements InitializingBean {
 
         CubeUpdate update = new CubeUpdate(cube);
         update.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        update.setCuboids(Maps.<Long, Long> newHashMap());
+        update.setCuboidsRecommend(Sets.<Long> newHashSet());
         CubeManager.getInstance(getConfig()).updateCube(update);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 529f3b8..9f2d0d9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -34,11 +34,13 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.directory.api.util.Strings;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.JobInfoConverter;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -48,6 +50,7 @@ import org.apache.kylin.job.SchedulerFactory;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.CheckpointExecutable;
@@ -72,6 +75,7 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
+import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
 import com.google.common.base.Function;
@@ -273,6 +277,137 @@ public class JobService extends BasicService implements InitializingBean {
         return jobInstance;
     }
 
+    public Pair<JobInstance, List<JobInstance>> submitOptimizeJob(CubeInstance cube, Set<Long> cuboidsRecommend,
+            String submitter) throws IOException, JobException {
+
+        Pair<JobInstance, List<JobInstance>> result = submitOptimizeJobInternal(cube, cuboidsRecommend, submitter);
+        accessService.init(result.getFirst(), null);
+        accessService.inherit(result.getFirst(), cube);
+        for (JobInstance jobInstance : result.getSecond()) {
+            accessService.init(jobInstance, null);
+            accessService.inherit(jobInstance, cube);
+        }
+
+        return result;
+    }
+
+    private Pair<JobInstance, List<JobInstance>> submitOptimizeJobInternal(CubeInstance cube,
+            Set<Long> cuboidsRecommend, String submitter) throws IOException {
+        Message msg = MsgPicker.getMsg();
+
+        if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
+            throw new BadRequestException(String.format(msg.getBUILD_BROKEN_CUBE(), cube.getName()));
+        }
+
+        checkCubeDescSignature(cube);
+        checkAllowOptimization(cube, cuboidsRecommend);
+
+        CubeSegment[] optimizeSegments = null;
+        try {
+            /** Add optimize segments */
+            optimizeSegments = getCubeManager().optimizeSegments(cube, cuboidsRecommend);
+            List<JobInstance> optimizeJobInstances = Lists.newLinkedList();
+
+            /** Add optimize jobs */
+            List<AbstractExecutable> optimizeJobList = Lists.newArrayListWithExpectedSize(optimizeSegments.length);
+            for (CubeSegment optimizeSegment : optimizeSegments) {
+                DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, submitter);
+                getExecutableManager().addJob(optimizeJob);
+
+                optimizeJobList.add(optimizeJob);
+                optimizeJobInstances.add(getSingleJobInstance(optimizeJob));
+            }
+
+            /** Add checkpoint job for batch jobs */
+            CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cube, submitter).build();
+            checkpointJob.addTaskListForCheck(optimizeJobList);
+
+            getExecutableManager().addJob(checkpointJob);
+
+            return new Pair(getCheckpointJobInstance(checkpointJob), optimizeJobInstances);
+        } catch (Exception e) {
+            if (optimizeSegments != null) {
+                logger.error("Job submission might failed for NEW segments {}, will clean the NEW segments from cube",
+                        optimizeSegments);
+                try {
+                    // Remove these segments
+                    CubeUpdate cubeBuilder = new CubeUpdate(cube);
+                    cubeBuilder.setToRemoveSegs(optimizeSegments);
+                    getCubeManager().updateCube(cubeBuilder);
+                } catch (Exception ee) {
+                    // swallow the exception
+                    logger.error("Clean New segments failed, ignoring it", e);
+                }
+            }
+            throw e;
+        }
+    }
+
+    public JobInstance submitRecoverSegmentOptimizeJob(CubeSegment segment, String submitter)
+            throws IOException, JobException {
+        CubeInstance cubeInstance = segment.getCubeInstance();
+
+        checkCubeDescSignature(cubeInstance);
+
+        String cubeName = cubeInstance.getName();
+        List<JobInstance> jobInstanceList = searchJobsByCubeName(cubeName, null,
+                Lists.newArrayList(JobStatusEnum.NEW, JobStatusEnum.PENDING, JobStatusEnum.ERROR),
+                JobTimeFilterEnum.ALL, JobSearchMode.CHECKPOINT_ONLY);
+        if (jobInstanceList.size() > 1) {
+            throw new IllegalStateException("Exist more than one CheckpointExecutable for cube " + cubeName);
+        } else if (jobInstanceList.size() == 0) {
+            throw new IllegalStateException("There's no CheckpointExecutable for cube " + cubeName);
+        }
+        CheckpointExecutable checkpointExecutable = (CheckpointExecutable) getExecutableManager()
+                .getJob(jobInstanceList.get(0).getId());
+
+        AbstractExecutable toBeReplaced = null;
+        for (AbstractExecutable taskForCheck : checkpointExecutable.getSubTasksForCheck()) {
+            if (taskForCheck instanceof CubingJob) {
+                CubingJob subCubingJob = (CubingJob) taskForCheck;
+                String segmentName = CubingExecutableUtil.getSegmentName(subCubingJob.getParams());
+                if (segmentName != null && segmentName.equals(segment.getName())) {
+                    String segmentID = CubingExecutableUtil.getSegmentId(subCubingJob.getParams());
+                    CubeSegment beingOptimizedSegment = cubeInstance.getSegmentById(segmentID);
+                    if (beingOptimizedSegment != null) { // beingOptimizedSegment exists & should not be recovered
+                        throw new IllegalStateException("Segment " + beingOptimizedSegment.getName() + "-"
+                                + beingOptimizedSegment.getUuid()
+                                + " still exists. Please delete it or discard the related optimize job first!!!");
+                    }
+                    toBeReplaced = taskForCheck;
+                    break;
+                }
+            }
+        }
+        if (toBeReplaced == null) {
+            throw new IllegalStateException("There's no CubingJob for segment " + segment.getName()
+                    + " in CheckpointExecutable " + checkpointExecutable.getName());
+        }
+
+        /** Add CubingJob for the related segment **/
+        CubeSegment optimizeSegment = getCubeManager().appendSegment(cubeInstance, segment.getTSRange());
+        CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+        cubeBuilder.setToAddSegs(optimizeSegment);
+        getCubeManager().updateCube(cubeBuilder);
+
+        DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, submitter);
+
+        getExecutableManager().addJob(optimizeJob);
+
+        JobInstance optimizeJobInstance = getSingleJobInstance(optimizeJob);
+        accessService.init(optimizeJobInstance, null);
+        accessService.inherit(optimizeJobInstance, cubeInstance);
+
+        /** Update the checkpoint job */
+        checkpointExecutable.getSubTasksForCheck().set(checkpointExecutable.getSubTasksForCheck().indexOf(toBeReplaced),
+                optimizeJob);
+
+        getExecutableManager().updateCheckpointJob(checkpointExecutable.getId(),
+                checkpointExecutable.getSubTasksForCheck());
+
+        return optimizeJobInstance;
+    }
+
     private void checkCubeDescSignature(CubeInstance cube) {
         Message msg = MsgPicker.getMsg();
 
@@ -281,8 +416,25 @@ public class JobService extends BasicService implements InitializingBean {
                     String.format(msg.getINCONSISTENT_CUBE_DESC_SIGNATURE(), cube.getDescriptor()));
     }
 
+    private void checkAllowOptimization(CubeInstance cube, Set<Long> cuboidsRecommend) {
+        long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+        if (!cuboidsRecommend.contains(baseCuboid)) {
+            throw new BadRequestException("The recommend cuboids should contain the base cuboid " + baseCuboid);
+        }
+        Set<Long> currentCuboidSet = cube.getCuboidScheduler().getAllCuboidIds();
+        if (currentCuboidSet.equals(cuboidsRecommend)) {
+            throw new BadRequestException(
+                    "The recommend cuboids are the same as the current cuboids. It's no need to do optimization.");
+        }
+    }
+
     public JobInstance getJobInstance(String uuid) {
-        return getSingleJobInstance(getExecutableManager().getJob(uuid));
+        AbstractExecutable job = getExecutableManager().getJob(uuid);
+        if (job instanceof CheckpointExecutable) {
+            return getCheckpointJobInstance(job);
+        } else {
+            return getSingleJobInstance(job);
+        }
     }
 
     public Output getOutput(String id) {
@@ -362,21 +514,90 @@ public class JobService extends BasicService implements InitializingBean {
             getExecutableManager().discardJob(job.getId());
             return job;
         }
-        CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube());
+
+        logger.info("Cancel job [" + job.getId() + "] trigger by "
+                + SecurityContextHolder.getContext().getAuthentication().getName());
+        if (job.getStatus() == JobStatusEnum.FINISHED) {
+            throw new IllegalStateException(
+                    "The job " + job.getId() + " has already been finished and cannot be discarded.");
+        }
+        if (job.getStatus() == JobStatusEnum.DISCARDED) {
+            return job;
+        }
+
+        AbstractExecutable executable = getExecutableManager().getJob(job.getId());
+        if (executable instanceof CubingJob) {
+            cancelCubingJobInner((CubingJob) executable);
+        } else if (executable instanceof CheckpointExecutable) {
+            cancelCheckpointJobInner((CheckpointExecutable) executable);
+        } else {
+            getExecutableManager().discardJob(executable.getId());
+        }
+        return job;
+    }
+
+    private void cancelCubingJobInner(CubingJob cubingJob) throws IOException {
+        CubeInstance cubeInstance = getCubeManager().getCube(CubingExecutableUtil.getCubeName(cubingJob.getParams()));
         // might not a cube job
-        final String segmentIds = job.getRelatedSegment();
-        for (String segmentId : StringUtils.split(segmentIds)) {
-            final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
-            if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getTSRange().end.v == 0)) {
-                // Remove this segments
+        final String segmentIds = CubingExecutableUtil.getSegmentId(cubingJob.getParams());
+        if (!StringUtils.isEmpty(segmentIds)) {
+            List<CubeSegment> toRemoveSegments = Lists.newLinkedList();
+            for (String segmentId : StringUtils.split(segmentIds)) {
+                final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
+                if (segment != null
+                        && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getTSRange().end.v == 0)) {
+                    // Remove this segment
+                    toRemoveSegments.add(segment);
+                }
+            }
+            if (!toRemoveSegments.isEmpty()) {
                 CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-                cubeBuilder.setToRemoveSegs(segment);
+                cubeBuilder.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[toRemoveSegments.size()]));
                 getCubeManager().updateCube(cubeBuilder);
             }
         }
-        getExecutableManager().discardJob(job.getId());
+        getExecutableManager().discardJob(cubingJob.getId());
+    }
 
-        return job;
+    private void cancelCheckpointJobInner(CheckpointExecutable checkpointExecutable) throws IOException {
+        List<String> segmentIdList = Lists.newLinkedList();
+        List<String> jobIdList = Lists.newLinkedList();
+        jobIdList.add(checkpointExecutable.getId());
+        setRelatedIdList(checkpointExecutable, segmentIdList, jobIdList);
+
+        CubeInstance cubeInstance = getCubeManager()
+                .getCube(CubingExecutableUtil.getCubeName(checkpointExecutable.getParams()));
+        if (!segmentIdList.isEmpty()) {
+            List<CubeSegment> toRemoveSegments = Lists.newLinkedList();
+            for (String segmentId : segmentIdList) {
+                final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
+                if (segment != null && segment.getStatus() != SegmentStatusEnum.READY) {
+                    toRemoveSegments.add(segment);
+                }
+            }
+
+            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+            cubeBuilder.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[toRemoveSegments.size()]));
+            cubeBuilder.setCuboidsRecommend(Sets.<Long> newHashSet()); // Clear the recommend cuboids
+            getCubeManager().updateCube(cubeBuilder);
+        }
+
+        for (String jobId : jobIdList) {
+            getExecutableManager().discardJob(jobId);
+        }
+    }
+
+    private void setRelatedIdList(CheckpointExecutable checkpointExecutable, List<String> segmentIdList,
+            List<String> jobIdList) {
+        for (AbstractExecutable taskForCheck : checkpointExecutable.getSubTasksForCheck()) {
+            jobIdList.add(taskForCheck.getId());
+            if (taskForCheck instanceof CubingJob) {
+                segmentIdList.addAll(Lists
+                        .newArrayList(StringUtils.split(CubingExecutableUtil.getSegmentId(taskForCheck.getParams()))));
+            } else if (taskForCheck instanceof CheckpointExecutable) {
+                setRelatedIdList((CheckpointExecutable) taskForCheck, segmentIdList, jobIdList);
+            }
+        }
     }
 
     public JobInstance pauseJob(JobInstance job) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index c41df06..838112f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +68,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     CubeInstance cube = null;
     CubeDesc cubeDesc = null;
     String segmentID = null;
+    String cuboidModeName = null;
     KylinConfig kylinConfig;
     Path partitionFilePath;
 
@@ -78,6 +80,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(OPTION_STATISTICS_ENABLED);
+        options.addOption(OPTION_CUBOID_MODE);
         parseOptions(options, args);
 
         partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@ -89,13 +92,27 @@ public class CreateHTableJob extends AbstractHadoopJob {
         cubeDesc = cube.getDescriptor();
         kylinConfig = cube.getConfig();
         segmentID = getOptionValue(OPTION_SEGMENT_ID);
+        cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
         CubeSegment cubeSegment = cube.getSegmentById(segmentID);
 
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
 
         byte[][] splitKeys;
         if (statsEnabled) {
-            final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
+            Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, null, kylinConfig).getCuboidSizeMap();
+            Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
+            if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
+                Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size());
+                for (Long cuboid : buildingCuboids) {
+                    Double cuboidSize = cuboidSizeMap.get(cuboid);
+                    if (cuboidSize == null) {
+                        logger.warn(cuboid + "cuboid's size is null will replace by 0");
+                        cuboidSize = 0.0;
+                    }
+                    optimizedCuboidSizeMap.put(cuboid, cuboidSize);
+                }
+                cuboidSizeMap = optimizedCuboidSizeMap;
+            }
             splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, partitionFilePath.getParent());
         } else {
             splitKeys = getRegionSplits(conf, partitionFilePath);
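The change above narrows the cuboid size statistics used for region splitting to the cuboids selected by the given cuboid mode, substituting 0 when a chosen cuboid has no statistics yet. A small stand-alone sketch of that filtering, with a plain Map standing in for the stats reader output:

    // Sketch: keep only the sizes of the cuboids being built; cuboids without stats fall back to 0.0
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public class CuboidSizeFilterSketch {
        static Map<Long, Double> restrictTo(Map<Long, Double> allSizes, Set<Long> buildingCuboids) {
            Map<Long, Double> result = new HashMap<>(buildingCuboids.size());
            for (Long cuboid : buildingCuboids) {
                Double size = allSizes.get(cuboid);
                result.put(cuboid, size == null ? 0.0 : size);
            }
            return result;
        }
    }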

http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 31cb189..db3f7f4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -31,13 +31,15 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
 import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
 import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
 import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
-import org.apache.kylin.engine.mr.steps.ReducerNumSizing;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,13 +95,15 @@ public class HBaseMROutput2Transition implements IMROutput2 {
         }
 
         @Override
-        public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception {
+        public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler,
+                int level) throws Exception {
             int reducerNum = 1;
             Class mapperClass = job.getMapperClass();
             if (mapperClass == HiveToBaseCuboidMapper.class || mapperClass == NDCuboidMapper.class) {
-                reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), level);
+                reducerNum = MapReduceUtil.getLayeredCubingReduceTaskNum(segment, cuboidScheduler,
+                        AbstractHadoopJob.getTotalMapInputMB(job), level);
             } else if (mapperClass == InMemCuboidMapper.class) {
-                reducerNum = ReducerNumSizing.getInmemCubingReduceTaskNum(segment);
+                reducerNum = MapReduceUtil.getInmemCubingReduceTaskNum(segment, cuboidScheduler);
             }
             Path outputPath = new Path(output);
             FileOutputFormat.setOutputPath(job, outputPath);
@@ -149,7 +153,8 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
         @Override
         public void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception {
-            int reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), -1);
+            int reducerNum = MapReduceUtil.getLayeredCubingReduceTaskNum(segment, segment.getCuboidScheduler(),
+                    AbstractHadoopJob.getTotalMapInputMB(job), -1);
             job.setNumReduceTasks(reducerNum);
 
             Path outputPath = new Path(output);
@@ -185,4 +190,30 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
         }
     }
+
+    public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) {
+        return new IMRBatchOptimizeOutputSide2() {
+            HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+            @Override
+            public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId(), CuboidModeEnum.RECOMMEND));
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+                jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+            }
+
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                steps.addOptimizeGarbageCollectionSteps(jobFlow);
+            }
+
+            @Override
+            public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) {
+                steps.addCheckpointGarbageCollectionSteps(jobFlow);
+            }
+        };
+    }
 }
\ No newline at end of file

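For orientation, a minimal sketch (not part of this patch) of how the new optimize output side could be driven. The driver class below is hypothetical; it assumes the IMRBatchOptimizeOutputSide2 interface introduced elsewhere in this change set is nested in IMROutput2 and declares all four phase hooks implemented above.

    // Hypothetical driver, for illustration only (assumed API shape, see note above):
    // invokes the optimize output side's phase hooks in numeric order on one job flow.
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.IMROutput2;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;

    public class OptimizeOutputSideSketch {
        static void addOutputSteps(CubeSegment segment, DefaultChainedExecutable jobFlow) {
            IMROutput2.IMRBatchOptimizeOutputSide2 outputSide =
                    new HBaseMROutput2Transition().getBatchOptimizeOutputSide(segment);

            outputSide.addStepPhase2_CreateHTable(jobFlow); // HTable created from RECOMMEND cuboid stats
            outputSide.addStepPhase3_BuildCube(jobFlow);    // cuboid-to-HFile conversion + bulk load
            outputSide.addStepPhase4_Cleanup(jobFlow);      // drop the optimization working directory
            outputSide.addStepPhase5_Cleanup(jobFlow);      // checkpoint GC: old HTables and old HDFS dirs
        }
    }
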
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 4ea9027..67c94ad 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -32,6 +33,7 @@ import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 
 import com.google.common.base.Preconditions;
@@ -82,7 +84,15 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return createCreateHTableStep(jobId, true);
     }
 
+    public HadoopShellExecutable createCreateHTableStepWithStats(String jobId, CuboidModeEnum cuboidMode) {
+        return createCreateHTableStep(jobId, true, cuboidMode);
+    }
+
     private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) {
+        return createCreateHTableStep(jobId, withStats, CuboidModeEnum.CURRENT);
+    }
+
+    private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats, CuboidModeEnum cuboidMode) {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
@@ -90,6 +100,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);
@@ -167,6 +178,35 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return result;
     }
 
+    public MergeGCStep createOptimizeGCStep() {
+        MergeGCStep result = new MergeGCStep();
+        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        result.setOldHTables(getOptimizeHTables());
+        return result;
+    }
+
+    public List<CubeSegment> getOptimizeSegments() {
+        CubeInstance cube = (CubeInstance) seg.getRealization();
+        List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING));
+        List<CubeSegment> oldSegments = Lists.newArrayListWithExpectedSize(newSegments.size());
+        for (CubeSegment segment : newSegments) {
+            oldSegments.add(cube.getOriginalSegmentToOptimize(segment));
+        }
+        return oldSegments;
+    }
+
+    public List<String> getOptimizeHTables() {
+        return getOldHTables(getOptimizeSegments());
+    }
+
+    public List<String> getOldHTables(final List<CubeSegment> oldSegments) {
+        final List<String> oldHTables = Lists.newArrayListWithExpectedSize(oldSegments.size());
+        for (CubeSegment segment : oldSegments) {
+            oldHTables.add(segment.getStorageLocationIdentifier());
+        }
+        return oldHTables;
+    }
+
     public List<String> getMergingHTables() {
         final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
@@ -187,6 +227,18 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return mergingHDFSPaths;
     }
 
+    public List<String> getOptimizeHDFSPaths() {
+        return getOldHDFSPaths(getOptimizeSegments());
+    }
+
+    public List<String> getOldHDFSPaths(final List<CubeSegment> oldSegments) {
+        final List<String> oldHDFSPaths = Lists.newArrayListWithExpectedSize(oldSegments.size());
+        for (CubeSegment oldSegment : oldSegments) {
+            oldHDFSPaths.add(getJobWorkingDir(oldSegment.getLastBuildJobID()));
+        }
+        return oldHDFSPaths;
+    }
+
     public String getHFilePath(String jobId) {
         return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
     }
@@ -195,6 +247,36 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
     }
 
+    public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        List<String> toDeletePaths = new ArrayList<>();
+        toDeletePaths.add(getOptimizationRootPath(jobId));
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
+        step.setDeletePaths(toDeletePaths);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
+
+    public void addCheckpointGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        jobFlow.addTask(createOptimizeGCStep());
+
+        List<String> toDeletePaths = new ArrayList<>();
+        toDeletePaths.addAll(getOptimizeHDFSPaths());
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
+        step.setDeletePaths(toDeletePaths);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
+
     public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
         String jobId = jobFlow.getId();