Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/23 10:15:19 UTC

[35/50] [abbrv] kylin git commit: clean code

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 0a6c123..0b45795 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -22,12 +22,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,7 +34,6 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
@@ -52,16 +47,12 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class MergeStatisticsStep extends AbstractExecutable {
 
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IS = "mergingSegmentIds";
-    private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
     protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
 
     public MergeStatisticsStep() {
@@ -73,16 +64,16 @@ public class MergeStatisticsStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig kylinConf = context.getConfig();
         final CubeManager mgr = CubeManager.getInstance(kylinConf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
         Configuration conf = HadoopUtil.getCurrentConfiguration();
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
 
             int averageSamplingPercentage = 0;
-            for (String segmentId : this.getMergingSegmentIds()) {
-                String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
+            for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
+                String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
                 InputStream is = rs.getResource(fileKey).inputStream;
                 File tempFile = null;
                 FileOutputStream tempFileStream = null;
@@ -126,9 +117,9 @@ public class MergeStatisticsStep extends AbstractExecutable {
                         tempFile.delete();
                 }
             }
-            averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
-            CuboidStatsUtil.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
-            Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+            averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
+            CuboidStatsUtil.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage);
+            Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FileSystem fs = statisticsFilePath.getFileSystem(conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
@@ -146,45 +137,4 @@ public class MergeStatisticsStep extends AbstractExecutable {
         }
     }
 
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setMergedStatisticsPath(String path) {
-        setParam(MERGED_STATISTICS_PATH, path);
-    }
-
-    private String getMergedStatisticsPath() {
-        return getParam(MERGED_STATISTICS_PATH);
-    }
 }
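
Every accessor call in the hunk above routes through CubingExecutableUtil, which centralizes the param keys and lookups that each step previously declared privately. A minimal sketch of what such a consolidated helper might look like; the method names and the public CUBE_NAME constant are taken from call sites in this commit, while the other key literals are assumptions mirroring the constants removed above:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.lang.StringUtils;

    import com.google.common.collect.Lists;

    public class CubingExecutableUtil {

        public static final String CUBE_NAME = "cubeName"; // referenced directly in SparkCubingJobBuilder below
        static final String SEGMENT_ID = "segmentId"; // assumed keys, mirroring the removed constants
        static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
        static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
        static final String STATISTICS_PATH = "statisticsPath";
        static final String CUBING_JOB_ID = "cubingJobId";

        public static String getCubeName(Map<String, String> params) {
            return params.get(CUBE_NAME);
        }

        public static String getSegmentId(Map<String, String> params) {
            return params.get(SEGMENT_ID);
        }

        public static String getMergedStatisticsPath(Map<String, String> params) {
            return params.get(MERGED_STATISTICS_PATH);
        }

        public static String getStatisticsPath(Map<String, String> params) {
            return params.get(STATISTICS_PATH);
        }

        public static String getCubingJobId(Map<String, String> params) {
            return params.get(CUBING_JOB_ID);
        }

        // Merging segment ids travel as one comma-joined string, matching the
        // encoding used by the setters this commit removes.
        public static List<String> getMergingSegmentIds(Map<String, String> params) {
            final String ids = params.get(MERGING_SEGMENT_IDS);
            if (ids == null) {
                return Collections.emptyList();
            }
            String[] splitted = StringUtils.split(ids, ",");
            List<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
            Collections.addAll(result, splitted);
            return result;
        }
    }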

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 1dbce8e..ff9be44 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -131,7 +131,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         if (myChildren == null || myChildren.size() == 0) {
             context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
             skipCounter++;
-            if (skipCounter % BatchConstants.COUNTER_MAX == 0) {
+            if (skipCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
                 logger.info("Skipped " + skipCounter + " records!");
             }
             return;
@@ -140,7 +140,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
 
         handleCounter++;
-        if (handleCounter % BatchConstants.COUNTER_MAX == 0) {
+        if (handleCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
             logger.info("Handled " + handleCounter + " records!");
         }
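
The renamed constant makes the intent of the modulo check explicit: emit one progress line per threshold of records instead of one per record, which keeps task logs readable at scale. The pattern in isolation, as a sketch with an illustrative threshold value (the real one lives in BatchConstants.NORMAL_RECORD_LOG_THRESHOLD):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class ThrottledProgress {
        private static final Logger logger = LoggerFactory.getLogger(ThrottledProgress.class);
        private static final long LOG_THRESHOLD = 100000L; // illustrative value

        private long handleCounter = 0;

        void onRecord() {
            if (++handleCounter % LOG_THRESHOLD == 0) {
                logger.info("Handled " + handleCounter + " records!");
            }
        }
    }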
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 9314b88..288ca6a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -47,11 +47,6 @@ import org.apache.kylin.metadata.model.MeasureDesc;
  */
 public class SaveStatisticsStep extends AbstractExecutable {
 
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String STATISTICS_PATH = "statisticsPath";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
     public SaveStatisticsStep() {
         super();
     }
@@ -60,15 +55,15 @@ public class SaveStatisticsStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig kylinConf = context.getConfig();
         final CubeManager mgr = CubeManager.getInstance(kylinConf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
-            Path statisticsFilePath = new Path(getStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+            Path statisticsFilePath = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
             if (!fs.exists(statisticsFilePath))
-                throw new IOException("File " + statisticsFilePath + " does not exists;");
+                throw new IOException("File " + statisticsFilePath + " does not exist");
 
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
@@ -105,7 +100,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
                     break;
                 }
             }
-            
+
             if (memoryHungry == true) {
                 alg = AlgorithmEnum.LAYER;
             } else if ("random".equalsIgnoreCase(algPref)) { // for testing
@@ -120,40 +115,8 @@ public class SaveStatisticsStep extends AbstractExecutable {
         }
         logger.info("The cube algorithm for " + seg + " is " + alg);
 
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+        CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
         cubingJob.setAlgorithm(alg);
     }
 
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setStatisticsPath(String path) {
-        this.setParam(STATISTICS_PATH, path);
-    }
-
-    private String getStatisticsPath() {
-        return getParam(STATISTICS_PATH);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-
 }
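
Before deciding the algorithm, the step resolves the cuboid-estimation file under the job's statistics directory and fails fast when it is missing. Condensed into a standalone sketch (the file name literal is an assumption; the diff only shows the constant BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class StatisticsFileReader {
        static final String ESTIMATION_FILENAME = "cuboid_statistics.seq"; // assumed literal

        static FSDataInputStream openStatistics(Configuration conf, String statisticsDir) throws IOException {
            Path statisticsFilePath = new Path(statisticsDir, ESTIMATION_FILENAME);
            FileSystem fs = statisticsFilePath.getFileSystem(conf);
            if (!fs.exists(statisticsFilePath))
                throw new IOException("File " + statisticsFilePath + " does not exist");
            return fs.open(statisticsFilePath);
        }
    }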

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index a10fef4..c41aaf1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -33,50 +33,22 @@ import org.apache.kylin.job.execution.ExecuteResult;
  */
 public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String CUBE_NAME = "cubeName";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
     public UpdateCubeInfoAfterBuildStep() {
         super();
     }
 
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-        final CubeSegment segment = cube.getSegmentById(getSegmentId());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+        CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
         long sourceCount = cubingJob.findSourceRecordCount();
         long sourceSizeBytes = cubingJob.findSourceSizeBytes();
         long cubeSizeBytes = cubingJob.findCubeSizeBytes();
 
-        segment.setLastBuildJobID(getCubingJobId());
+        segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
         segment.setLastBuildTime(System.currentTimeMillis());
         segment.setSizeKB(cubeSizeBytes / 1024);
         segment.setInputRecords(sourceCount);
@@ -90,5 +62,5 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 527572b..d3ed68a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -40,10 +40,6 @@ import com.google.common.collect.Lists;
  */
 public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
 
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-    private static final String CUBING_JOB_ID = "cubingJobId";
 
     private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
 
@@ -53,18 +49,18 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
 
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
 
-        CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
+        CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
         if (mergedSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
+            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
         }
 
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
+        CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
         long cubeSizeBytes = cubingJob.findCubeSizeBytes();
 
         // collect source statistics
-        List<String> mergingSegmentIds = getMergingSegmentIds();
+        List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
         if (mergingSegmentIds.isEmpty()) {
             return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
         }
@@ -80,7 +76,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         mergedSegment.setSizeKB(cubeSizeBytes / 1024);
         mergedSegment.setInputRecords(sourceCount);
         mergedSegment.setInputRecordsSize(sourceSize);
-        mergedSegment.setLastBuildJobID(getCubingJobId());
+        mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
 
         try {
@@ -92,45 +88,4 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         }
     }
 
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
 }
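
The removed setter shows the wire format for the id list: a single comma-joined param value, which getMergingSegmentIds decodes again. Presumably the consolidated utility keeps the same encoding so existing job metadata still parses; the encode side as a sketch (class name and placement assumed):

    import java.util.List;
    import java.util.Map;

    import org.apache.commons.lang.StringUtils;

    class SegmentIdParams {
        static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";

        // Encode: ["a", "b", "c"] -> "a,b,c"; the matching decode splits on ','.
        static void setMergingSegmentIds(Map<String, String> params, List<String> ids) {
            params.put(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
        }
    }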

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 285729f..981dac3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ClassUtil;
@@ -103,7 +104,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 public class SparkCubing extends AbstractApplication {
 
     private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
-    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("cube").hasArg().isRequired(true).withDescription("Cube Name").create("cubeName");
+    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
     private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
     private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
index 9f17d60..05246f4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
@@ -18,12 +18,12 @@
 package org.apache.kylin.engine.spark;
 
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.slf4j.Logger;
@@ -48,7 +48,7 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
     }
 
     public DefaultChainedExecutable build() {
-        final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
+        final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
         final String jobId = result.getId();
 
         inputSide.addStepPhase1_CreateFlatTable(result);
@@ -59,7 +59,7 @@ public class SparkCubingJobBuilder extends JobBuilderSupport {
         final SparkExecutable sparkExecutable = new SparkExecutable();
         sparkExecutable.setClassName(SparkCubing.class.getName());
         sparkExecutable.setParam("hiveTable", tableName);
-        sparkExecutable.setParam("cubeName", seg.getRealization().getName());
+        sparkExecutable.setParam(CubingExecutableUtil.CUBE_NAME, seg.getRealization().getName());
         sparkExecutable.setParam("segmentId", seg.getUuid());
         sparkExecutable.setParam("confPath", confPath);
         sparkExecutable.setParam("coprocessor", coprocessor);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 1c3e71a..5fea710 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIManager;
@@ -100,40 +101,7 @@ public abstract class BasicService {
     }
 
     protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, final Map<String, Output> allOutputs) {
-        List<CubingJob> results = Lists.newArrayList(FluentIterable.from(getExecutableManager().getAllExecutables()).filter(new Predicate<AbstractExecutable>() {
-            @Override
-            public boolean apply(AbstractExecutable executable) {
-                if (executable instanceof CubingJob) {
-                    if (cubeName == null) {
-                        return true;
-                    }
-                    return ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName);
-                } else {
-                    return false;
-                }
-            }
-        }).transform(new Function<AbstractExecutable, CubingJob>() {
-            @Override
-            public CubingJob apply(AbstractExecutable executable) {
-                return (CubingJob) executable;
-            }
-        }).filter(new Predicate<CubingJob>() {
-            @Override
-            public boolean apply(CubingJob executable) {
-                if (null == projectName || null == getProjectManager().getProject(projectName)) {
-                    return true;
-                } else {
-                    ProjectInstance project = getProjectManager().getProject(projectName);
-                    return project.containsRealization(RealizationType.CUBE, executable.getCubeName());
-                }
-            }
-        }).filter(new Predicate<CubingJob>() {
-            @Override
-            public boolean apply(CubingJob executable) {
-                return statusList.contains(allOutputs.get(executable.getId()).getState());
-            }
-        }));
-        return results;
+        return listAllCubingJobs(cubeName, projectName, statusList, -1L, -1L, allOutputs);
     }
 
     protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, final Map<String, Output> allOutputs) {
@@ -144,7 +112,7 @@ public abstract class BasicService {
                     if (cubeName == null) {
                         return true;
                     }
-                    return ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName);
+                    return CubingExecutableUtil.getCubeName(executable.getParams()).equalsIgnoreCase(cubeName);
                 } else {
                     return false;
                 }
@@ -161,7 +129,7 @@ public abstract class BasicService {
                     return true;
                 } else {
                     ProjectInstance project = getProjectManager().getProject(projectName);
-                    return project.containsRealization(RealizationType.CUBE, executable.getCubeName());
+                    return project.containsRealization(RealizationType.CUBE, CubingExecutableUtil.getCubeName(executable.getParams()));
                 }
             }
         }).filter(new Predicate<CubingJob>() {
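
The duplicated Guava filter chain collapses into a delegation: the four-argument overload calls the time-bounded variant with -1L standing for "no bound", so only one chain has to be maintained. The overload pattern in isolation (all names illustrative):

    import java.util.Collections;
    import java.util.List;

    class JobQuery {
        static final long NO_BOUND = -1L; // sentinel: skip this time bound

        List<String> listJobs(String cubeName) {
            // The unbounded query is the bounded one with open endpoints.
            return listJobs(cubeName, NO_BOUND, NO_BOUND);
        }

        List<String> listJobs(String cubeName, long timeStartInMillis, long timeEndInMillis) {
            // ... apply the name filter, and each time bound only when it != NO_BOUND ...
            return Collections.emptyList(); // placeholder for the real filter chain
        }
    }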

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 155593a..cd8eef9 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -19,7 +19,13 @@
 package org.apache.kylin.rest.service;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.cube.CubeInstance;
@@ -30,6 +36,7 @@ import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.JobStatusEnum;
@@ -84,7 +91,7 @@ public class JobService extends BasicService {
     }
 
     public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) {
-        Calendar calendar= Calendar.getInstance();
+        Calendar calendar = Calendar.getInstance();
         calendar.setTime(new Date());
         long currentTimeMillis = calendar.getTimeInMillis();
         long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter);
@@ -153,22 +160,22 @@ public class JobService extends BasicService {
 
     private long getTimeStartInMillis(Calendar calendar, JobTimeFilterEnum timeFilter) {
         switch (timeFilter) {
-            case LAST_ONE_DAY:
-                calendar.add(Calendar.DAY_OF_MONTH, -1);
-                return calendar.getTimeInMillis();
-            case LAST_ONE_WEEK:
-                calendar.add(Calendar.WEEK_OF_MONTH, -1);
-                return calendar.getTimeInMillis();
-            case LAST_ONE_MONTH:
-                calendar.add(Calendar.MONTH, -1);
-                return calendar.getTimeInMillis();
-            case LAST_ONE_YEAR:
-                calendar.add(Calendar.YEAR, -1);
-                return calendar.getTimeInMillis();
-            case ALL:
-                return  0;
-            default:
-                throw new RuntimeException("illegal timeFilter for job history:" + timeFilter);
+        case LAST_ONE_DAY:
+            calendar.add(Calendar.DAY_OF_MONTH, -1);
+            return calendar.getTimeInMillis();
+        case LAST_ONE_WEEK:
+            calendar.add(Calendar.WEEK_OF_MONTH, -1);
+            return calendar.getTimeInMillis();
+        case LAST_ONE_MONTH:
+            calendar.add(Calendar.MONTH, -1);
+            return calendar.getTimeInMillis();
+        case LAST_ONE_YEAR:
+            calendar.add(Calendar.YEAR, -1);
+            return calendar.getTimeInMillis();
+        case ALL:
+            return 0;
+        default:
+            throw new RuntimeException("illegal timeFilter for job history:" + timeFilter);
         }
     }
 
@@ -263,8 +270,8 @@ public class JobService extends BasicService {
         CubingJob cubeJob = (CubingJob) job;
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
-        result.setRelatedCube(cubeJob.getCubeName());
-        result.setRelatedSegment(cubeJob.getSegmentIds());
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+        result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
         result.setLastModified(cubeJob.getLastModified());
         result.setSubmitter(cubeJob.getSubmitter());
         result.setUuid(cubeJob.getId());
@@ -288,8 +295,8 @@ public class JobService extends BasicService {
         Output output = outputs.get(job.getId());
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
-        result.setRelatedCube(cubeJob.getCubeName());
-        result.setRelatedSegment(cubeJob.getSegmentIds());
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+        result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
         result.setLastModified(output.getLastModified());
         result.setSubmitter(cubeJob.getSubmitter());
         result.setUuid(cubeJob.getId());

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index cbfd8c3..68b6ae4 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -60,7 +60,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
         bindCurrentConfiguration(conf);
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
-        String tableName = conf.get(BatchConstants.TABLE_NAME);
+        String tableName = conf.get(BatchConstants.CFG_TABLE_NAME);
         tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName);
         tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 70286ab..9162208 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -36,7 +36,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**
@@ -79,7 +78,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
             setJobClasspath(job);
 
             String table = getOptionValue(OPTION_TABLE);
-            job.getConfiguration().set(BatchConstants.TABLE_NAME, table);
+            job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table);
 
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             FileOutputFormat.setOutputPath(job, output);
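
CFG_TABLE_NAME is written into the job Configuration here on the driver side and read back in ColumnCardinalityMapper.setup above, so both ends must agree on one key; sharing the constant through BatchConstants is what guarantees that. The roundtrip in miniature (the key literal is an assumption):

    import org.apache.hadoop.conf.Configuration;

    class TableNameConfig {
        static final String CFG_TABLE_NAME = "table.name"; // assumed literal for BatchConstants.CFG_TABLE_NAME

        static void write(Configuration conf, String table) {
            conf.set(CFG_TABLE_NAME, table); // driver side: job setup
        }

        static String read(Configuration conf) {
            return conf.get(CFG_TABLE_NAME); // task side: Mapper.setup
        }
    }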

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index c2e2e64..6d77240 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -154,30 +154,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException {
-        byte[] startRow = Bytes.toBytes(rangeStart);
-        byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
-
-        Scan scan = new Scan(startRow, endRow);
-        scan.addColumn(B_FAMILY, B_COLUMN_TS);
-        scan.addColumn(B_FAMILY, B_COLUMN);
-        tuneScanParameters(scan);
-
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
-        List<RawResource> result = Lists.newArrayList();
-        try {
-            ResultScanner scanner = table.getScanner(scan);
-            for (Result r : scanner) {
-                result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
-            }
-        } catch (IOException e) {
-            for (RawResource rawResource : result) {
-                IOUtils.closeQuietly(rawResource.inputStream);
-            }
-            throw e;
-        } finally {
-            IOUtils.closeQuietly(table);
-        }
-        return result;
+        return getAllResources(rangeStart, rangeEnd, -1L, -1L);
     }
 
     @Override
@@ -188,7 +165,10 @@ public class HBaseResourceStore extends ResourceStore {
         Scan scan = new Scan(startRow, endRow);
         scan.addColumn(B_FAMILY, B_COLUMN_TS);
         scan.addColumn(B_FAMILY, B_COLUMN);
-        scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis));
+        FilterList filterList = generateTimeFilterList(timeStartInMillis, timeEndInMillis);
+        if (filterList != null) {
+            scan.setFilter(filterList);
+        }
         tuneScanParameters(scan);
 
         HTableInterface table = getConnection().getTable(getAllInOneTableName());
@@ -218,11 +198,15 @@ public class HBaseResourceStore extends ResourceStore {
 
     private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) {
         FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-        SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis));
-        filterList.addFilter(timeStartFilter);
-        SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis));
-        filterList.addFilter(timeEndFilter);
-        return filterList;
+        if (timeStartInMillis != -1L) {
+            SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis));
+            filterList.addFilter(timeStartFilter);
+        }
+        if (timeEndInMillis != -1L) {
+            SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis));
+            filterList.addFilter(timeEndFilter);
+        }
+        return filterList.getFilters().size() == 0 ? null : filterList;
     }
 
     private InputStream getInputStream(String resPath, Result r) throws IOException {
@@ -325,7 +309,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] rowkey = Bytes.toBytes(path);
 
         Get get = new Get(rowkey);
-        
+
         if (!fetchContent && !fetchTimestamp) {
             get.setCheckExistenceOnly(true);
         } else {
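
generateTimeFilterList now returns null when neither bound is set, and the caller attaches a filter only when one exists, so unbounded scans carry no FilterList at all instead of an empty MUST_PASS_ALL list. The guard as a standalone sketch (family and qualifier bytes are illustrative):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    class TimestampRangeScan {
        static final byte[] FAMILY = Bytes.toBytes("f"); // illustrative
        static final byte[] TS_COLUMN = Bytes.toBytes("t"); // illustrative

        // Build a (start, end] filter; null when both bounds are the -1L sentinel.
        static FilterList timeFilter(long startMillis, long endMillis) {
            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
            if (startMillis != -1L) {
                filterList.addFilter(new SingleColumnValueFilter(FAMILY, TS_COLUMN, CompareFilter.CompareOp.GREATER, Bytes.toBytes(startMillis)));
            }
            if (endMillis != -1L) {
                filterList.addFilter(new SingleColumnValueFilter(FAMILY, TS_COLUMN, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(endMillis)));
            }
            return filterList.getFilters().isEmpty() ? null : filterList;
        }

        static void applyTo(Scan scan, long startMillis, long endMillis) {
            FilterList filterList = timeFilter(startMillis, endMillis);
            if (filterList != null) {
                scan.setFilter(filterList); // unbounded scans stay unfiltered
            }
        }
    }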

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index a828728..f71d0f8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -7,15 +7,16 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob;
-import org.apache.kylin.storage.hbase.ii.IICreateHFileJob;
-import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.realization.IRealizationSegment;
 import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHFileJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -41,16 +42,16 @@ public class HBaseMRSteps extends JobBuilderSupport {
 
     public MapReduceExecutable createRangeRowkeyDistributionStep(String cuboidRootPath, String jobId) {
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
-        
+
         MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
         rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
         StringBuilder cmd = new StringBuilder();
 
         appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getRowkeyDistributionOutputPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Region_Splits_Calculator_" + seg.getRealization().getName() + "_Step");
 
         rowkeyDistributionStep.setMapReduceParams(cmd.toString());
         rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
@@ -69,10 +70,10 @@ public class HBaseMRSteps extends JobBuilderSupport {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);
@@ -82,18 +83,18 @@ public class HBaseMRSteps extends JobBuilderSupport {
 
     public MapReduceExecutable createConvertCuboidToHfileStep(String cuboidRootPath, String jobId) {
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
-        
+
         MapReduceExecutable createHFilesStep = new MapReduceExecutable();
         createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
         StringBuilder cmd = new StringBuilder();
 
         appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
 
         createHFilesStep.setMapReduceParams(cmd.toString());
         createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
@@ -107,9 +108,9 @@ public class HBaseMRSteps extends JobBuilderSupport {
         bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
 
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
 
         bulkLoadStep.setJobParams(cmd.toString());
         bulkLoadStep.setJobClass(BulkLoadJob.class);
@@ -125,7 +126,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
     public List<String> getMergingHTables() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance)seg.getRealization()).getMergingSegments((CubeSegment)seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
@@ -135,7 +136,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
     public List<String> getMergingHDFSPaths() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance)seg.getRealization()).getMergingSegments((CubeSegment)seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingHDFSPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
@@ -187,15 +188,14 @@ public class HBaseMRSteps extends JobBuilderSupport {
         jobFlow.addTask(createCreateIIHTableStep(seg));
 
         final String iiPath = rootPath + "*";
-        
+
         // generate hfiles step
         jobFlow.addTask(createConvertIIToHfileStep(seg, iiPath, jobFlow.getId()));
 
         // bulk load step
         jobFlow.addTask(createIIBulkLoadStep(seg, jobFlow.getId()));
-        
-    }
 
+    }
 
     public void addInvertedIndexGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
         String jobId = jobFlow.getId();
@@ -211,14 +211,12 @@ public class HBaseMRSteps extends JobBuilderSupport {
         jobFlow.addTask(step);
     }
 
-    
-
     private HadoopShellExecutable createCreateIIHTableStep(IRealizationSegment seg) {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(IICreateHTableJob.class);
@@ -232,11 +230,11 @@ public class HBaseMRSteps extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
 
         appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
 
         createHFilesStep.setMapReduceParams(cmd.toString());
         createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
@@ -249,9 +247,9 @@ public class HBaseMRSteps extends JobBuilderSupport {
         bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
 
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName());
 
         bulkLoadStep.setJobParams(cmd.toString());
         bulkLoadStep.setJobClass(IIBulkLoadJob.class);
@@ -259,5 +257,5 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return bulkLoadStep;
 
     }
-    
+
 }
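
All of the job-builder calls above funnel through appendExecCmdParameters, which, judging from its call sites, appends a "-name value" pair to the command string; pulling the names into BatchConstants.ARG_* keeps the builders and the option parsers on a single vocabulary. A minimal sketch of such a helper (signature and behavior inferred from usage, constant values assumed):

    class CmdBuilder {
        static final String ARG_INPUT = "input"; // assumed values of the BatchConstants.ARG_* constants
        static final String ARG_OUTPUT = "output";

        static void appendExecCmdParameters(StringBuilder cmd, String name, String value) {
            cmd.append(" -").append(name).append(" ").append(value);
        }

        public static void main(String[] args) {
            StringBuilder cmd = new StringBuilder();
            appendExecCmdParameters(cmd, ARG_INPUT, "/path/to/cuboid/*");
            appendExecCmdParameters(cmd, ARG_OUTPUT, "/path/to/hfile");
            System.out.println(cmd); // -> " -input /path/to/cuboid/* -output /path/to/hfile"
        }
    }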

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 4cc4794..fa62a62 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -76,7 +76,7 @@ public class HBaseStreamingOutput implements IStreamingOutput {
             CuboidStatsUtil.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
             FSDataInputStream inputStream = null;
             try {
-                inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION));
+                inputStream = FileSystem.getLocal(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME));
                 ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), inputStream, System.currentTimeMillis());
             } finally {
                 IOUtils.closeQuietly(inputStream);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
index 2ff7356..8b5daa3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
@@ -99,11 +99,11 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
             int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
             int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
             int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
-            job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-            job.getConfiguration().set(BatchConstants.HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
-            job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
+            job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+            job.getConfiguration().set(BatchConstants.CFG_HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
+            job.getConfiguration().set(BatchConstants.CFG_REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
+            job.getConfiguration().set(BatchConstants.CFG_REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
+            job.getConfiguration().set(BatchConstants.CFG_REGION_NUMBER_MIN, String.valueOf(minRegionCount));
             // The partition file for hfile is a sequence file consisting of ImmutableBytesWritable and NullWritable
             TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class);
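
The CFG_* values set here are read back, null-guarded, in RangeKeyDistributionReducer.setup below, so reducer fields keep their defaults when a key is absent; Hadoop's Configuration.getInt gives the same read-with-default semantics in one call. A sketch of the roundtrip (the key literal is an assumption):

    import org.apache.hadoop.conf.Configuration;

    class RegionCutConfig {
        static final String CFG_REGION_SPLIT_SIZE = "region.split.size"; // assumed literal

        static void write(Configuration conf, int regionSplitSize) {
            conf.set(CFG_REGION_SPLIT_SIZE, String.valueOf(regionSplitSize)); // driver side
        }

        static int read(Configuration conf, int defaultCut) {
            // Equivalent to the null-guarded Integer.valueOf pattern in the reducer.
            return conf.getInt(CFG_REGION_SPLIT_SIZE, defaultCut); // task side
        }
    }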
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
index acdab62..4e53ca4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
@@ -57,24 +57,24 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
     protected void setup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
 
-        if (context.getConfiguration().get(BatchConstants.OUTPUT_PATH) != null) {
-            output = context.getConfiguration().get(BatchConstants.OUTPUT_PATH);
+        if (context.getConfiguration().get(BatchConstants.CFG_OUTPUT_PATH) != null) {
+            output = context.getConfiguration().get(BatchConstants.CFG_OUTPUT_PATH);
         }
 
-        if (context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB) != null) {
-            hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB));
+        if (context.getConfiguration().get(BatchConstants.CFG_HFILE_SIZE_GB) != null) {
+            hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_HFILE_SIZE_GB));
         }
 
-        if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
-            cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
+        if (context.getConfiguration().get(BatchConstants.CFG_REGION_SPLIT_SIZE) != null) {
+            cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_SPLIT_SIZE));
         }
 
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
-            minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
+        if (context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MIN) != null) {
+            minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MIN));
         }
 
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
-            maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
+        if (context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MAX) != null) {
+            maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.CFG_REGION_NUMBER_MAX));
         }
 
         logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount