You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text export.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:36 UTC

[34/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split the 'job' module into 'core-job', 'engine-mr', 'source-hive', and 'storage-hbase'. The old 'job' module remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
deleted file mode 100644
index bc6ee1f..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public interface IMROutput {
-
-    /** Return a helper to participate in batch cubing job flow. */
-    public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
-    
-    /**
-     * Participate the batch cubing flow as the output side. Responsible for saving
-     * the cuboid output to storage (Phase 3).
-     * 
-     * - Phase 1: Create Flat Table
-     * - Phase 2: Build Dictionary
-     * - Phase 3: Build Cube
-     * - Phase 4: Update Metadata & Cleanup
-     */
-    public interface IMRBatchCubingOutputSide {
-        
-        /**
-         * Add step that saves cuboid output from HDFS to storage.
-         * 
-         * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn", 
-         * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
-         * dictionary encoding; Mx is measure value serialization form.
-         */
-        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
-        
-        /** Add step that does any necessary clean up. */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
-    }
-    
-    /** Return a helper to participate in batch merge job flow. */
-    public IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
-    
-    /**
-     * Participate the batch cubing flow as the output side. Responsible for saving
-     * the cuboid output to storage (Phase 2).
-     * 
-     * - Phase 1: Merge Dictionary
-     * - Phase 2: Merge Cube
-     * - Phase 3: Update Metadata & Cleanup
-     */
-    public interface IMRBatchMergeOutputSide {
-        
-        /**
-         * Add step that saves cuboid output from HDFS to storage.
-         * 
-         * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn", 
-         * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
-         * dictionary encoding; Mx is measure value serialization form.
-         */
-        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
-        
-        /** Add step that does any necessary clean up. */
-        public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
deleted file mode 100644
index 974e2fc..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public interface IMROutput2 {
-
-    /** Return a helper to participate in batch cubing job flow. */
-    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
-
-    /**
-     * Participate the batch cubing flow as the output side.
-     * 
-     * - Phase 1: Create Flat Table
-     * - Phase 2: Build Dictionary
-     * - Phase 3: Build Cube (with StorageOutputFormat)
-     * - Phase 4: Update Metadata & Cleanup
-     */
-    public interface IMRBatchCubingOutputSide2 {
-
-        public IMRStorageOutputFormat getStorageOutputFormat();
-
-        /** Add step that executes after build dictionary and before build cube. */
-        public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
-
-        /** Add step that executes after build cube. */
-        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
-
-        /** Add step that does any necessary clean up. */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
-    }
-
-    public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
-
-    public interface IMRBatchMergeInputSide2 {
-        public IMRStorageInputFormat getStorageInputFormat();
-    }
-
-    @SuppressWarnings("rawtypes")
-    public interface IMRStorageInputFormat {
-        
-        public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
-        
-        public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
-        
-        public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
-    }
-
-    /** Return a helper to participate in batch merge job flow. */
-    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
-
-    /**
-     * Participate the batch merge flow as the output side.
-     * 
-     * - Phase 1: Merge Dictionary
-     * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
-     * - Phase 3: Update Metadata & Cleanup
-     */
-    public interface IMRBatchMergeOutputSide2 {
-
-        public IMRStorageOutputFormat getStorageOutputFormat();
-
-        /** Add step that executes after merge dictionary and before merge cube. */
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
-
-        /** Add step that executes after merge cube. */
-        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
-
-        /** Add step that does any necessary clean up. */
-        public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
-    }
-
-    @SuppressWarnings("rawtypes")
-    public interface IMRStorageOutputFormat {
-        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
-        
-        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
deleted file mode 100644
index 3217e4b..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
-import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
-import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Hold reusable steps for builders.
- */
-public class JobBuilderSupport {
-
-    final protected JobEngineConfig config;
-    final protected CubeSegment seg;
-    final protected String submitter;
-
-    public JobBuilderSupport(CubeSegment seg, String submitter) {
-        Preconditions.checkNotNull(seg, "segment cannot be null");
-        this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-        this.seg = seg;
-        this.submitter = submitter;
-    }
-    
-    public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
-        return createFactDistinctColumnsStep(jobId, false);
-    }
-    
-    public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
-        return createFactDistinctColumnsStep(jobId, true);
-    }
-    
-    private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(FactDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
-        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
-
-        result.setMapReduceParams(cmd.toString());
-        return result;
-    }
-
-    public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
-        // base cuboid job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
-        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
-        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
-        updateCubeInfoStep.setSegmentId(seg.getUuid());
-        updateCubeInfoStep.setCubingJobId(jobId);
-        return updateCubeInfoStep;
-    }
-
-    public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
-        MergeDictionaryStep result = new MergeDictionaryStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        return result;
-    }
-    
-    public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
-        UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
-        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setCubingJobId(jobId);
-        return result;
-    }
-
-    // ============================================================================
-
-    public String getJobWorkingDir(String jobId) {
-        return getJobWorkingDir(config, jobId);
-    }
-    
-    public String getCuboidRootPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
-    }
-    
-    public String getCuboidRootPath(CubeSegment seg) {
-        return getCuboidRootPath(seg.getLastBuildJobID());
-    }
-    
-    public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
-        try {
-            String jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
-            if (jobConf != null && jobConf.length() > 0) {
-                buf.append(" -conf ").append(jobConf);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    public String getFactDistinctColumnsPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
-    }
-
-
-    public String getStatisticsPath(String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/statistics";
-    }
-
-    // ============================================================================
-    // static methods also shared by other job flow participant
-    // ----------------------------------------------------------------------------
-
-    public static String getJobWorkingDir(JobEngineConfig conf, String jobId) {
-        return conf.getHdfsWorkingDirectory() + "/" + "kylin-" + jobId;
-    }
-
-    public static StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
-        return buf.append(" -").append(paraName).append(" ").append(paraValue);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
deleted file mode 100644
index 61328c9..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class MRBatchCubingEngine implements IBatchCubingEngine {
-
-    @Override
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return new BatchCubingJobBuilder(newSegment, submitter).build();
-    }
-
-    @Override
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return new BatchMergeJobBuilder(mergeSegment, submitter).build();
-    }
-    
-    @Override
-    public Class<?> getSourceInterface() {
-        return IMRInput.class;
-    }
-
-    @Override
-    public Class<?> getStorageInterface() {
-        return IMROutput.class;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
deleted file mode 100644
index 57ec128..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class MRBatchCubingEngine2 implements IBatchCubingEngine {
-
-    @Override
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return new BatchCubingJobBuilder2(newSegment, submitter).build();
-    }
-
-    @Override
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
-    }
-    
-    @Override
-    public Class<?> getSourceInterface() {
-        return IMRInput.class;
-    }
-
-    @Override
-    public Class<?> getStorageInterface() {
-        return IMROutput2.class;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
deleted file mode 100644
index dc0533e..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.TableSourceFactory;
-import org.apache.kylin.storage.StorageFactory2;
-
-public class MRUtil {
-
-    public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
-        return TableSourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
-    }
-
-    public static IMRTableInputFormat getTableInputFormat(String tableName) {
-        return getTableInputFormat(getTableDesc(tableName));
-    }
-
-    public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
-        return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
-    }
-
-    private static TableDesc getTableDesc(String tableName) {
-        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
-    }
-
-    public static IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
-        return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchCubingOutputSide(seg);
-    }
-
-    public static IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
-        return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchMergeOutputSide(seg);
-    }
-
-    public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
-        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
-    }
-    
-    public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
-        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg);
-    }
-    
-    public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
-        return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
deleted file mode 100644
index aae5d89..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class InMemCuboidJob extends AbstractHadoopJob {
-
-    protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_JOB_FLOW_ID);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            parseOptions(options, args);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            logger.info("Starting: " + job.getJobName());
-            
-            setJobClasspath(job);
-            
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-            long timeout = 1000*60*60L; // 1 hour
-            job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout));
-            
-            // set input
-            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
-            flatTableInputFormat.configureJob(job);
-            
-            // set mapper
-            job.setMapperClass(InMemCuboidMapper.class);
-            job.setMapOutputKeyClass(ByteArrayWritable.class);
-            job.setMapOutputValueClass(ByteArrayWritable.class);
-            
-            // set output
-            IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
-            storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
-            
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            logger.error("error in CuboidJob", e);
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        InMemCuboidJob job = new InMemCuboidJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
deleted file mode 100644
index ff6ffe5..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Maps;
-
-/**
- * Mapper that builds the cube in memory for its input split.
- * <p>
- * Each input record is parsed into a row and handed, through a bounded queue,
- * to a {@link DoggedCubeBuilder} running on a background thread; the builder
- * writes cuboid rows to the map output through {@link MapContextGTRecordWriter}.
- * In {@link #cleanup} an empty row is enqueued (presumably the builder's
- * end-of-input marker — confirm against DoggedCubeBuilder) and the build is
- * awaited, so any builder failure fails the task there.
- */
-public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
-
-    private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment cubeSegment;
-    private IMRTableInputFormat flatTableInputFormat;
-
-    /** Number of rows handed to the builder so far. */
-    private int counter;
-    /** Bounded hand-off to the builder thread; map() waits while it is full. */
-    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
-    /** Completion handle of the background cube build. */
-    private Future<?> future;
-
-    /**
-     * Loads the cube metadata, collects the dictionaries of dictionary-encoded
-     * dimension columns and starts the background cube build.
-     */
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        String segmentName = conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
-
-        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (cubeDesc.getRowkey().isUseDictionary(col)) {
-                    // look the dictionary up once and reuse it for both the check and the map
-                    Dictionary<?> dict = cubeSegment.getDictionary(col);
-                    if (dict == null) {
-                        logger.warn("Dictionary for " + col + " was not found.");
-                    }
-
-                    dictionaryMap.put(col, dict);
-                }
-            }
-        }
-
-        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cubeDesc, dictionaryMap);
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
-        // The submitted build still runs to completion; shutdown() only lets the
-        // worker thread terminate afterwards instead of lingering as an idle
-        // non-daemon thread for the rest of the task JVM's life.
-        executorService.shutdown();
-    }
-
-    /**
-     * Parses one input record and enqueues it for the builder, retrying every
-     * second for as long as the builder is still running. If the builder has
-     * already finished, the row is not enqueued; a builder failure is then
-     * reported by cleanup().
-     */
-    @Override
-    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
-        // put each row to the queue
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-        List<String> rowAsList = Arrays.asList(row);
-        
-        while (!future.isDone()) {
-            if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
-                counter++;
-                if (counter % BatchConstants.COUNTER_MAX == 0) {
-                    logger.info("Handled " + counter + " records!");
-                }
-                break;
-            }
-        }
-    }
-
-    /**
-     * Enqueues an empty row after the last record, waits for the build to finish
-     * and rethrows any builder failure as an IOException.
-     */
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        logger.info("Totally handled " + counter + " records!");
-
-        while (!future.isDone()) {
-            if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
-                break;
-            }
-        }
-        
-        try {
-            future.get();
-        } catch (Exception e) {
-            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
-        }
-        queue.clear();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
deleted file mode 100644
index 45da2c8..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureAggregators;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Reducer of the in-memory cubing job: for every row key it merges all encoded
- * measure values with the cube's measure aggregators and writes the aggregated
- * row through the storage output format of the segment (the merge output side
- * when the job is flagged as a merge, the cubing output side otherwise).
- */
-public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
-
-    private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class);
-
-    private IMRStorageOutputFormat storageOutputFormat;
-    private MeasureCodec codec;
-    private MeasureAggregators aggs;
-
-    /** Number of reduced row keys so far. */
-    private int counter;
-    /** Reusable decode buffer, one slot per measure. */
-    private Object[] input;
-    /** Reusable aggregation result, one slot per measure. */
-    private Object[] result;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-        boolean merging = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_IS_MERGE));
-
-        CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        CubeDesc descriptor = cubeInstance.getDescriptor();
-        CubeSegment segment = cubeInstance.getSegment(segmentName, SegmentStatusEnum.NEW);
-        storageOutputFormat = merging //
-                ? MRUtil.getBatchMergeOutputSide2(segment).getStorageOutputFormat() //
-                : MRUtil.getBatchCubingOutputSide2(segment).getStorageOutputFormat();
-
-        List<MeasureDesc> measures = collectMeasures(descriptor);
-        codec = new MeasureCodec(measures);
-        aggs = new MeasureAggregators(measures);
-
-        int measureCount = measures.size();
-        input = new Object[measureCount];
-        result = new Object[measureCount];
-    }
-
-    /** Returns the measures of every HBase column, walking column families and columns in mapping order. */
-    private static List<MeasureDesc> collectMeasures(CubeDesc descriptor) {
-        List<MeasureDesc> measures = Lists.newArrayList();
-        for (HBaseColumnFamilyDesc family : descriptor.getHBaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc column : family.getColumns()) {
-                for (MeasureDesc measure : column.getMeasures()) {
-                    measures.add(measure);
-                }
-            }
-        }
-        return measures;
-    }
-
-    @Override
-    public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
-        aggs.reset();
-        for (ByteArrayWritable encoded : values) {
-            codec.decode(encoded.asBuffer(), input);
-            aggs.aggregate(input);
-        }
-        aggs.collectStates(result);
-
-        storageOutputFormat.doReducerOutput(key, result, context);
-
-        if (++counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
deleted file mode 100644
index f2a5fcf..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.job.inmemcubing.ICuboidWriter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-/**
- * {@link ICuboidWriter} that emits every cuboid record to a Hadoop map context
- * as a (row key, encoded measures) pair.
- * <p>
- * The row key is the cuboid id followed by the concatenated bytes of the
- * record's dimension columns; the value is the record's measure columns
- * exported into a reusable buffer. Per-cuboid state (key buffer, dimension
- * count, measure column indexes) is rebuilt only when the cuboid id differs
- * from the previous record's, so writes grouped by cuboid are cheapest.
- */
-public class MapContextGTRecordWriter implements ICuboidWriter {
-
-    private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
-    protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
-    /** Cuboid id of the previous record, or null before the first write. */
-    private Long lastCuboidId;
-    protected CubeSegment cubeSegment;
-    protected CubeDesc cubeDesc;
-
-    private int dimensions;
-    private int measureCount;
-    private byte[] keyBuf;
-    private int[] measureColumnsIndex;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private ByteArrayWritable outputValue = new ByteArrayWritable();
-    /** Rows written so far for the current cuboid. */
-    private long cuboidRowCount = 0;
-
-    public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
-        this.mapContext = mapContext;
-        this.cubeDesc = cubeDesc;
-        this.cubeSegment = cubeSegment;
-        this.measureCount = cubeDesc.getMeasures().size();
-
-    }
-
-    /**
-     * Writes one record of cuboid {@code cuboidId} to the map context.
-     *
-     * @throws IOException if building or writing the output fails
-     */
-    @Override
-    public void write(long cuboidId, GTRecord record) throws IOException {
-
-        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
-            if (lastCuboidId != null) {
-                // report the cuboid that just finished before switching
-                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
-                cuboidRowCount = 0;
-            }
-            // output another cuboid; remembering its id is what keeps the
-            // per-cuboid setup from being redone for every single record
-            initVariables(cuboidId);
-            lastCuboidId = cuboidId;
-        }
-
-        cuboidRowCount++;
-        int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
-        for (int x = 0; x < dimensions; x++) {
-            System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
-            offSet += record.get(x).length();
-        }
-
-        //output measures
-        valueBuf.clear();
-        record.exportColumns(measureColumnsIndex, valueBuf);
-
-        outputKey.set(keyBuf, 0, offSet);
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
-        try {
-            mapContext.write(outputKey, outputValue);
-        } catch (InterruptedException e) {
-            // keep the interrupt visible to the calling thread
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Prepares the per-cuboid state: a key buffer sized for the cuboid id plus
-     * the cuboid's column lengths, prefilled with the cuboid id; the dimension
-     * count; and the indexes of the measure columns, which follow the dimensions.
-     */
-    private void initVariables(long cuboidId) {
-        int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
-        for (TblColRef column : cuboid.getColumns()) {
-            bytesLength += cubeSegment.getColumnLength(column);
-        }
-
-        keyBuf = new byte[bytesLength];
-        // one dimension per set bit of the cuboid id
-        dimensions = Long.bitCount(cuboidId);
-        measureColumnsIndex = new int[measureCount];
-        for (int i = 0; i < measureCount; i++) {
-            measureColumnsIndex[i] = dimensions + i;
-        }
-
-        System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
deleted file mode 100644
index 3d3b1f4..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.cube.CuboidJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author shaoshi
- */
-public class MergeCuboidFromStorageJob extends CuboidJob {
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_JOB_FLOW_ID);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            parseOptions(options, args);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
-            Configuration conf = this.getConf();
-            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            System.out.println("Starting: " + jobName);
-            job = Job.getInstance(conf, jobName);
-
-            setJobClasspath(job);
-            
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-            job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true");
-
-            // configure mapper input
-            IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat();
-            storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job);
-
-            // configure reducer output
-            IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
-            storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
-            
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            logger.error("error in MergeCuboidFromHBaseJob", e);
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
deleted file mode 100644
index 1162f14..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * @author shaoshi
- */
-public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
-
-    private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
-    
-    private KylinConfig config;
-    private String cubeName;
-    private String segmentName;
-    private CubeManager cubeManager;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment mergedCubeSegment;
-    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
-    private IMRStorageInputFormat storageInputFormat;
-
-    private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private byte[] newKeyBuf;
-    private RowKeySplitter rowKeySplitter;
-
-    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
-
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private MeasureCodec codec;
-    private ByteArrayWritable outputValue = new ByteArrayWritable();
-
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dictsNeedMerging.get(col);
-        if (ret != null)
-            return ret;
-        else {
-            ret = cubeDesc.getRowkey().isUseDictionary(col);
-            if (ret) {
-                String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
-            }
-            dictsNeedMerging.put(col, ret);
-            return ret;
-        }
-    }
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
-        cubeManager = CubeManager.getInstance(config);
-        cube = cubeManager.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
-
-        newKeyBuf = new byte[256];// size will auto-grow
-
-        sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
-        logger.info(sourceCubeSegment.toString());
-
-        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
-
-        List<MeasureDesc> measuresDescs = Lists.newArrayList();
-        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                for (MeasureDesc measure : colDesc.getMeasures()) {
-                    measuresDescs.add(measure);
-                }
-            }
-        }
-        codec = new MeasureCodec(measuresDescs);
-    }
-
-    @Override
-    public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
-        Pair<ByteArrayWritable, Object[]> pair = storageInputFormat.parseMapperInput(inKey, inValue);
-        ByteArrayWritable key = pair.getFirst();
-        Object[] value = pair.getSecond();
-        
-        Preconditions.checkState(key.offset() == 0);
-        
-        long cuboidID = rowKeySplitter.split(key.array(), key.length());
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
-
-        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
-        int bufOffset = 0;
-        BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
-        bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
-        for (int i = 0; i < cuboid.getColumns().size(); ++i) {
-            TblColRef col = cuboid.getColumns().get(i);
-
-            if (this.checkNeedMerging(col)) {
-                // if dictionary on fact table column, needs rewrite
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
-                while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
-                }
-
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
-
-                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
-                int idInMergedDict;
-                if (size < 0) {
-                    idInMergedDict = mergedDict.nullId();
-                } else {
-                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
-                }
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
-
-                bufOffset += mergedDict.getSizeOfId();
-            } else {
-                // keep as it is
-                while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
-                }
-
-                System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
-                bufOffset += splittedByteses[i + 1].length;
-            }
-        }
-        byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
-        outputKey.set(newKey, 0, newKey.length);
-
-        valueBuf.clear();
-        codec.encode(value, valueBuf);
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
-        
-        context.write(outputKey, outputValue);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
deleted file mode 100644
index d99cb03..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.io.IOException;
-import java.util.*;
-
-public class MergeDictionaryStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-
-    public MergeDictionaryStep() {
-        super();
-    }
-
-    /**
-     * Builds the dictionaries and lookup-table snapshots of the merged segment from the
-     * segments being merged, then saves the updated segment to the cube metadata.
-     *
-     * @return SUCCEED when done; FAILED when the cube, the merged segment or any merging
-     *         segment cannot be resolved, or when no merging segment is configured;
-     *         ERROR when merging or saving fails with an I/O error
-     */
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        KylinConfig conf = context.getConfig();
-        final CubeManager mgr = CubeManager.getInstance(conf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        if (cube == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no cube with name:" + getCubeName());
-        }
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-        if (newSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
-        }
-        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
-        // The helpers below read the first/last merging segment and sorting rejects nulls,
-        // so fail with a clear message instead of an IndexOutOfBounds/NullPointerException.
-        if (mergingSegments.isEmpty()) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
-        }
-        if (mergingSegments.contains(null)) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "some merging segments cannot be found, ids:" + getMergingSegmentIds());
-        }
-
-        // natural order of CubeSegment; makeSnapshotForNewSegment treats the last one as the
-        // newest -- presumably the ordering is chronological, TODO confirm in CubeSegment.compareTo
-        Collections.sort(mergingSegments);
-
-        try {
-            checkLookupSnapshotsMustIncremental(mergingSegments);
-
-            makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
-            makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToUpdateSegs(newSegment);
-            mgr.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to merge dictionary or lookup snapshots", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    /**
-     * Resolves the configured merging segment ids against the cube.
-     *
-     * @return one entry per configured id, in id order; an id the cube does not know
-     *         presumably yields a {@code null} entry (whatever getSegmentById returns)
-     */
-    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
-        for (String id : mergingSegmentIds) {
-            result.add(cube.getSegmentById(id));
-        }
-        return result;
-    }
-
-    /**
-     * Placeholder for validating that lookup snapshots only grow between segments.
-     * Currently performs no check.
-     */
-    private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
-
-        // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
-    }
-
-    /**
-     * Creates the dictionaries of the new segment. Dictionaries whose source data is the
-     * fact table are built by merging the corresponding dictionaries of the merging
-     * segments; dictionaries on lookup tables are copied from the first merging segment,
-     * relying on them being consistent across segments (checked in CubeSegmentValidator).
-     *
-     * @param conf            Kylin configuration used to obtain the DictionaryManager
-     * @param cube            the cube being merged
-     * @param newSeg          the merged segment that receives the dictionary resource paths
-     * @param mergingSegments the non-empty list of segments being merged
-     * @throws IOException if reading or merging a dictionary fails
-     */
-    private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
-        Set<TblColRef> colsNeedMergingDict = new HashSet<TblColRef>();
-        Set<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
-
-        CubeDesc cubeDesc = cube.getDescriptor();
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                    // the table a dictionary is built from decides whether it is merged or copied
-                    String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                    if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
-                        colsNeedMergingDict.add(col);
-                    } else {
-                        colsNeedCopyDict.add(col);
-                    }
-                }
-            }
-        }
-
-        for (TblColRef col : colsNeedMergingDict) {
-            logger.info("Merging fact table dictionary on : " + col);
-            List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
-            for (CubeSegment segment : mergingSegments) {
-                logger.info("Including fact table dictionary of segment : " + segment);
-                String dictResPath = segment.getDictResPath(col);
-                if (dictResPath != null) {
-                    dictInfos.add(dictMgr.getDictionaryInfo(dictResPath));
-                }
-            }
-            mergeDictionaries(dictMgr, newSeg, dictInfos, col);
-        }
-
-        for (TblColRef col : colsNeedCopyDict) {
-            String path = mergingSegments.get(0).getDictResPath(col);
-            newSeg.putDictResPath(col, path);
-        }
-    }
-
-    /**
-     * Merges the given dictionaries and, when a merged dictionary is produced, records its
-     * resource path on the segment for the column.
-     *
-     * @return the merged dictionary info, or {@code null} if the manager produced none
-     */
-    private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {
-        DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
-        if (dictInfo != null)
-            cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-
-        return dictInfo;
-    }
-
-    /**
-     * Makes snapshots for the new segment by copying every snapshot resource path of the
-     * last merging segment. Relies on snapshots being consistent across the merging
-     * segments (checked in CubeSegmentValidator).
-     *
-     * @param cube            the cube being merged (currently unused)
-     * @param newSeg          the merged segment receiving the snapshot paths
-     * @param mergingSegments the non-empty, sorted list of segments being merged
-     */
-    private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
-        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
-        for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
-            newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    /** Stores the ids of the segments being merged as a comma-separated parameter. */
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    /** @return the merging segment ids, or an empty list if the parameter is not set */
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id: splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
deleted file mode 100644
index 8bd6ea2..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class MergeStatisticsStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IS = "mergingSegmentIds";
-    private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
-    /** HLL counters accumulated over all merging segments, keyed by the statistics-file key (presumably the cuboid id). */
-    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
-
-    public MergeStatisticsStep() {
-        super();
-    }
-
-    /**
-     * Merges the cuboid statistics of all merging segments, writes the merged statistics
-     * to the merged-statistics path and stores them in the metadata store for the new segment.
-     *
-     * @return SUCCEED when done; FAILED when the cube/segment cannot be resolved or no merging
-     *         segment is configured; ERROR on an I/O failure
-     */
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        KylinConfig kylinConf = context.getConfig();
-        final CubeManager mgr = CubeManager.getInstance(kylinConf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        if (cube == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no cube with name:" + getCubeName());
-        }
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-        if (newSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
-        }
-        final List<String> mergingSegmentIds = getMergingSegmentIds();
-        if (mergingSegmentIds.isEmpty()) {
-            // nothing to merge; also avoids dividing by zero when averaging the sampling percentage
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
-        }
-
-        Configuration conf = new Configuration();
-        ResourceStore rs = ResourceStore.getStore(kylinConf);
-        // the counters live in a field; start afresh in case this step is executed again
-        cuboidHLLMap.clear();
-        try {
-            int averageSamplingPercentage = 0;
-            for (String segmentId : mergingSegmentIds) {
-                String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
-                File tempFile = copyResourceToTempFile(rs, fileKey, segmentId);
-                try {
-                    averageSamplingPercentage += mergeStatisticsFile(tempFile, conf);
-                } finally {
-                    // the local copy is only needed while reading it
-                    deleteTempFile(tempFile);
-                }
-            }
-            averageSamplingPercentage = averageSamplingPercentage / mergingSegmentIds.size();
-            FactDistinctColumnsReducer.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
-            Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
-            FileSystem fs = statisticsFilePath.getFileSystem(conf);
-            FSDataInputStream is = fs.open(statisticsFilePath);
-            try {
-                // put the statistics to metadata store
-                String statisticsFileName = newSegment.getStatisticsResourcePath();
-                rs.putResource(statisticsFileName, is, System.currentTimeMillis());
-            } finally {
-                IOUtils.closeStream(is);
-            }
-
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to merge cuboid statistics", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    /**
-     * Copies a metadata-store resource into a new local temp file; the caller must delete it.
-     * If the copy fails, the partially written temp file is removed before rethrowing.
-     *
-     * @throws IOException if the resource is missing or cannot be copied
-     */
-    private File copyResourceToTempFile(ResourceStore rs, String resPath, String prefix) throws IOException {
-        InputStream is = rs.getResource(resPath);
-        if (is == null) {
-            // guard in case getResource signals a missing resource with null
-            throw new IOException("Statistics resource " + resPath + " does not exist");
-        }
-        File tempFile = null;
-        FileOutputStream tempFileStream = null;
-        boolean copied = false;
-        try {
-            tempFile = File.createTempFile(prefix, ".seq");
-            tempFileStream = new FileOutputStream(tempFile);
-            org.apache.commons.io.IOUtils.copy(is, tempFileStream);
-            copied = true;
-        } finally {
-            IOUtils.closeStream(is);
-            IOUtils.closeStream(tempFileStream);
-            if (!copied) {
-                deleteTempFile(tempFile);
-            }
-        }
-        return tempFile;
-    }
-
-    /**
-     * Reads one segment's statistics sequence file. Entries with key 0 hold the sampling
-     * percentage; every other entry holds an HLL counter that is merged into {@link #cuboidHLLMap}.
-     *
-     * @return the sum of the values stored under key 0
-     * @throws IOException if the file cannot be read
-     */
-    @SuppressWarnings("deprecation")
-    private int mergeStatisticsFile(File seqFile, Configuration conf) throws IOException {
-        int samplingPercentage = 0;
-        FileSystem fs = HadoopUtil.getFileSystem("file:///" + seqFile.getAbsolutePath());
-        SequenceFile.Reader reader = null;
-        try {
-            reader = new SequenceFile.Reader(fs, new Path(seqFile.getAbsolutePath()), conf);
-            LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            while (reader.next(key, value)) {
-                if (key.get() == 0L) {
-                    // sampling percentage;
-                    samplingPercentage += Bytes.toInt(value.getBytes());
-                } else {
-                    HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
-                    ByteArray byteArray = new ByteArray(value.getBytes());
-                    hll.readRegisters(byteArray.asBuffer());
-
-                    HyperLogLogPlusCounter existing = cuboidHLLMap.get(key.get());
-                    if (existing != null) {
-                        existing.merge(hll);
-                    } else {
-                        cuboidHLLMap.put(key.get(), hll);
-                    }
-                }
-            }
-        } finally {
-            IOUtils.closeStream(reader);
-        }
-        return samplingPercentage;
-    }
-
-    /** Best-effort removal of a local temp file; a failure is logged, not thrown. */
-    private void deleteTempFile(File file) {
-        if (file != null && file.exists() && !file.delete()) {
-            logger.warn("fail to delete temp file " + file.getAbsolutePath());
-        }
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    /** Stores the ids of the segments being merged as a comma-separated parameter. */
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IS, StringUtils.join(ids, ","));
-    }
-
-    /** @return the merging segment ids, or an empty list if the parameter is not set */
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setMergedStatisticsPath(String path) {
-        setParam(MERGED_STATISTICS_PATH, path);
-    }
-
-    private String getMergedStatisticsPath() {
-        return getParam(MERGED_STATISTICS_PATH);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
deleted file mode 100644
index 14eef1a..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- * Job step that uploads the cuboid statistics file produced by the build into the Kylin
- * metadata store, under the statistics resource path of the target segment, and then
- * removes the file from the file system.
- */
-public class SaveStatisticsStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String STATISTICS_PATH = "statisticsPath";
-
-    public SaveStatisticsStep() {
-        super();
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setStatisticsPath(String path) {
-        this.setParam(STATISTICS_PATH, path);
-    }
-
-    private String getStatisticsPath() {
-        return getParam(STATISTICS_PATH);
-    }
-
-    /**
-     * Uploads the statistics file found under the configured statistics path.
-     *
-     * @return SUCCEED after upload; ERROR when the file is missing or any I/O step fails
-     */
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final KylinConfig config = context.getConfig();
-        final CubeInstance cube = CubeManager.getInstance(config).getCube(getCubeName());
-        final CubeSegment segment = cube.getSegmentById(getSegmentId());
-        final ResourceStore store = ResourceStore.getStore(config);
-
-        try {
-            final Path estimationFile = new Path(getStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
-            final FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentConfiguration());
-            if (!fileSystem.exists(estimationFile)) {
-                throw new IOException("File " + estimationFile + " does not exists;");
-            }
-            uploadAndRemove(fileSystem, estimationFile, store, segment);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to save cuboid statistics", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    /**
-     * Streams {@code source} into the store at the segment's statistics resource path, then
-     * deletes {@code source}; the input stream is always closed.
-     */
-    private void uploadAndRemove(FileSystem fileSystem, Path source, ResourceStore store, CubeSegment segment) throws IOException {
-        final FSDataInputStream input = fileSystem.open(source);
-        try {
-            store.putResource(segment.getStatisticsResourcePath(), input, System.currentTimeMillis());
-            fileSystem.delete(source, false);
-        } finally {
-            IOUtils.closeStream(input);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
deleted file mode 100644
index dd99a64..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- * Job step run after a build: records the source record count/size, cube size and
- * last-build job/time on the built segment, then either promotes the segment (when the
- * cubing job reports a non-zero cube size) or just saves the updated segment.
- */
-public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
-
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String CUBE_NAME = "cubeName";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    public UpdateCubeInfoAfterBuildStep() {
-        super();
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-
-    /**
-     * @return SUCCEED after the cube metadata is updated; FAILED when the cube, segment or
-     *         cubing job cannot be found; ERROR when saving the cube fails
-     */
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-        if (cube == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no cube with name:" + getCubeName());
-        }
-        final CubeSegment segment = cube.getSegmentById(getSegmentId());
-        if (segment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
-        }
-
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
-        if (cubingJob == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no cubing job with id:" + getCubingJobId());
-        }
-        long sourceCount = cubingJob.findSourceRecordCount();
-        long sourceSizeBytes = cubingJob.findSourceSizeBytes();
-        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-        boolean segmentReady = cubeSizeBytes > 0; // for build+merge scenario, convert HFile not happen yet, so cube size is 0
-
-        segment.setLastBuildJobID(getCubingJobId());
-        segment.setLastBuildTime(System.currentTimeMillis());
-        segment.setSizeKB(cubeSizeBytes / 1024);
-        segment.setInputRecords(sourceCount);
-        segment.setInputRecordsSize(sourceSizeBytes);
-
-        try {
-            if (segmentReady) {
-                cubeManager.promoteNewlyBuiltSegments(cube, segment);
-            } else {
-                CubeUpdate cubeBuilder = new CubeUpdate(cube);
-                cubeBuilder.setToUpdateSegs(segment);
-                cubeManager.updateCube(cubeBuilder);
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube after build", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
deleted file mode 100644
index d237908..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.google.common.collect.Lists;
-
-/**
- * Job step run after a merge: sets the merged segment's size, source record count/size
- * (summed over the merging segments) and last-build job/time, then promotes it.
- */
-public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    public UpdateCubeInfoAfterMergeStep() {
-        super();
-    }
-
-    /**
-     * @return SUCCEED after promotion; FAILED when the cube, merged segment, a merging
-     *         segment or the cubing job cannot be found, or no merging segment is
-     *         configured; ERROR when saving the cube fails
-     */
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        // Resolve the manager from the execution context, like the sibling job steps, rather
-        // than from the environment when the step object is constructed.
-        final KylinConfig config = context.getConfig();
-        final CubeManager cubeManager = CubeManager.getInstance(config);
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-        if (cube == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no cube with name:" + getCubeName());
-        }
-
-        CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
-        if (mergedSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
-        }
-
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
-        if (cubingJob == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no cubing job with id:" + getCubingJobId());
-        }
-        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-
-        // collect source statistics
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        if (mergingSegmentIds.isEmpty()) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
-        }
-        long sourceCount = 0L;
-        long sourceSize = 0L;
-        for (String id : mergingSegmentIds) {
-            CubeSegment segment = cube.getSegmentById(id);
-            if (segment == null) {
-                return new ExecuteResult(ExecuteResult.State.FAILED, "there is no merging segment with id:" + id);
-            }
-            sourceCount += segment.getInputRecords();
-            sourceSize += segment.getInputRecordsSize();
-        }
-
-        // update segment info
-        mergedSegment.setSizeKB(cubeSizeBytes / 1024);
-        mergedSegment.setInputRecords(sourceCount);
-        mergedSegment.setInputRecordsSize(sourceSize);
-        mergedSegment.setLastBuildJobID(getCubingJobId());
-        mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
-        try {
-            cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED);
-        } catch (IOException e) {
-            logger.error("fail to update cube after merge", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    /** Stores the ids of the segments being merged as a comma-separated parameter. */
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    /** @return the merging segment ids, or an empty list if the parameter is not set */
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-}