You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/03/11 08:01:00 UTC

[2/2] kylin git commit: clean code

clean code


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5e13bba0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5e13bba0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5e13bba0

Branch: refs/heads/master
Commit: 5e13bba0822cafeb02817e5e59c08a3a4b5020c9
Parents: 4662ada
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Mar 11 14:03:45 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Fri Mar 11 15:00:35 2016 +0800

----------------------------------------------------------------------
 .../kylin/common/persistence/ResourceStore.java | 26 ++-----
 .../java/org/apache/kylin/cube/CubeSegment.java | 31 +++++---
 .../org/apache/kylin/cube/cuboid/Cuboid.java    | 26 +++----
 .../kylin/job/constant/ExecutableConstants.java | 20 -----
 .../kylin/job/execution/AbstractExecutable.java | 27 +++----
 .../kylin/engine/mr/BatchCubingJobBuilder.java  | 25 ++++---
 .../kylin/engine/mr/BatchCubingJobBuilder2.java | 48 ++++++------
 .../kylin/engine/mr/BatchMergeJobBuilder.java   | 13 ++--
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  | 22 +++---
 .../org/apache/kylin/engine/mr/CubingJob.java   | 56 ++++----------
 .../kylin/engine/mr/JobBuilderSupport.java      | 63 +++++++++-------
 .../engine/mr/common/AbstractHadoopJob.java     | 33 ++++-----
 .../kylin/engine/mr/common/BatchConstants.java  | 59 ++++++++++-----
 .../kylin/engine/mr/common/CubeStatsReader.java | 56 +++++++-------
 .../kylin/engine/mr/common/CuboidStatsUtil.java |  2 +-
 .../mr/invertedindex/BatchIIJobBuilder.java     | 21 +++---
 .../mr/invertedindex/InvertedIndexJob.java      |  7 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java   |  2 +-
 .../engine/mr/steps/CubingExecutableUtil.java   | 78 ++++++++++++++++++++
 .../kylin/engine/mr/steps/CuboidReducer.java    |  2 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java |  2 +-
 .../mr/steps/FactDistinctColumnsMapperBase.java |  2 +-
 .../mr/steps/FactDistinctColumnsReducer.java    |  4 +-
 .../engine/mr/steps/HiveToBaseCuboidMapper.java |  2 +-
 .../engine/mr/steps/InMemCuboidMapper.java      |  2 +-
 .../engine/mr/steps/InMemCuboidReducer.java     |  2 +-
 .../engine/mr/steps/MergeDictionaryStep.java    | 47 +-----------
 .../engine/mr/steps/MergeStatisticsStep.java    | 66 ++---------------
 .../kylin/engine/mr/steps/NDCuboidMapper.java   |  4 +-
 .../engine/mr/steps/SaveStatisticsStep.java     | 49 ++----------
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  | 38 ++--------
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java  | 57 ++------------
 .../apache/kylin/engine/spark/SparkCubing.java  |  3 +-
 .../engine/spark/SparkCubingJobBuilder.java     |  6 +-
 .../apache/kylin/rest/service/BasicService.java | 40 +---------
 .../apache/kylin/rest/service/JobService.java   | 51 +++++++------
 .../cardinality/ColumnCardinalityMapper.java    |  2 +-
 .../cardinality/HiveColumnCardinalityJob.java   |  3 +-
 .../kylin/storage/hbase/HBaseResourceStore.java | 46 ++++--------
 .../kylin/storage/hbase/steps/HBaseMRSteps.java | 78 ++++++++++----------
 .../hbase/steps/HBaseStreamingOutput.java       |  2 +-
 .../hbase/steps/RangeKeyDistributionJob.java    | 10 +--
 .../steps/RangeKeyDistributionReducer.java      | 20 ++---
 43 files changed, 483 insertions(+), 670 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index ccae80b..88ee553 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -73,7 +73,7 @@ abstract public class ResourceStore {
         }
         return knownImpl;
     }
-    
+
     private static ResourceStore createResourceStore(KylinConfig kylinConfig) {
         List<Throwable> es = new ArrayList<Throwable>();
         logger.info("Using metadata url " + kylinConfig.getMetadataUrl() + " for resource store");
@@ -141,7 +141,7 @@ abstract public class ResourceStore {
         RawResource res = getResourceImpl(resPath);
         if (res == null)
             return null;
-        
+
         DataInputStream din = new DataInputStream(res.inputStream);
         try {
             T r = serializer.deserialize(din);
@@ -160,25 +160,9 @@ abstract public class ResourceStore {
     final public long getResourceTimestamp(String resPath) throws IOException {
         return getResourceTimestampImpl(norm(resPath));
     }
-    
+
     final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, Class<T> clazz, Serializer<T> serializer) throws IOException {
-        final List<RawResource> allResources = getAllResources(rangeStart, rangeEnd);
-        if (allResources.isEmpty()) {
-            return Collections.emptyList();
-        }
-        List<T> result = Lists.newArrayList();
-        try {
-            for (RawResource rawResource : allResources) {
-                final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream));
-                element.setLastModified(rawResource.timestamp);
-                result.add(element);
-            }
-            return result;
-        } finally {
-            for (RawResource rawResource : allResources) {
-                IOUtils.closeQuietly(rawResource.inputStream);
-            }
-        }
+        return getAllResources(rangeStart, rangeEnd, -1L, -1L, clazz, serializer);
     }
 
     final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis, Class<T> clazz, Serializer<T> serializer) throws IOException {
@@ -210,7 +194,7 @@ abstract public class ResourceStore {
 
     /** returns 0 if not exists */
     abstract protected long getResourceTimestampImpl(String resPath) throws IOException;
-    
+
     /**
      * overwrite a resource without write conflict check
      */

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 67dce73..5b61c10 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -18,12 +18,12 @@
 
 package org.apache.kylin.cube;
 
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ShardingHash;
@@ -37,11 +37,12 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationSegment;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonBackReference;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IRealizationSegment {
@@ -76,6 +77,9 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
     @JsonProperty("total_shards")
     private int totalShards = 0;
+    @JsonProperty("blackout_cuboids")
+    private List<Long> blackoutCuboids = Lists.newArrayList();
+
 
     @JsonProperty("binary_signature")
     private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check
@@ -391,6 +395,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
 
     /**
      * get the number of shards where each cuboid will distribute
+     *
      * @return
      */
     public Short getCuboidShardNum(Long cuboidId) {
@@ -423,6 +428,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
         return ret;
     }
 
+    public List<Long> getBlackoutCuboids() {
+        return this.blackoutCuboids;
+    }
+
     @Override
     public IRealization getRealization() {
         return cubeInstance;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 16b0287..89f5204 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -18,10 +18,14 @@
 
 package org.apache.kylin.cube.cuboid;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
@@ -31,13 +35,10 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.RowKeyColDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
 
 public class Cuboid implements Comparable<Cuboid> {
 
@@ -231,8 +232,7 @@ public class Cuboid implements Comparable<Cuboid> {
             return true;
         }
 
-        hier:
-        for (HierarchyMask hierarchyMasks : hierarchyMaskList) {
+        hier: for (HierarchyMask hierarchyMasks : hierarchyMaskList) {
             long result = cuboidID & hierarchyMasks.fullMask;
             if (result > 0) {
                 for (long mask : hierarchyMasks.allMasks) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index d370b0d..d3c1003 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -26,21 +26,15 @@ public final class ExecutableConstants {
     }
 
     public static final String YARN_APP_ID = "yarn_application_id";
-
     public static final String YARN_APP_URL = "yarn_application_tracking_url";
     public static final String MR_JOB_ID = "mr_job_id";
     public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
     public static final String SOURCE_RECORDS_COUNT = "source_records_count";
     public static final String SOURCE_RECORDS_SIZE = "source_records_size";
-    public static final String GLOBAL_LISTENER_NAME = "ChainListener";
 
     public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
 
-    public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
-
-    public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
-
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
@@ -57,23 +51,9 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
     public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
     public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
-
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
     public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
     public static final String STEP_NAME_UPDATE_II_INFO = "Update Inverted Index Info";
 
-    public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
-    public static final String PROP_JOB_FLOW = "jobFlow";
-    public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
-    public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
-    public static final String PROP_COMMAND = "command";
-    // public static final String PROP_STORAGE_LOCATION =
-    // "storageLocationIdentifier";
-    public static final String PROP_JOB_ASYNC = "jobAsync";
-    public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
-    public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
-    public static final String PROP_JOB_KILLED = "jobKilled";
-    public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
-
     public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 8d5fea5..83c61ae 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -18,10 +18,13 @@
 
 package org.apache.kylin.job.execution;
 
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kylin.common.KylinConfig;
@@ -32,12 +35,10 @@ import org.apache.kylin.job.manager.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  */
@@ -117,12 +118,12 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
             }
             retry++;
         } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true);
-        
+
         if (exception != null) {
             onExecuteError(exception, executableContext);
             throw new ExecuteException(exception);
         }
-        
+
         onExecuteFinished(result, executableContext);
         return result;
     }
@@ -164,7 +165,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     @Override
     public final Map<String, String> getParams() {
-        return Collections.unmodifiableMap(this.params);
+        return this.params;
     }
 
     public final String getParam(String key) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 3c10c09..45d03d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.NDCuboidJob;
@@ -92,12 +93,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
         baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
 
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "level", "0");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
@@ -113,12 +114,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
 
         appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
-        appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + (totalRowkeyColumnCount - dimNum));
 
         ndCuboidStep.setMapReduceParams(cmd.toString());
         ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 86439a8..5f4a3ed 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -22,8 +22,10 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
 import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
@@ -89,10 +91,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
     private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
         SaveStatisticsStep result = new SaveStatisticsStep();
         result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        result.setCubeName(seg.getRealization().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setStatisticsPath(getStatisticsPath(jobId));
-        result.setCubingJobId(jobId);
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setStatisticsPath(getStatisticsPath(jobId), result.getParams());
+        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
         return result;
     }
 
@@ -105,11 +107,11 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
 
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "output", cuboidRootPath);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "cubingJobId", jobId);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
 
         cubeStep.setMapReduceParams(cmd.toString());
         cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
@@ -126,13 +128,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
 
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-        appendExecCmdParameters(cmd, "cubingJobId", jobId);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
@@ -148,13 +150,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
 
         appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
-        appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
-        appendExecCmdParameters(cmd, "cubingJobId", jobId);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + (totalRowkeyColumnCount - dimNum));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
 
         ndCuboidStep.setMapReduceParams(cmd.toString());
         ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index 831aa9d..6f1d445 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -48,7 +49,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
     public CubingJob build() {
         logger.info("MR_V1 new job to MERGE segment " + seg);
 
-        final CubeSegment cubeSegment = (CubeSegment)seg;
+        final CubeSegment cubeSegment = (CubeSegment) seg;
 
         final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
         final String jobId = result.getId();
@@ -84,11 +85,11 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
 
         appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", outputPath);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
         mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 008d489..e151674 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -22,7 +22,9 @@ import java.util.List;
 
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -79,10 +81,12 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
     private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
         MergeStatisticsStep result = new MergeStatisticsStep();
         result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setMergedStatisticsPath(mergedStatisticsFolder);
+
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
+        CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
+
         return result;
     }
 
@@ -92,11 +96,11 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
 
         appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", outputPath);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
         mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 979ff75..0325a09 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -24,17 +24,16 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
-import java.util.List;
 import java.util.TimeZone;
 import java.util.regex.Matcher;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -47,8 +46,8 @@ import org.apache.kylin.job.execution.Output;
 /**
  */
 public class CubingJob extends DefaultChainedExecutable {
-    
-    public static enum AlgorithmEnum {
+
+    public enum AlgorithmEnum {
         LAYER, INMEM
     }
 
@@ -58,9 +57,6 @@ public class CubingJob extends DefaultChainedExecutable {
     public static final String CUBE_SIZE_BYTES = "byteSizeBytes";
     public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
 
-    private static final String CUBE_INSTANCE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-
     public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) {
         return initCubingJob(seg, "BUILD", submitter, config);
     }
@@ -73,8 +69,8 @@ public class CubingJob extends DefaultChainedExecutable {
         CubingJob result = new CubingJob();
         SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
         format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
+        CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
         result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + jobType + " - " + format.format(new Date(System.currentTimeMillis())));
         result.setSubmitter(submitter);
         result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
@@ -85,29 +81,9 @@ public class CubingJob extends DefaultChainedExecutable {
         super();
     }
 
-    void setCubeName(String name) {
-        setParam(CUBE_INSTANCE_NAME, name);
-    }
-
-    public String getCubeName() {
-        return getParam(CUBE_INSTANCE_NAME);
-    }
-
-    void setSegmentIds(List<String> segmentIds) {
-        setParam(SEGMENT_ID, StringUtils.join(segmentIds, ","));
-    }
-
-    void setSegmentId(String segmentId) {
-        setParam(SEGMENT_ID, segmentId);
-    }
-
-    public String getSegmentIds() {
-        return getParam(SEGMENT_ID);
-    }
-
     @Override
     protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
-        CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName());
+        CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         final Output output = jobService.getOutput(getId());
         String logMsg;
         state = output.getState();
@@ -131,7 +107,7 @@ public class CubingJob extends DefaultChainedExecutable {
         String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
         content = content.replaceAll("\\$\\{job_name\\}", getName());
         content = content.replaceAll("\\$\\{result\\}", state.toString());
-        content = content.replaceAll("\\$\\{cube_name\\}", getCubeName());
+        content = content.replaceAll("\\$\\{cube_name\\}", CubingExecutableUtil.getCubeName(this.getParams()));
         content = content.replaceAll("\\$\\{source_records_count\\}", String.valueOf(findSourceRecordCount()));
         content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString());
         content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins");
@@ -147,7 +123,7 @@ public class CubingJob extends DefaultChainedExecutable {
             logger.warn(e.getLocalizedMessage(), e);
         }
 
-        String title = "[" + state.toString() + "] - [Kylin Cube Build Job]-" + getCubeName();
+        String title = "[" + state.toString() + "] - [Kylin Cube Build Job]-" + CubingExecutableUtil.getCubeName(this.getParams());
         return Pair.of(title, content);
     }
 
@@ -174,11 +150,11 @@ public class CubingJob extends DefaultChainedExecutable {
     public void setMapReduceWaitTime(long t) {
         addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
     }
-    
+
     public void setAlgorithm(AlgorithmEnum alg) {
         addExtraInfo("algorithm", alg.name());
     }
-    
+
     public AlgorithmEnum getAlgorithm() {
         String alg = getExtraInfo().get("algorithm");
         return alg == null ? null : AlgorithmEnum.valueOf(alg);
@@ -187,11 +163,11 @@ public class CubingJob extends DefaultChainedExecutable {
     public boolean isLayerCubing() {
         return AlgorithmEnum.LAYER == getAlgorithm();
     }
-    
+
     public boolean isInMemCubing() {
         return AlgorithmEnum.INMEM == getAlgorithm();
     }
-    
+
     public long findSourceRecordCount() {
         return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
     }
@@ -204,7 +180,7 @@ public class CubingJob extends DefaultChainedExecutable {
         // look for the info BACKWARD, let the last step that claims the cube size win
         return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
     }
-    
+
     public String findExtraInfo(String key, String dft) {
         return findExtraInfo(key, dft, false);
     }
@@ -212,14 +188,14 @@ public class CubingJob extends DefaultChainedExecutable {
     public String findExtraInfoBackward(String key, String dft) {
         return findExtraInfo(key, dft, true);
     }
-    
+
     private String findExtraInfo(String key, String dft, boolean backward) {
         ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
-        
+
         if (backward) {
             Collections.reverse(tasks);
         }
-        
+
         for (AbstractExecutable child : tasks) {
             Output output = executableManager.getOutput(child.getId());
             String value = output.getExtra().get(key);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index e3b07d8..a3fef8e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,21 +23,23 @@ import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.invertedindex.UpdateIIInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-
-import com.google.common.base.Preconditions;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.realization.IRealizationSegment;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Hold reusable steps for builders.
  */
@@ -67,14 +69,14 @@ public class JobBuilderSupport {
         result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
         result.setMapReduceJobClass(FactDistinctColumnsJob.class);
         StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
-        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
+        appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getFactDistinctColumnsPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
 
         result.setMapReduceParams(cmd.toString());
         return result;
@@ -85,9 +87,9 @@ public class JobBuilderSupport {
         HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
         buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId));
 
         buildDictionaryStep.setJobParams(cmd.toString());
         buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
@@ -95,34 +97,38 @@ public class JobBuilderSupport {
     }
 
     public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
-        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
-        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        updateCubeInfoStep.setCubeName(seg.getRealization().getName());
-        updateCubeInfoStep.setSegmentId(seg.getUuid());
-        updateCubeInfoStep.setCubingJobId(jobId);
-        return updateCubeInfoStep;
+        final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep();
+        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+
+        return result;
     }
 
     public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
         MergeDictionaryStep result = new MergeDictionaryStep();
         result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
-        result.setCubeName(seg.getRealization().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
+
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
+
         return result;
     }
 
     public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
         UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        result.setCubeName(seg.getRealization().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setCubingJobId(jobId);
-        return result;
-    }
 
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
 
+        return result;
+    }
 
     public UpdateIIInfoAfterBuildStep createUpdateIIInfoAfterBuildStep(String jobId) {
         final UpdateIIInfoAfterBuildStep updateIIInfoStep = new UpdateIIInfoAfterBuildStep();
@@ -141,6 +147,7 @@ public class JobBuilderSupport {
     public String getRealizationRootPath(String jobId) {
         return getJobWorkingDir(jobId) + "/" + seg.getRealization().getName();
     }
+
     public String getCuboidRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/cuboid/";
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 61983d5..fe60ca8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -23,7 +23,7 @@ package org.apache.kylin.engine.mr.common;
  *
  */
 
-import static org.apache.hadoop.util.StringUtils.*;
+import static org.apache.hadoop.util.StringUtils.formatTime;
 
 import java.io.File;
 import java.io.IOException;
@@ -74,22 +74,21 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
 
-    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
-    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
-    protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName("cubingJobId").hasArg().isRequired(false).withDescription("ID of cubing job executable").create("cubingJobId");
-    protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
-    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
-    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
-    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
-    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
-    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
-    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions");
-    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
-
-    protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");
-    protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName("statisticsoutput").hasArg().isRequired(false).withDescription("Statistics output").create("statisticsoutput");
-    protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName("statisticssamplingpercent").hasArg().isRequired(false).withDescription("Statistics sampling percentage").create("statisticssamplingpercent");
+    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME);
+    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME);
+    protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID);
+    protected static final Option OPTION_II_NAME = OptionBuilder.withArgName(BatchConstants.ARG_II_NAME).hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create(BatchConstants.ARG_II_NAME);
+    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName( BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create( BatchConstants.ARG_SEGMENT_NAME);
+    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT);
+    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT).hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT);
+    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT);
+    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL);
+    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION).hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
+    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME).hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME);
+
+    protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false).withDescription("Statistics enabled").create(BatchConstants.ARG_STATS_ENABLED);
+    protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT).hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT);
+    protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false).withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
 
     protected String name;
     protected boolean isAsync = false;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 6943f18..a614f4b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -20,8 +20,15 @@ package org.apache.kylin.engine.mr.common;
 
 public interface BatchConstants {
 
+    /**
+     * source data config
+     */
     char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
 
+    /**
+     * ConFiGuration entry names for MR jobs
+     */
+
     String CFG_CUBE_NAME = "cube.name";
     String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
     String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level";
@@ -29,31 +36,47 @@ public interface BatchConstants {
     String CFG_II_NAME = "ii.name";
     String CFG_II_SEGMENT_NAME = "ii.segment.name";
 
-    String OUTPUT_PATH = "output.path";
-
-    String TABLE_NAME = "table.name";
-    String TABLE_COLUMNS = "table.columns";
-
+    String CFG_OUTPUT_PATH = "output.path";
+    String CFG_TABLE_NAME = "table.name";
     String CFG_IS_MERGE = "is.merge";
     String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
+    String CFG_REGION_NUMBER_MIN = "region.number.min";
+    String CFG_REGION_NUMBER_MAX = "region.number.max";
+    String CFG_REGION_SPLIT_SIZE = "region.split.size";
+    String CFG_HFILE_SIZE_GB = "hfile.size.gb";
 
-    String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
-
-    String REGION_NUMBER_MIN = "region.number.min";
-    String REGION_NUMBER_MAX = "region.number.max";
-    String REGION_SPLIT_SIZE = "region.split.size";
-    String HFILE_SIZE_GB = "hfile.size.gb";
-    
     String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
     String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/";
-    
+
     String CFG_STATISTICS_LOCAL_DIR = CFG_KYLIN_LOCAL_TEMP_DIR + "cuboidstatistics/";
     String CFG_STATISTICS_ENABLED = "statistics.enabled";
-    String CFG_STATISTICS_OUTPUT = "statistics.ouput";
+    String CFG_STATISTICS_OUTPUT = "statistics.ouput"; // misspelled ("ouput"), but kept unchanged for backward compatibility
     String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
-    String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
-    String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
+    String CFG_STATISTICS_CUBE_ESTIMATION_FILENAME = "cube_statistics.txt";
+    String CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME = "cuboid_statistics.seq";
 
-    int COUNTER_MAX = 100000;
-    int ERROR_RECORD_THRESHOLD = 100;
+    /**
+     * command line ARGuments
+     */
+    String ARG_INPUT = "input";
+    String ARG_OUTPUT = "output";
+    String ARG_JOB_NAME = "jobname";
+    String ARG_CUBING_JOB_ID = "cubingJobId";
+    String ARG_CUBE_NAME = "cubename";
+    String ARG_II_NAME = "iiname";
+    String ARG_SEGMENT_NAME = "segmentname";
+    String ARG_PARTITION = "partitions";
+    String ARG_STATS_ENABLED= "statisticsenabled";
+    String ARG_STATS_OUTPUT= "statisticsoutput";
+    String ARG_STATS_SAMPLING_PERCENT= "statisticssamplingpercent";
+    String ARG_HTABLE_NAME= "htablename";
+    String ARG_INPUT_FORMAT= "inputformat";
+    String ARG_LEVEL= "level";
+
+    /**
+     * logger and counter
+     */
+    String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
+    int NORMAL_RECORD_LOG_THRESHOLD = 100000;
+    int ERROR_RECORD_LOG_THRESHOLD = 100;
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 57e93c3..00b9e32 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -18,19 +18,8 @@
 
 package org.apache.kylin.engine.mr.common;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -61,9 +50,16 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 /**
  * This should be in cube module. It's here in engine-mr because currently stats
@@ -76,7 +72,7 @@ public class CubeStatsReader {
     final CubeSegment seg;
     final int samplingPercentage;
     final double mapperOverlapRatioOfFirstBuild; // only makes sense for the first build, is meaningless after merge
-    final Map<Long, HyperLogLogPlusCounter> cuboidRowCountMap;
+    final Map<Long, HyperLogLogPlusCounter> cuboidRowEstimatesHLL;
 
     public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
         ResourceStore store = ResourceStore.getStore(kylinConfig);
@@ -112,7 +108,7 @@ public class CubeStatsReader {
             this.seg = cubeSegment;
             this.samplingPercentage = percentage;
             this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio;
-            this.cuboidRowCountMap = counterMap;
+            this.cuboidRowEstimatesHLL = counterMap;
 
         } finally {
             IOUtils.closeStream(reader);
@@ -133,13 +129,13 @@ public class CubeStatsReader {
         return tempFile;
     }
 
-    public Map<Long, Long> getCuboidRowCountMap() {
-        return getCuboidRowCountMapFromSampling(cuboidRowCountMap, samplingPercentage);
+    public Map<Long, Long> getCuboidRowEstimatesHLL() {
+        return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
     }
 
     // return map of Cuboid ID => MB
     public Map<Long, Double> getCuboidSizeMap() {
-        return getCuboidSizeMapFromRowCount(seg, getCuboidRowCountMap());
+        return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL());
     }
 
     public double getMapperOverlapRatioOfFirstBuild() {
@@ -147,15 +143,13 @@ public class CubeStatsReader {
     }
 
     public static Map<Long, Long> getCuboidRowCountMapFromSampling(Map<Long, HyperLogLogPlusCounter> hllcMap, int samplingPercentage) {
-        return Maps.transformValues(hllcMap, new Function<HyperLogLogPlusCounter, Long>() {
-            @Nullable
-            @Override
-            public Long apply(HyperLogLogPlusCounter input) {
-                // No need to adjust according sampling percentage. Assumption is that data set is far
-                // more than cardinality. Even a percentage of the data should already see all cardinalities.
-                return input.getCountEstimate();
-            }
-        });
+        Map<Long, Long> cuboidRowCountMap = Maps.newHashMap();
+        for (Map.Entry<Long, HyperLogLogPlusCounter> entry : hllcMap.entrySet()) {
+            // No need to adjust according sampling percentage. Assumption is that data set is far
+            // more than cardinality. Even a percentage of the data should already see all cardinalities.
+            cuboidRowCountMap.put(entry.getKey(), entry.getValue().getCountEstimate());
+        }
+        return cuboidRowCountMap;
     }
 
     public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
@@ -219,7 +213,7 @@ public class CubeStatsReader {
     }
 
     private void print(PrintWriter out) {
-        Map<Long, Long> cuboidRows = getCuboidRowCountMap();
+        Map<Long, Long> cuboidRows = getCuboidRowEstimatesHLL();
         Map<Long, Double> cuboidSizes = getCuboidSizeMap();
         List<Long> cuboids = new ArrayList<Long>(cuboidRows.keySet());
         Collections.sort(cuboids);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
index 02fe0f0..cb4b1cb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
@@ -43,7 +43,7 @@ public class CuboidStatsUtil {
     }
     public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
             Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage, double mapperOverlapRatio) throws IOException {
-        Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+        Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
 
         List<Long> allCuboids = new ArrayList<Long>();
         allCuboids.addAll(cuboidHLLMap.keySet());

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
index e7501b8..4841e64 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -29,9 +30,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BatchIIJobBuilder extends JobBuilderSupport {
-    
+
     private static final Logger logger = LoggerFactory.getLogger(BatchIIJobBuilder.class);
-    
+
     private final IMRBatchCubingInputSide inputSide;
     private final IMROutput.IMRBatchInvertedIndexingOutputSide outputSide;
 
@@ -44,15 +45,15 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
     public IIJob build() {
         logger.info("MR new job to BUILD segment " + seg);
 
-        final IIJob result = IIJob.createBuildJob((IISegment)seg, submitter, config);
+        final IIJob result = IIJob.createBuildJob((IISegment) seg, submitter, config);
         final String jobId = result.getId();
-        
+
         final String iiRootPath = getRealizationRootPath(jobId) + "/";
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
-    
+
         // Phase 2: Build Inverted Index
-        result.addTask(createInvertedIndexStep((IISegment)seg, iiRootPath));
+        result.addTask(createInvertedIndexStep((IISegment) seg, iiRootPath));
         outputSide.addStepPhase3_BuildII(result, iiRootPath);
 
         // Phase 3: Update Metadata & Cleanup
@@ -71,13 +72,13 @@ public class BatchIIJobBuilder extends JobBuilderSupport {
 
         buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
 
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "output", iiOutputTempPath);
-        appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, iiOutputTempPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, ExecutableConstants.STEP_NAME_BUILD_II);
 
         buildIIStep.setMapReduceParams(cmd.toString());
         buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
         return buildIIStep;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
index 27505e6..9ea2411 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
@@ -19,14 +19,12 @@
 package org.apache.kylin.engine.mr.invertedindex;
 
 import java.io.IOException;
-import java.util.ArrayList;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -39,8 +37,6 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,7 +102,6 @@ public class InvertedIndexJob extends AbstractHadoopJob {
         conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
     }
 
-   
     private void setupMapper(IISegment segment) throws IOException {
 
         IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
@@ -127,7 +122,7 @@ public class InvertedIndexJob extends AbstractHadoopJob {
 
         FileOutputFormat.setOutputPath(job, output);
 
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
 
         deletePath(job.getConfiguration(), output);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 3dddece..5fb9098 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -195,7 +195,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
 
         // TODO expose errorRecordCounter as hadoop counter
         errorRecordCounter++;
-        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+        if (errorRecordCounter > BatchConstants.ERROR_RECORD_LOG_THRESHOLD) {
             if (ex instanceof IOException)
                 throw (IOException) ex;
             else if (ex instanceof RuntimeException)

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
new file mode 100644
index 0000000..71f27f2
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
@@ -0,0 +1,78 @@
+package org.apache.kylin.engine.mr.steps;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.Lists;
+
+public class CubingExecutableUtil {
+
+    public static final String CUBE_NAME = "cubeName";
+    public static final String SEGMENT_ID = "segmentId";
+    public static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
+    public static final String STATISTICS_PATH = "statisticsPath";
+    public static final String CUBING_JOB_ID = "cubingJobId";
+    public static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
+
+    public static void setStatisticsPath(String path, Map<String, String> params) {
+        params.put(STATISTICS_PATH, path);
+    }
+
+    public static String getStatisticsPath(Map<String, String> params) {
+        return params.get(STATISTICS_PATH);
+    }
+
+    public static void setCubeName(String cubeName, Map<String, String> params) {
+        params.put(CUBE_NAME, cubeName);
+    }
+
+    public static String getCubeName(Map<String, String> params) {
+        return params.get(CUBE_NAME);
+    }
+
+    public static void setSegmentId(String segmentId, Map<String, String> params) {
+        params.put(SEGMENT_ID, segmentId);
+    }
+
+    public static String getSegmentId(Map<String, String> params) {
+        return params.get(SEGMENT_ID);
+    }
+
+    public static void setMergingSegmentIds(List<String> ids, Map<String, String> params) {
+        params.put(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
+    }
+
+    public static List<String> getMergingSegmentIds(Map<String, String> params) {
+        final String ids = params.get(MERGING_SEGMENT_IDS);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    public static void setCubingJobId(String id, Map<String, String> params) {
+        params.put(CUBING_JOB_ID, id);
+    }
+
+    public static String getCubingJobId(Map<String, String> params) {
+        return params.get(CUBING_JOB_ID);
+    }
+
+    public static void setMergedStatisticsPath(String path, Map<String, String> params) {
+        params.put(MERGED_STATISTICS_PATH, path);
+    }
+
+    public static String getMergedStatisticsPath(Map<String, String> params) {
+        return params.get(MERGED_STATISTICS_PATH);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index 4dbb53e..f263d99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -108,7 +108,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
         context.write(key, outputValue);
 
         counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
+        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
             logger.info("Handled " + counter + " records!");
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 1373e2c..2dabb7a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -125,7 +125,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         job.setNumReduceTasks(numberOfReducers);
 
         FileOutputFormat.setOutputPath(job, output);
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
 
         deletePath(job.getConfiguration(), output);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 1412dfb..cc7b6df 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -71,7 +71,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
         ex.printStackTrace(System.err);
 
         errorRecordCounter++;
-        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+        if (errorRecordCounter > BatchConstants.ERROR_RECORD_LOG_THRESHOLD) {
             if (ex instanceof IOException)
                 throw (IOException) ex;
             else if (ex instanceof RuntimeException)

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index f3e0290..f43834d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -128,7 +128,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException {
         final Configuration conf = context.getConfiguration();
         final FileSystem fs = FileSystem.get(conf);
-        final String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+        final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
         final Path outputFile = new Path(outputPath, col.getName());
 
         FSDataOutputStream out = null;
@@ -176,7 +176,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     private void writeMapperAndCuboidStatistics(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
         FileSystem fs = FileSystem.get(conf);
-        FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION));
+        FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME));
 
         try {
             String msg;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index e2a49df..8f5557d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -41,7 +41,7 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O
     @Override
     public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
         counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
+        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
             logger.info("Handled " + counter + " records!");
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index b094b98..e42a6b5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -100,7 +100,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
         while (!future.isDone()) {
             if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
                 counter++;
-                if (counter % BatchConstants.COUNTER_MAX == 0) {
+                if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
                     logger.info("Handled " + counter + " records!");
                 }
                 break;

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index 9beacbb..e72c38b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -85,7 +85,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
         context.write(outputKey, outputValue);
         
         counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
+        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
             logger.info("Handled " + counter + " records!");
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index f247a8f..264ba9b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -44,10 +43,6 @@ import com.google.common.collect.Lists;
 
 public class MergeDictionaryStep extends AbstractExecutable {
 
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-
     public MergeDictionaryStep() {
         super();
     }
@@ -56,8 +51,8 @@ public class MergeDictionaryStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig conf = context.getConfig();
         final CubeManager mgr = CubeManager.getInstance(conf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
         final List<CubeSegment> mergingSegments = getMergingSegments(cube);
 
         Collections.sort(mergingSegments);
@@ -79,7 +74,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
     }
 
     private List<CubeSegment> getMergingSegments(CubeInstance cube) {
-        List<String> mergingSegmentIds = getMergingSegmentIds();
+        List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
         List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
         for (String id : mergingSegmentIds) {
             result.add(cube.getSegmentById(id));
@@ -111,7 +106,7 @@ public class MergeDictionaryStep extends AbstractExecutable {
         CubeDesc cubeDesc = cube.getDescriptor();
 
         for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {
-            String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(),true, col).getTable();
+            String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(), true, col).getTable();
             if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
                 colsNeedMeringDict.add(col);
             } else {
@@ -165,38 +160,4 @@ public class MergeDictionaryStep extends AbstractExecutable {
         }
     }
 
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
 }