You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/25 10:33:51 UTC

[3/4] kylin git commit: KYLIN-1245 Put layer cubing and in-mem cubing side by side, switch based on 'mapper overlap ratio'

KYLIN-1245 Put layer cubing and in-mem cubing side by side, switch based on 'mapper overlap ratio'


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2aedddf4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2aedddf4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2aedddf4

Branch: refs/heads/2.0-rc
Commit: 2aedddf4894cd09bf039bf9479a856cd38f6cf1d
Parents: 8936df1
Author: Yang Li <li...@apache.org>
Authored: Mon Dec 21 19:52:31 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Dec 25 17:20:34 2015 +0800

----------------------------------------------------------------------
 build/conf/kylin_hive_conf.xml                  |   2 +-
 .../org/apache/kylin/common/KylinConfig.java    | 578 +------------------
 .../apache/kylin/common/KylinConfigBase.java    | 506 ++++++++++++++++
 .../org/apache/kylin/common/KylinConfigExt.java |  23 +
 .../apache/kylin/common/util/MailService.java   |   2 +-
 .../kylin/common/util/MailServiceTest.java      |   4 +-
 .../org/apache/kylin/cube/CubeInstance.java     |  10 -
 .../java/org/apache/kylin/cube/CubeManager.java |   6 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |   3 +
 .../org/apache/kylin/cube/CubeSegmentsTest.java |   2 +-
 .../kylin/job/execution/AbstractExecutable.java |   4 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  94 ++-
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  |  35 +-
 .../org/apache/kylin/engine/mr/CubingJob.java   |  23 +-
 .../org/apache/kylin/engine/mr/IMROutput2.java  | 104 ++--
 .../java/org/apache/kylin/engine/mr/MRUtil.java |   6 -
 .../engine/mr/common/AbstractHadoopJob.java     |   7 +-
 .../kylin/engine/mr/common/CubeStatsReader.java | 246 ++++++++
 .../kylin/engine/mr/common/CuboidStatsUtil.java |  15 +-
 .../engine/mr/common/MapReduceExecutable.java   |   6 +-
 .../kylin/engine/mr/steps/BaseCuboidJob.java    |   3 +-
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  26 +
 .../mr/steps/FactDistinctColumnsReducer.java    |  22 +-
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |  78 ++-
 .../engine/mr/steps/InMemCuboidReducer.java     |  28 +-
 .../mr/steps/MergeCuboidFromStorageJob.java     |  94 ---
 .../mr/steps/MergeCuboidFromStorageMapper.java  | 261 ---------
 .../engine/mr/steps/MergeStatisticsStep.java    |   2 +-
 .../engine/mr/steps/SaveStatisticsStep.java     |  37 ++
 .../apache/kylin/engine/spark/SparkCubing.java  |  67 ++-
 .../streaming/monitor/StreamingMonitor.java     |   2 +-
 .../kylin/rest/security/CrossDomainFilter.java  |   2 +-
 .../apache/kylin/rest/service/CubeService.java  |   2 +-
 .../apache/kylin/rest/service/JobService.java   |   2 +-
 .../storage/hbase/steps/CreateHTableJob.java    | 143 +----
 .../storage/hbase/steps/HBaseMROutput2.java     | 290 ----------
 .../hbase/steps/HBaseMROutput2Transition.java   | 329 +----------
 37 files changed, 1241 insertions(+), 1823 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/build/conf/kylin_hive_conf.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin_hive_conf.xml b/build/conf/kylin_hive_conf.xml
index 3fe17b6..9bf74d4 100644
--- a/build/conf/kylin_hive_conf.xml
+++ b/build/conf/kylin_hive_conf.xml
@@ -22,7 +22,7 @@ limitations under the License. See accompanying LICENSE file.
 
     <property>
         <name>dfs.block.size</name>
-        <value>10485760</value>
+        <value>32000000</value>
         <description>Want more mappers for in-mem cubing, thus smaller the DFS block size</description>
     </property>
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index c65ade4..bc18989 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -27,129 +27,29 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
-import java.io.Serializable;
 import java.io.StringWriter;
 import java.util.Enumeration;
 import java.util.Map;
 import java.util.Properties;
-import java.util.SortedSet;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.restclient.RestClient;
-import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.Log4jConfigurer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-
 /**
  */
 @SuppressWarnings("serial")
-public class KylinConfig implements Serializable {
-
-    /*
-     * NOTE: These key constants should be private or even better be removed.
-     *       All external access should go through public methods.
-     */
-    public static final String KYLIN_OWNER = "kylin.owner";
-
-    public static final String KYLIN_STORAGE_URL = "kylin.storage.url";
-
-    public static final String KYLIN_METADATA_URL = "kylin.metadata.url";
-
-    public static final String KYLIN_REST_SERVERS = "kylin.rest.servers";
-
-    public static final String KYLIN_REST_TIMEZONE = "kylin.rest.timezone";
-
-    public static final String KYLIN_JOB_CONCURRENT_MAX_LIMIT = "kylin.job.concurrent.max.limit";
-
-    public static final String KYLIN_JOB_YARN_APP_REST_CHECK_URL = "kylin.job.yarn.app.rest.check.status.url";
+public class KylinConfig extends KylinConfigBase {
 
-    public static final String KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS = "kylin.job.yarn.app.rest.check.interval.seconds";
-
-    public static final String HIVE_TABLE_LOCATION_PREFIX = "hive.table.location.";
-
-    public static final String KYLIN_JOB_REMOTE_CLI_PASSWORD = "kylin.job.remote.cli.password";
-
-    public static final String KYLIN_JOB_REMOTE_CLI_USERNAME = "kylin.job.remote.cli.username";
-
-    public static final String KYLIN_JOB_REMOTE_CLI_HOSTNAME = "kylin.job.remote.cli.hostname";
-
-    public static final String KYLIN_JOB_REMOTE_CLI_PORT = "kylin.job.remote.cli.port";
-
-    public static final String KYLIN_JOB_REMOTE_CLI_WORKING_DIR = "kylin.job.remote.cli.working.dir";
-
-    public static final String KYLIN_JOB_CMD_EXTRA_ARGS = "kylin.job.cmd.extra.args";
-    /**
-     * Toggle to indicate whether to use hive for table flattening. Default
-     * true.
-     */
-    public static final String KYLIN_JOB_RUN_AS_REMOTE_CMD = "kylin.job.run.as.remote.cmd";
-
-    public static final String KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_COUNT_RATIO = "kylin.job.mapreduce.default.reduce.count.ratio";
-
-    public static final String KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_INPUT_MB = "kylin.job.mapreduce.default.reduce.input.mb";
-
-    public static final String KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER = "kylin.job.mapreduce.max.reducer.number";
-
-    public static final String KYLIN_JOB_JAR = "kylin.job.jar";
-
-    public static final String KYLIN_JOB_JAR_SPARK = "kylin.job.jar.spark";
-
-    public static final String COPROCESSOR_LOCAL_JAR = "kylin.coprocessor.local.jar";
-
-    public static final String KYLIN_JOB_LOG_DIR = "kylin.job.log.dir";
-
-    public static final String KYLIN_JOB_CUBING_IN_MEM = "kylin.job.cubing.inMem";
-
-    public static final String KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT = "kylin.job.cubing.inMem.sampling.percent";
-
-    public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
-
-    public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
-
-    public static final String HIVE_DATABASE_FOR_INTERMEDIATE_TABLE = "kylin.job.hive.database.for.intermediatetable";
-
-    public static final String HIVE_PASSWORD = "hive.password";
-
-    public static final String HIVE_USER = "hive.user";
+    private static final Logger logger = LoggerFactory.getLogger(KylinConfig.class);
 
-    public static final String HIVE_URL = "hive.url";
-    /**
-     * Kylin properties file
-     */
+    /** Kylin properties file name */
     public static final String KYLIN_CONF_PROPERTIES_FILE = "kylin.properties";
-
-    public static final String MAIL_ENABLED = "mail.enabled";
-
-    public static final String MAIL_HOST = "mail.host";
-
-    public static final String MAIL_USERNAME = "mail.username";
-
-    public static final String MAIL_PASSWORD = "mail.password";
-
-    public static final String MAIL_SENDER = "mail.sender";
-
-    public static final String KYLIN_HOME = "KYLIN_HOME";
     public static final String KYLIN_CONF = "KYLIN_CONF";
 
-    public static final String HBASE_REGION_CUT_SMALL = "kylin.hbase.region.cut.small";
-    public static final String HBASE_REGION_CUT_MEDIUM = "kylin.hbase.region.cut.medium";
-    public static final String HBASE_REGION_CUT_LARGE = "kylin.hbase.region.cut.large";
-
-    public static final String SPARK_HOME = "kylin.spark.home";
-    public static final String SPARK_MASTER = "kylin.spark.master";
-
-    private static final Logger logger = LoggerFactory.getLogger(KylinConfig.class);
-
-    public static final String VERSION = "${project.version}";
-
-    public static final String HTABLE_DEFAULT_COMPRESSION_CODEC = "kylin.hbase.default.compression.codec";
-
     // static cached instances
     private static KylinConfig ENV_INSTANCE = null;
 
@@ -249,363 +149,6 @@ public class KylinConfig implements Serializable {
         return config;
     }
 
-    // ============================================================================
-
-    /**
-     * Find config from environment. The Search process: 1. Check the
-     * $KYLIN_CONF/kylin.properties 2. Check the $KYLIN_HOME/conf/kylin.properties
-     */
-    private static KylinConfig loadKylinConfig() {
-        Log4jConfigurer.initLogger();
-
-        InputStream is = getKylinPropertiesAsInputSteam();
-        if (is == null) {
-            throw new IllegalArgumentException("Failed to load kylin config");
-        }
-        KylinConfig config = new KylinConfig();
-        config.reloadKylinConfig(is);
-
-        return config;
-    }
-
-    private volatile Properties properties = new Properties();
-
-    public CliCommandExecutor getCliCommandExecutor() throws IOException {
-        CliCommandExecutor exec = new CliCommandExecutor();
-        if (getRunAsRemoteCommand()) {
-            exec.setRunAtRemote(getRemoteHadoopCliHostname(), getRemoteHadoopCliPort(), getRemoteHadoopCliUsername(), getRemoteHadoopCliPassword());
-        }
-        return exec;
-    }
-
-    // ============================================================================
-
-    public String getStorageUrl() {
-        return getOptional(KYLIN_STORAGE_URL);
-    }
-
-    public String getHiveUrl() {
-        return getOptional(HIVE_URL, "");
-    }
-
-    public String getHiveUser() {
-        return getOptional(HIVE_USER, "");
-    }
-
-    public String getHivePassword() {
-        return getOptional(HIVE_PASSWORD, "");
-    }
-
-    public String getHdfsWorkingDirectory() {
-        String root = getRequired(KYLIN_HDFS_WORKING_DIR);
-        if (!root.endsWith("/")) {
-            root += "/";
-        }
-        return root + getMetadataUrlPrefix() + "/";
-    }
-
-    public String getHBaseClusterFs() {
-        return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
-    }
-
-    public String getKylinJobLogDir() {
-        return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
-    }
-
-    public String getKylinJobJarPath() {
-        final String jobJar = getOptional(KYLIN_JOB_JAR);
-        if (StringUtils.isNotEmpty(jobJar)) {
-            return jobJar;
-        }
-        String kylinHome = getKylinHome();
-        if (StringUtils.isEmpty(kylinHome)) {
-            return "";
-        }
-        return getFileName(kylinHome + File.separator + "lib", JOB_JAR_NAME_PATTERN);
-    }
-
-    public String getKylinJobMRLibDir() {
-        return getOptional("kylin.job.mr.lib.dir", "");
-    }
-
-    public String getKylinSparkJobJarPath() {
-        final String jobJar = getOptional(KYLIN_JOB_JAR_SPARK);
-        if (StringUtils.isNotEmpty(jobJar)) {
-            return jobJar;
-        }
-        String kylinHome = getKylinHome();
-        if (StringUtils.isEmpty(kylinHome)) {
-            return "";
-        }
-        return getFileName(kylinHome + File.separator + "lib", SPARK_JOB_JAR_NAME_PATTERN);
-    }
-
-    public void overrideMRJobJarPath(String path) {
-        logger.info("override " + KYLIN_JOB_JAR + " to " + path);
-        System.setProperty(KYLIN_JOB_JAR, path);
-    }
-
-    public void overrideSparkJobJarPath(String path) {
-        logger.info("override " + KYLIN_JOB_JAR_SPARK + " to " + path);
-        System.setProperty(KYLIN_JOB_JAR_SPARK, path);
-    }
-
-    private static final Pattern COPROCESSOR_JAR_NAME_PATTERN = Pattern.compile("kylin-coprocessor-(.+)\\.jar");
-    private static final Pattern JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-job-(.+)\\.jar");
-    private static final Pattern SPARK_JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-engine-spark-(.+)\\.jar");
-
-    public String getCoprocessorLocalJar() {
-        final String coprocessorJar = getOptional(COPROCESSOR_LOCAL_JAR);
-        if (StringUtils.isNotEmpty(coprocessorJar)) {
-            return coprocessorJar;
-        }
-        String kylinHome = getKylinHome();
-        if (StringUtils.isEmpty(kylinHome)) {
-            throw new RuntimeException("getCoprocessorLocalJar needs KYLIN_HOME");
-        }
-        return getFileName(kylinHome + File.separator + "lib", COPROCESSOR_JAR_NAME_PATTERN);
-    }
-
-    private static String getFileName(String homePath, Pattern pattern) {
-        File home = new File(homePath);
-        SortedSet<String> files = Sets.newTreeSet();
-        if (home.exists() && home.isDirectory()) {
-            for (File file : home.listFiles()) {
-                final Matcher matcher = pattern.matcher(file.getName());
-                if (matcher.matches()) {
-                    files.add(file.getAbsolutePath());
-                }
-            }
-        }
-        if (files.isEmpty()) {
-            throw new RuntimeException("cannot find " + pattern.toString() + " in " + homePath);
-        } else {
-            return files.last();
-        }
-    }
-
-    public void overrideCoprocessorLocalJar(String path) {
-        logger.info("override " + COPROCESSOR_LOCAL_JAR + " to " + path);
-        System.setProperty(COPROCESSOR_LOCAL_JAR, path);
-    }
-
-    public double getDefaultHadoopJobReducerInputMB() {
-        return Double.parseDouble(getOptional(KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_INPUT_MB, "500"));
-    }
-
-    public double getDefaultHadoopJobReducerCountRatio() {
-        return Double.parseDouble(getOptional(KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_COUNT_RATIO, "1.0"));
-    }
-
-    public int getHadoopJobMaxReducerNumber() {
-        return Integer.parseInt(getOptional(KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER, "500"));
-    }
-
-    public boolean getRunAsRemoteCommand() {
-        return Boolean.parseBoolean(getOptional(KYLIN_JOB_RUN_AS_REMOTE_CMD));
-    }
-
-    public int getRemoteHadoopCliPort() {
-        return Integer.parseInt(getOptional(KYLIN_JOB_REMOTE_CLI_PORT, "22"));
-    }
-
-    public String getRemoteHadoopCliHostname() {
-        return getOptional(KYLIN_JOB_REMOTE_CLI_HOSTNAME);
-    }
-
-    public String getRemoteHadoopCliUsername() {
-        return getOptional(KYLIN_JOB_REMOTE_CLI_USERNAME);
-    }
-
-    public String getRemoteHadoopCliPassword() {
-        return getOptional(KYLIN_JOB_REMOTE_CLI_PASSWORD);
-    }
-
-    public String getCliWorkingDir() {
-        return getOptional(KYLIN_JOB_REMOTE_CLI_WORKING_DIR);
-    }
-
-    public String getMapReduceCmdExtraArgs() {
-        return getOptional(KYLIN_JOB_CMD_EXTRA_ARGS);
-    }
-
-    public String getOverrideHiveTableLocation(String table) {
-        return getOptional(HIVE_TABLE_LOCATION_PREFIX + table.toUpperCase());
-    }
-
-    public String getYarnStatusCheckUrl() {
-        return getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_URL, null);
-    }
-
-    public int getYarnStatusCheckIntervalSeconds() {
-        return Integer.parseInt(getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS, "60"));
-    }
-
-    public int getMaxConcurrentJobLimit() {
-        return Integer.parseInt(getOptional(KYLIN_JOB_CONCURRENT_MAX_LIMIT, "10"));
-    }
-
-    public String getTimeZone() {
-        return getOptional(KYLIN_REST_TIMEZONE, "PST");
-    }
-
-    public String[] getRestServers() {
-        return getOptionalStringArray(KYLIN_REST_SERVERS);
-    }
-
-    public String getAdminDls() {
-        return getOptional("kylin.job.admin.dls", null);
-    }
-
-    public long getJobStepTimeout() {
-        return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60)));
-    }
-
-    public String getServerMode() {
-        return this.getOptional("kylin.server.mode", "all");
-    }
-
-    public int getDictionaryMaxCardinality() {
-        return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
-    }
-
-    public int getTableSnapshotMaxMB() {
-        return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
-    }
-
-    public int getHBaseRegionCountMin() {
-        return Integer.parseInt(getOptional("kylin.hbase.region.count.min", "1"));
-    }
-
-    public int getHBaseRegionCountMax() {
-        return Integer.parseInt(getOptional("kylin.hbase.region.count.max", "500"));
-    }
-
-    public int getScanThreshold() {
-        return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
-    }
-
-    public boolean getQueryRunLocalCoprocessor() {
-        return Boolean.parseBoolean(getOptional("kylin.query.run.local.coprocessor", "false"));
-    }
-
-    public Long getQueryDurationCacheThreshold() {
-        return Long.parseLong(this.getOptional("kylin.query.cache.threshold.duration", String.valueOf(2000)));
-    }
-
-    public Long getQueryScanCountCacheThreshold() {
-        return Long.parseLong(this.getOptional("kylin.query.cache.threshold.scancount", String.valueOf(10 * 1024)));
-    }
-
-    public long getQueryMemBudget() {
-        return Long.parseLong(this.getOptional("kylin.query.mem.budget", String.valueOf(3L * 1024 * 1024 * 1024)));
-    }
-
-    public boolean isQuerySecureEnabled() {
-        return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "false"));
-    }
-
-    public boolean isQueryCacheEnabled() {
-        return Boolean.parseBoolean(this.getOptional("kylin.query.cache.enabled", "true"));
-    }
-
-    public int getHBaseKeyValueSize() {
-        return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760"));
-    }
-
-    public int getHBaseScanCacheRows() {
-        return Integer.parseInt(this.getOptional("kylin.hbase.scan.cache_rows", "1024"));
-    }
-
-    public int getHBaseScanMaxResultSize() {
-        return Integer.parseInt(this.getOptional("kylin.hbase.scan.max_result_size", "" + (5 * 1024 * 1024))); // 5 MB
-    }
-
-    public boolean isCubingInMem() {
-        return Boolean.parseBoolean(this.getOptional(KYLIN_JOB_CUBING_IN_MEM, "false"));
-    }
-
-    public int getCubingInMemSamplingPercent() {
-        int percent = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "100"));
-        percent = Math.max(percent, 1);
-        percent = Math.min(percent, 100);
-        return percent;
-    }
-
-    public String getHbaseDefaultCompressionCodec() {
-        return getOptional(HTABLE_DEFAULT_COMPRESSION_CODEC, "");
-    }
-
-    public boolean isHiveKeepFlatTable() {
-        return Boolean.parseBoolean(this.getOptional("kylin.hive.keep.flat.table", "false"));
-    }
-
-    private String getOptional(String prop) {
-        final String property = System.getProperty(prop);
-        return property != null ? property : properties.getProperty(prop);
-    }
-
-    private String[] getOptionalStringArray(String prop) {
-        final String property = getOptional(prop);
-        if (!StringUtils.isBlank(property)) {
-            return property.split("\\s*,\\s*");
-        } else {
-            return new String[] {};
-        }
-    }
-
-    private String getOptional(String prop, String dft) {
-        final String property = System.getProperty(prop);
-        return property != null ? property : properties.getProperty(prop, dft);
-    }
-
-    private String getRequired(String prop) {
-        final String property = System.getProperty(prop);
-        if (property != null) {
-            return property;
-        }
-        String r = properties.getProperty(prop);
-        if (StringUtils.isEmpty(r)) {
-            throw new IllegalArgumentException("missing '" + prop + "' in conf/kylin_instance.properties");
-        }
-        return r;
-    }
-
-    void reloadKylinConfig(InputStream is) {
-        Properties newProperties = new Properties();
-        try {
-            newProperties.load(is);
-        } catch (IOException e) {
-            throw new RuntimeException("Cannot load kylin config.", e);
-        } finally {
-            IOUtils.closeQuietly(is);
-        }
-        this.properties = newProperties;
-    }
-
-    public void writeProperties(File file) throws IOException {
-        FileOutputStream fos = null;
-        try {
-            fos = new FileOutputStream(file);
-            properties.store(fos, file.getAbsolutePath());
-        } finally {
-            IOUtils.closeQuietly(fos);
-        }
-    }
-
-    public static String getKylinHome() {
-        String kylinHome = System.getenv(KYLIN_HOME);
-        if (StringUtils.isEmpty(kylinHome)) {
-            logger.warn("KYLIN_HOME was not set");
-            return kylinHome;
-        }
-        return kylinHome;
-    }
-
-    public void printProperties() throws IOException {
-        properties.list(System.out);
-    }
-
     private static File getKylinProperties() {
         String kylinConfHome = System.getProperty(KYLIN_CONF);
         if (!StringUtils.isEmpty(kylinConfHome)) {
@@ -679,68 +222,33 @@ public class KylinConfig implements Serializable {
         return new File(path, KYLIN_CONF_PROPERTIES_FILE);
     }
 
-    public String getMetadataUrl() {
-        return getOptional(KYLIN_METADATA_URL);
-    }
-
-    public String getMetadataUrlPrefix() {
-        String hbaseMetadataUrl = getMetadataUrl();
-        String defaultPrefix = "kylin_metadata";
+    /**
+     * Find config from environment. The search order is: 1. check
+     * $KYLIN_CONF/kylin.properties; 2. check $KYLIN_HOME/conf/kylin.properties
+     */
+    private static KylinConfig loadKylinConfig() {
+        Log4jConfigurer.initLogger();
 
-        if (org.apache.commons.lang3.StringUtils.containsIgnoreCase(hbaseMetadataUrl, "@hbase")) {
-            int cut = hbaseMetadataUrl.indexOf('@');
-            String tmp = cut < 0 ? defaultPrefix : hbaseMetadataUrl.substring(0, cut);
-            return tmp;
-        } else {
-            return defaultPrefix;
+        InputStream is = getKylinPropertiesAsInputSteam();
+        if (is == null) {
+            throw new IllegalArgumentException("Failed to load kylin config");
         }
-    }
-
-    public void setMetadataUrl(String metadataUrl) {
-        properties.setProperty(KYLIN_METADATA_URL, metadataUrl);
-    }
-
-    public void setStorageUrl(String storageUrl) {
-        properties.setProperty(KYLIN_STORAGE_URL, storageUrl);
-    }
-
-    public String getHiveDatabaseForIntermediateTable() {
-        return this.getOptional(HIVE_DATABASE_FOR_INTERMEDIATE_TABLE, "default");
-    }
-
-    public String getKylinOwner() {
-        return this.getOptional(KYLIN_OWNER, "");
-    }
-
-    public void setRunAsRemoteCommand(String v) {
-        properties.setProperty(KYLIN_JOB_RUN_AS_REMOTE_CMD, v);
-    }
-
-    public void setRemoteHadoopCliHostname(String v) {
-        properties.setProperty(KYLIN_JOB_REMOTE_CLI_HOSTNAME, v);
-    }
-
-    public void setRemoteHadoopCliUsername(String v) {
-        properties.setProperty(KYLIN_JOB_REMOTE_CLI_USERNAME, v);
-    }
+        KylinConfig config = new KylinConfig();
+        config.reloadKylinConfig(is);
 
-    public void setRemoteHadoopCliPassword(String v) {
-        properties.setProperty(KYLIN_JOB_REMOTE_CLI_PASSWORD, v);
+        return config;
     }
 
-    public String getProperty(String key, String defaultValue) {
-        return properties.getProperty(key, defaultValue);
-    }
+    // ============================================================================
 
-    /**
-     * Set a new key:value into the kylin config.
-     *
-     * @param key
-     * @param value
-     */
-    public void setProperty(String key, String value) {
-        logger.info("Kylin Config was updated with " + key + " : " + value);
-        properties.setProperty(key, value);
+    public void writeProperties(File file) throws IOException {
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(file);
+            getAllProperties().store(fos, file.getAbsolutePath());
+        } finally {
+            IOUtils.closeQuietly(fos);
+        }
     }
 
     public String getConfigAsString() throws IOException {
@@ -750,42 +258,12 @@ public class KylinConfig implements Serializable {
     }
 
     private void list(PrintWriter out) {
-        for (Enumeration e = properties.keys() ; e.hasMoreElements() ;) {
-            String key = (String)e.nextElement();
-            String val = (String)properties.get(key);
+        Properties props = getAllProperties();
+        for (Enumeration<?> e = props.keys(); e.hasMoreElements();) {
+            String key = (String) e.nextElement();
+            String val = (String) props.get(key);
             out.println(key + "=" + val);
         }
     }
 
-    public String getSparkHome() {
-        return properties.getProperty(SPARK_HOME);
-    }
-
-    public String getSparkMaster() {
-        return properties.getProperty(SPARK_MASTER);
-    }
-
-    public int getHBaseRegionCut(String capacity) {
-        String cut;
-        switch (capacity) {
-        case "SMALL":
-            cut = getProperty(HBASE_REGION_CUT_SMALL, "10");
-            break;
-        case "MEDIUM":
-            cut = getProperty(HBASE_REGION_CUT_MEDIUM, "20");
-            break;
-        case "LARGE":
-            cut = getProperty(HBASE_REGION_CUT_LARGE, "100");
-            break;
-        default:
-            throw new IllegalArgumentException("Capacity not recognized: " + capacity);
-        }
-
-        return Integer.valueOf(cut);
-    }
-
-    public String toString() {
-        return getMetadataUrl();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
new file mode 100644
index 0000000..302a2db
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+@SuppressWarnings("serial")
+/**
+ * A base class that encapsulates access to a set of 'properties'.
+ * Subclasses can override methods in this class to extend the content of the 'properties',
+ * for example with some override values.
+ */
+public class KylinConfigBase implements Serializable {
+
+    private static final Logger logger = LoggerFactory.getLogger(KylinConfigBase.class);
+    
+    /*
+     * DON'T DEFINE CONSTANTS FOR PROPERTY KEYS!
+     * 
+     * 1) There is no external need to access property keys; all accesses go through public methods.
+     * 2) It's cumbersome to maintain constants at the top and the code that uses them at the bottom.
+     * 3) Key literals usually appear only once.
+     */
+
+    public static String getKylinHome() {
+        String kylinHome = System.getenv("KYLIN_HOME"); // read from the process environment, not from the properties
+        if (StringUtils.isEmpty(kylinHome)) {
+            logger.warn("KYLIN_HOME was not set"); // only warns; the null/empty value is still returned to the caller
+        }
+        return kylinHome;
+    }
+
+    // ============================================================================
+    
+    private volatile Properties properties = new Properties();
+    
+    public String getOptional(String prop) {
+        return getOptional(prop, null);
+    }
+
+    public String getOptional(String prop, String dft) {
+        final String property = System.getProperty(prop); // a JVM system property of the same name takes precedence
+        return property != null ? property : properties.getProperty(prop, dft); // otherwise the loaded properties, then dft
+    }
+
+    protected String[] getOptionalStringArray(String prop, String[] dft) {
+        // a missing or blank value yields the caller-supplied default array
+        final String raw = getOptional(prop);
+        if (StringUtils.isBlank(raw)) {
+            return dft;
+        }
+        return raw.split("\\s*,\\s*"); // items are separated by commas, with any whitespace around each comma dropped
+    }
+
+    public String getRequired(String prop) {
+        String r = getOptional(prop); // same lookup as getOptional: system property first, then the loaded properties
+        if (StringUtils.isEmpty(r)) { // null and "" both count as missing
+            throw new IllegalArgumentException("missing '" + prop + "' in conf/kylin_instance.properties");
+        }
+        return r;
+    }
+    
+    /**
+     * Use with care, properties should be read-only. This is for testing mostly.
+     */
+    public void setProperty(String key, String value) {
+        logger.info("Kylin Config was updated with " + key + " : " + (org.apache.commons.lang3.StringUtils.containsIgnoreCase(key, "password") ? "******" : value)); // keep password values out of the log
+        properties.setProperty(key, value); // the real value is still stored unmasked
+    }
+    
+    protected Properties getAllProperties() {
+        return properties;
+    }
+
+    protected void reloadKylinConfig(InputStream is) {
+        Properties newProperties = new Properties(); // load into a fresh instance so a failed load leaves the current properties untouched
+        try {
+            newProperties.load(is);
+        } catch (IOException e) {
+            throw new RuntimeException("Cannot load kylin config.", e);
+        } finally {
+            IOUtils.closeQuietly(is); // the stream is closed here whether or not the load succeeded
+        }
+        this.properties = newProperties; // swaps in the whole new property set with one assignment to the volatile field
+    }
+    
+    // ============================================================================
+    
+    public String getMetadataUrl() {
+        return getOptional("kylin.metadata.url");
+    }
+
+    public void setMetadataUrl(String metadataUrl) {
+        setProperty("kylin.metadata.url", metadataUrl);
+    }
+
+    public String getMetadataUrlPrefix() {
+        // The prefix is the part of the metadata URL before its first '@' when the URL
+        // contains "@hbase" (case-insensitive); any other URL yields the default prefix.
+        final String defaultPrefix = "kylin_metadata";
+        final String metadataUrl = getMetadataUrl();
+
+        if (!org.apache.commons.lang3.StringUtils.containsIgnoreCase(metadataUrl, "@hbase")) {
+            return defaultPrefix;
+        }
+        // "@hbase" was found, so '@' is guaranteed to be present and indexOf cannot return -1
+        return metadataUrl.substring(0, metadataUrl.indexOf('@'));
+    }
+
+    public String getServerMode() {
+        return this.getOptional("kylin.server.mode", "all");
+    }
+
+    public String getStorageUrl() {
+        return getOptional("kylin.storage.url");
+    }
+
+    public void setStorageUrl(String storageUrl) {
+        setProperty("kylin.storage.url", storageUrl);
+    }
+
+    /** Was used for routing to Hive; not used any more. */
+    @Deprecated
+    public String getHiveUrl() {
+        return getOptional("hive.url", "");
+    }
+
+    /** Was used for routing to Hive; not used any more. */
+    @Deprecated
+    public String getHiveUser() {
+        return getOptional("hive.user", "");
+    }
+
+    /** Was used for routing to Hive; not used any more. */
+    @Deprecated
+    public String getHivePassword() {
+        return getOptional("hive.password", "");
+    }
+
+    public String getHdfsWorkingDirectory() {
+        // Result: <kylin.hdfs.working.dir>/<metadata url prefix>/
+        // (a single "/" is appended to the configured dir only when it does not already end with one)
+        final String configuredDir = getRequired("kylin.hdfs.working.dir");
+        final String rootWithSlash = configuredDir.endsWith("/") ? configuredDir : configuredDir + "/";
+        return rootWithSlash + getMetadataUrlPrefix() + "/";
+    }
+
+    public CliCommandExecutor getCliCommandExecutor() throws IOException {
+        CliCommandExecutor exec = new CliCommandExecutor();
+        if (getRunAsRemoteCommand()) {
+            exec.setRunAtRemote(getRemoteHadoopCliHostname(), getRemoteHadoopCliPort(), getRemoteHadoopCliUsername(), getRemoteHadoopCliPassword());
+        }
+        return exec;
+    }
+
+    public String getHBaseClusterFs() {
+        return getOptional("kylin.hbase.cluster.fs", "");
+    }
+
+    public String getKylinJobLogDir() {
+        return getOptional("kylin.job.log.dir", "/tmp/kylin/logs");
+    }
+
+    public String getKylinJobJarPath() {
+        // an explicitly configured (or overridden) jar path wins
+        final String configuredJar = getOptional("kylin.job.jar");
+        if (!StringUtils.isEmpty(configuredJar)) {
+            return configuredJar;
+        }
+        // otherwise search the "lib" dir under KYLIN_HOME for kylin-job-*.jar; without KYLIN_HOME the result is an empty string
+        final String home = getKylinHome();
+        final boolean homeUnknown = StringUtils.isEmpty(home);
+        return homeUnknown ? "" : getFileName(home + File.separator + "lib", JOB_JAR_NAME_PATTERN);
+    }
+
+    public void overrideMRJobJarPath(String path) {
+        logger.info("override " + "kylin.job.jar" + " to " + path);
+        System.setProperty("kylin.job.jar", path);
+    }
+
+    public String getKylinJobMRLibDir() {
+        return getOptional("kylin.job.mr.lib.dir", "");
+    }
+
+    public String getKylinSparkJobJarPath() {
+        // an explicitly configured (or overridden) jar path wins
+        final String configuredJar = getOptional("kylin.job.jar.spark");
+        if (!StringUtils.isEmpty(configuredJar)) {
+            return configuredJar;
+        }
+        // otherwise search the "lib" dir under KYLIN_HOME for kylin-engine-spark-*.jar; without KYLIN_HOME the result is an empty string
+        final String home = getKylinHome();
+        final boolean homeUnknown = StringUtils.isEmpty(home);
+        return homeUnknown ? "" : getFileName(home + File.separator + "lib", SPARK_JOB_JAR_NAME_PATTERN);
+    }
+
+    public void overrideSparkJobJarPath(String path) {
+        logger.info("override " + "kylin.job.jar.spark" + " to " + path);
+        System.setProperty("kylin.job.jar.spark", path);
+    }
+
+    private static final Pattern COPROCESSOR_JAR_NAME_PATTERN = Pattern.compile("kylin-coprocessor-(.+)\\.jar");
+    private static final Pattern JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-job-(.+)\\.jar");
+    private static final Pattern SPARK_JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-engine-spark-(.+)\\.jar");
+
+    public String getCoprocessorLocalJar() {
+        final String coprocessorJar = getOptional("kylin.coprocessor.local.jar"); // an explicitly configured (or overridden) path wins
+        if (StringUtils.isNotEmpty(coprocessorJar)) {
+            return coprocessorJar;
+        }
+        String kylinHome = getKylinHome();
+        if (StringUtils.isEmpty(kylinHome)) { // unlike the job jar lookups, a missing KYLIN_HOME is an error here
+            throw new RuntimeException("getCoprocessorLocalJar needs KYLIN_HOME");
+        }
+        return getFileName(kylinHome + File.separator + "lib", COPROCESSOR_JAR_NAME_PATTERN); // kylin-coprocessor-*.jar in the "lib" dir under KYLIN_HOME
+    }
+
+    public void overrideCoprocessorLocalJar(String path) {
+        logger.info("override " + "kylin.coprocessor.local.jar" + " to " + path);
+        System.setProperty("kylin.coprocessor.local.jar", path);
+    }
+
+    private static String getFileName(String homePath, Pattern pattern) {
+        // Returns the greatest (by String ordering) absolute path among the files in homePath whose name fully matches pattern.
+        SortedSet<String> files = Sets.newTreeSet();
+        // listFiles() returns null when homePath is not a directory or cannot be read, so it also covers the exists()/isDirectory() check
+        File[] candidates = new File(homePath).listFiles();
+        if (candidates != null) {
+            for (File file : candidates) {
+                if (pattern.matcher(file.getName()).matches()) {
+                    files.add(file.getAbsolutePath());
+                }
+            }
+        }
+        if (files.isEmpty()) {
+            throw new RuntimeException("cannot find " + pattern.toString() + " in " + homePath);
+        }
+        return files.last();
+    }
+
+    public double getDefaultHadoopJobReducerInputMB() {
+        return Double.parseDouble(getOptional("kylin.job.mapreduce.default.reduce.input.mb", "500"));
+    }
+
+    public double getDefaultHadoopJobReducerCountRatio() {
+        return Double.parseDouble(getOptional("kylin.job.mapreduce.default.reduce.count.ratio", "1.0"));
+    }
+
+    public int getHadoopJobMaxReducerNumber() {
+        return Integer.parseInt(getOptional("kylin.job.mapreduce.max.reducer.number", "500"));
+    }
+
+    public boolean getRunAsRemoteCommand() {
+        return Boolean.parseBoolean(getOptional("kylin.job.run.as.remote.cmd"));
+    }
+
+    public void setRunAsRemoteCommand(String v) {
+        setProperty("kylin.job.run.as.remote.cmd", v);
+    }
+
+    public int getRemoteHadoopCliPort() {
+        return Integer.parseInt(getOptional("kylin.job.remote.cli.port", "22"));
+    }
+
+    public String getRemoteHadoopCliHostname() {
+        return getOptional("kylin.job.remote.cli.hostname");
+    }
+
+    public void setRemoteHadoopCliHostname(String v) {
+        setProperty("kylin.job.remote.cli.hostname", v);
+    }
+
+    public String getRemoteHadoopCliUsername() {
+        return getOptional("kylin.job.remote.cli.username");
+    }
+
+    public void setRemoteHadoopCliUsername(String v) {
+        setProperty("kylin.job.remote.cli.username", v);
+    }
+
+    public String getRemoteHadoopCliPassword() {
+        return getOptional("kylin.job.remote.cli.password");
+    }
+
+    public void setRemoteHadoopCliPassword(String v) {
+        setProperty("kylin.job.remote.cli.password", v);
+    }
+
+    public String getCliWorkingDir() {
+        return getOptional("kylin.job.remote.cli.working.dir");
+    }
+
+    public String getMapReduceCmdExtraArgs() {
+        return getOptional("kylin.job.cmd.extra.args");
+    }
+
+    public String getOverrideHiveTableLocation(String table) {
+        return getOptional("hive.table.location." + table.toUpperCase(java.util.Locale.ROOT)); // Locale.ROOT keeps the key independent of the JVM's default locale
+    }
+
+    public String getYarnStatusCheckUrl() {
+        return getOptional("kylin.job.yarn.app.rest.check.status.url", null);
+    }
+
+    public int getYarnStatusCheckIntervalSeconds() {
+        return Integer.parseInt(getOptional("kylin.job.yarn.app.rest.check.interval.seconds", "60"));
+    }
+
+    public int getMaxConcurrentJobLimit() {
+        return Integer.parseInt(getOptional("kylin.job.concurrent.max.limit", "10"));
+    }
+
+    public String getTimeZone() {
+        return getOptional("kylin.rest.timezone", "PST");
+    }
+
+    public String[] getRestServers() {
+        return getOptionalStringArray("kylin.rest.servers", new String[0]);
+    }
+
+    public String getAdminDls() {
+        return getOptional("kylin.job.admin.dls", null);
+    }
+
+    public long getJobStepTimeout() {
+        return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60)));
+    }
+    
+    public String getCubeAlgorithm() {
+        return getOptional("kylin.cube.algorithm", "auto");
+    }
+    
+    public double getCubeAlgorithmAutoThreshold() {
+        return Double.parseDouble(getOptional("kylin.cube.algorithm.auto.threshold", "8"));
+    }
+    
+    public int getDictionaryMaxCardinality() {
+        return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
+    }
+
+    public int getTableSnapshotMaxMB() {
+        return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
+    }
+
+    public int getHBaseRegionCut(String capacity) {
+        String cut; // raw setting for the requested capacity; each capacity has its own key and default
+        switch (capacity) {
+        case "SMALL":
+            cut = getOptional("kylin.hbase.region.cut.small", "10");
+            break;
+        case "MEDIUM":
+            cut = getOptional("kylin.hbase.region.cut.medium", "20");
+            break;
+        case "LARGE":
+            cut = getOptional("kylin.hbase.region.cut.large", "100");
+            break;
+        default: // the match is exact and case-sensitive
+            throw new IllegalArgumentException("Capacity not recognized: " + capacity);
+        }
+
+        return Integer.parseInt(cut); // parseInt yields the primitive directly; valueOf boxed an Integer only to unbox it again
+    }
+
+    public int getHBaseRegionCountMin() {
+        return Integer.parseInt(getOptional("kylin.hbase.region.count.min", "1"));
+    }
+
+    public int getHBaseRegionCountMax() {
+        return Integer.parseInt(getOptional("kylin.hbase.region.count.max", "500"));
+    }
+
+    public int getScanThreshold() {
+        return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
+    }
+
+    public boolean getQueryRunLocalCoprocessor() {
+        return Boolean.parseBoolean(getOptional("kylin.query.run.local.coprocessor", "false"));
+    }
+
+    public Long getQueryDurationCacheThreshold() {
+        return Long.parseLong(this.getOptional("kylin.query.cache.threshold.duration", String.valueOf(2000)));
+    }
+
+    public Long getQueryScanCountCacheThreshold() {
+        return Long.parseLong(this.getOptional("kylin.query.cache.threshold.scancount", String.valueOf(10 * 1024)));
+    }
+
+    public long getQueryMemBudget() {
+        return Long.parseLong(this.getOptional("kylin.query.mem.budget", String.valueOf(3L * 1024 * 1024 * 1024)));
+    }
+
+    public boolean isQuerySecureEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "false"));
+    }
+
+    public boolean isQueryCacheEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.cache.enabled", "true"));
+    }
+
+    public boolean isQueryIgnoreUnknownFunction() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.ignore_unknown_function", "false"));
+    }
+
+    public int getHBaseKeyValueSize() {
+        return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760"));
+    }
+
+    public int getHBaseScanCacheRows() {
+        return Integer.parseInt(this.getOptional("kylin.hbase.scan.cache_rows", "1024"));
+    }
+
+    public int getHBaseScanMaxResultSize() {
+        return Integer.parseInt(this.getOptional("kylin.hbase.scan.max_result_size", "" + (5 * 1024 * 1024))); // 5 MB
+    }
+
+    public int getCubingInMemSamplingPercent() {
+        // the configured percentage is clamped into the range [1, 100]
+        final int configured = Integer.parseInt(this.getOptional("kylin.job.cubing.inMem.sampling.percent", "100"));
+        final int atLeastOne = Math.max(configured, 1);
+        return Math.min(atLeastOne, 100);
+    }
+
+    public String getHbaseDefaultCompressionCodec() {
+        return getOptional("kylin.hbase.default.compression.codec", "");
+    }
+
+    public boolean isHiveKeepFlatTable() {
+        return Boolean.parseBoolean(this.getOptional("kylin.hive.keep.flat.table", "false"));
+    }
+
+    public String getHiveDatabaseForIntermediateTable() {
+        return this.getOptional("kylin.job.hive.database.for.intermediatetable", "default");
+    }
+
+    public String getKylinOwner() {
+        return this.getOptional("kylin.owner", "");
+    }
+
+    public String getSparkHome() {
+        return getRequired("kylin.spark.home");
+    }
+
+    public String getSparkMaster() {
+        return getRequired("kylin.spark.master");
+    }
+
+    public boolean isMailEnabled() {
+        return Boolean.parseBoolean(getOptional("mail.enabled", "false"));
+    }
+
+    public void setMailEnabled(boolean enable) {
+        setProperty("mail.enabled", "" + enable);
+    }
+
+    public String getMailHost() {
+        return getOptional("mail.host", "");
+    }
+
+    public String getMailUsername() {
+        return getOptional("mail.username", "");
+    }
+
+    public String getMailPassword() {
+        return getOptional("mail.password", "");
+    }
+
+    public String getMailSender() {
+        return getOptional("mail.sender", "");
+    }
+
+    public String toString() {
+        return getMetadataUrl();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
new file mode 100644
index 0000000..8af35b6
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+public class KylinConfigExt {
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MailService.java b/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
index 3b0a4e1..9a4ec64 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
@@ -42,7 +42,7 @@ public class MailService {
     private static final Log logger = LogFactory.getLog(MailService.class);
 
     public MailService(KylinConfig config) {
-        this("true".equalsIgnoreCase(config.getProperty(KylinConfig.MAIL_ENABLED, "false")), config.getProperty(KylinConfig.MAIL_HOST, ""), config.getProperty(KylinConfig.MAIL_USERNAME, ""), config.getProperty(KylinConfig.MAIL_PASSWORD, ""), config.getProperty(KylinConfig.MAIL_SENDER, ""));
+        this(config.isMailEnabled(), config.getMailHost(), config.getMailUsername(), config.getMailPassword(), config.getMailSender());
     }
 
     private MailService(boolean enabled, String host, String username, String password, String sender) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java b/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
index bc4d7cf..df06221 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
@@ -51,8 +51,8 @@ public class MailServiceTest extends LocalFileMetadataTestCase {
         boolean sent = sendTestEmail(mailservice);
         assert sent;
 
-        // set mail.enabled=false, and run again, this time should be no mail delviered
-        config.setProperty(KylinConfig.MAIL_ENABLED, "false");
+        // set mail.enabled=false, and run again, this time should be no mail delivered
+        config.setMailEnabled(false);
         mailservice = new MailService(config);
         sent = sendTestEmail(mailservice);
         assert !sent;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 2208136..41c1f41 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -297,16 +297,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return result;
     }
 
-    public List<CubeSegment> getSegment(SegmentStatusEnum status) {
-        List<CubeSegment> result = Lists.newArrayList();
-        for (CubeSegment segment : segments) {
-            if (segment.getStatus() == status) {
-                result.add(segment);
-            }
-        }
-        return result;
-    }
-
     public CubeSegment getSegment(String name, SegmentStatusEnum status) {
         for (CubeSegment segment : segments) {
             if ((null != segment.getName() && segment.getName().equals(name)) && segment.getStatus() == status) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 4592b15..c5ea962 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -270,7 +270,7 @@ public class CubeManager implements IRealizationProvider {
     }
 
     private boolean validateReadySegments(CubeInstance cube) {
-        final List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY);
+        final List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
         if (readySegments.size() == 0) {
             return true;
         }
@@ -474,7 +474,7 @@ public class CubeManager implements IRealizationProvider {
     }
 
     private Pair<Long, Long> alignMergeRange(CubeInstance cube, long startDate, long endDate) {
-        List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY);
+        List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
         if (readySegments.isEmpty()) {
             throw new IllegalStateException("there are no segments in ready state");
         }
@@ -609,7 +609,7 @@ public class CubeManager implements IRealizationProvider {
             return null;
         }
 
-        List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegment(SegmentStatusEnum.READY));
+        List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY));
 
         if (readySegments.size() == 0) {
             logger.debug("Cube " + cube.getName() + " has no ready segment to merge");

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index e78dea5..6f3dc00 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -124,6 +125,8 @@ public class CubeDesc extends RootPersistentEntity {
     private int engineType = IEngineAware.ID_MR_V1;
     @JsonProperty("storage_type")
     private int storageType = IStorageAware.ID_HBASE;
+    @JsonProperty("override_kylin_properties")
+    private LinkedHashMap<String, String> overrideKylinProps = new LinkedHashMap<String, String>();
 
     private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
     private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index 6d1d39b..c1f55d1 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -65,7 +65,7 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
 
         // assert one ready segment
         assertEquals(1, cube.getSegments().size());
-        CubeSegment seg = cube.getSegment(SegmentStatusEnum.READY).get(0);
+        CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0);
         assertEquals(SegmentStatusEnum.READY, seg.getStatus());
 
         // append again, for non-partitioned cube, it becomes a full refresh

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 7ba328d..18d3193 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -269,6 +269,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     protected final void addExtraInfo(String key, String value) {
         executableManager.addJobInfo(getId(), key, value);
     }
+    
+    protected final Map<String, String> getExtraInfo() {
+        return executableManager.getOutput(getId()).getExtra();
+    }
 
     public final void setStartTime(long time) {
         addExtraInfo(START_TIME, time + "");

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index f8fbc33..86439a8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -19,10 +19,13 @@
 package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
@@ -37,14 +40,15 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
     public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
         this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment)seg);
+        this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment) seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V2 new job to BUILD segment " + seg);
-        
-        final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
+
+        final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
         final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
 
         // Phase 1: Create Flat Table
         inputSide.addStepPhase1_CreateFlatTable(result);
@@ -56,8 +60,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         outputSide.addStepPhase2_BuildDictionary(result);
 
         // Phase 3: Build Cube
-        result.addTask(createInMemCubingStep(jobId));
-        outputSide.addStepPhase3_BuildCube(result);
+        addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
+        result.addTask(createInMemCubingStep(jobId, cuboidRootPath)); // inmem cubing, only selected algorithm will execute
+        outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
 
         // Phase 4: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
@@ -67,28 +72,44 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
+    private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+        RowKeyDesc rowKeyDesc = ((CubeSegment) seg).getCubeDesc().getRowkey();
+        final int groupRowkeyColumnsCount = ((CubeSegment) seg).getCubeDesc().getBuildLevel(); // the cube's build level; used below as the number of N-dimension steps
+        final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
+        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
+        // base cuboid step, added to the job first
+        result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
+        // N-dimension cuboid steps: one task per build level, with dimNum counting down from (total - 1) to (total - build level)
+        for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
+            int dimNum = totalRowkeyColumnsCount - i;
+            result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount, jobId));
+        }
+    }
+
     private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
         SaveStatisticsStep result = new SaveStatisticsStep();
         result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
         result.setCubeName(seg.getRealization().getName());
         result.setSegmentId(seg.getUuid());
         result.setStatisticsPath(getStatisticsPath(jobId));
+        result.setCubingJobId(jobId);
         return result;
     }
 
-    private MapReduceExecutable createInMemCubingStep(String jobId) {
+    private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
         // base cuboid job
         MapReduceExecutable cubeStep = new MapReduceExecutable();
 
         StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+        appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
 
         cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
 
         appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "output", cuboidRootPath);
         appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName());
-        appendExecCmdParameters(cmd, "jobflowid", jobId);
+        appendExecCmdParameters(cmd, "cubingJobId", jobId);
 
         cubeStep.setMapReduceParams(cmd.toString());
         cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
@@ -96,4 +117,61 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return cubeStep;
     }
 
+    private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
+        // base cuboid job
+        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+
+        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
+
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
+        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "level", "0");
+        appendExecCmdParameters(cmd, "cubingJobId", jobId);
+
+        baseCuboidStep.setMapReduceParams(cmd.toString());
+        baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
+        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+        return baseCuboidStep;
+    }
+
+    private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount, String jobId) {
+        // ND cuboid job
+        MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
+
+        ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
+        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
+        appendExecCmdParameters(cmd, "cubingJobId", jobId);
+
+        ndCuboidStep.setMapReduceParams(cmd.toString());
+        ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
+        return ndCuboidStep;
+    }
+
+    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+        String[] paths = new String[groupRowkeyColumnsCount + 1];
+        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
+            int dimNum = totalRowkeyColumnCount - i;
+            if (dimNum == totalRowkeyColumnCount) {
+                paths[i] = cuboidRootPath + "base_cuboid";
+            } else {
+                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
+            }
+        }
+        return paths;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 48a717f..008d489 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
@@ -39,23 +39,24 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
 
     public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment)seg);
+        this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment) seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V2 new job to MERGE segment " + seg);
-        
-        final CubeSegment cubeSegment = (CubeSegment)seg;
+
+        final CubeSegment cubeSegment = (CubeSegment) seg;
         final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
         final String jobId = result.getId();
+        final String cuboidRootPath = getCuboidRootPath(jobId);
 
         final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingSegmentIds = Lists.newArrayList();
-        final List<String> mergingHTables = Lists.newArrayList();
+        final List<String> mergingCuboidPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
-            mergingHTables.add(merging.getStorageLocationIdentifier());
+            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
         }
 
         // Phase 1: Merge Dictionary
@@ -63,10 +64,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
         outputSide.addStepPhase1_MergeDictionary(result);
 
-        // Phase 2: Merge Cube
-        String formattedTables = StringUtil.join(mergingHTables, ",");
-        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
-        outputSide.addStepPhase2_BuildCube(result);
+        // Phase 2: Merge Cube Files
+        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+        result.addTask(createMergeCuboidDataStep(cubeSegment, formattedPath, cuboidRootPath));
+        outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
 
         // Phase 3: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
@@ -85,20 +86,20 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
         MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
         mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
         StringBuilder cmd = new StringBuilder();
 
-        appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
-        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getRealization().getName() + "_Step");
-        appendExecCmdParameters(cmd, "jobflowid", jobId);
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", outputPath);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
-        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
         return mergeCuboidDataStep;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index bee030f..32e0f21 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -46,6 +46,10 @@ import org.apache.kylin.job.execution.Output;
 /**
  */
 public class CubingJob extends DefaultChainedExecutable {
+    
+    public static enum AlgorithmEnum {
+        LAYER, INMEM
+    }
 
     // KEYS of Output.extraInfo map, info passed across job steps
     public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
@@ -169,7 +173,24 @@ public class CubingJob extends DefaultChainedExecutable {
     public void setMapReduceWaitTime(long t) {
         addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
     }
+    
+    public void setAlgorithm(AlgorithmEnum alg) {
+        addExtraInfo("algorithm", alg.name());
+    }
+    
+    public AlgorithmEnum getAlgorithm() {
+        String alg = getExtraInfo().get("algorithm");
+        return alg == null ? null : AlgorithmEnum.valueOf(alg);
+    }
 
+    public boolean isLayerCubing() {
+        return AlgorithmEnum.LAYER == getAlgorithm();
+    }
+    
+    public boolean isInMemCubing() {
+        return AlgorithmEnum.INMEM == getAlgorithm();
+    }
+    
     public long findSourceRecordCount() {
         return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
     }
@@ -206,5 +227,5 @@ public class CubingJob extends DefaultChainedExecutable {
         }
         return dft;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 3ad51c5..844eb07 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -1,13 +1,23 @@
-package org.apache.kylin.engine.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
 
-import java.io.IOException;
+package org.apache.kylin.engine.mr;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
@@ -17,93 +27,59 @@ public interface IMROutput2 {
     public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
 
     /**
-     * Participate the batch cubing flow as the output side.
+     * Participate in the batch cubing flow as the output side. Responsible for saving
+     * the cuboid output to storage at the end of Phase 3.
      * 
      * - Phase 1: Create Flat Table
      * - Phase 2: Build Dictionary
-     * - Phase 3: Build Cube (with StorageOutputFormat)
+     * - Phase 3: Build Cube
      * - Phase 4: Update Metadata & Cleanup
      */
     public interface IMRBatchCubingOutputSide2 {
 
-        /** Return an output format for Phase 3: Build Cube MR */
-        public IMRStorageOutputFormat getStorageOutputFormat();
-
         /** Add step that executes after build dictionary and before build cube. */
         public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
 
-        /** Add step that executes after build cube. */
-        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
-
-        /** Add step that does any necessary clean up. */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
-    }
-
-    public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
-
-    public interface IMRBatchMergeInputSide2 {
-        public IMRStorageInputFormat getStorageInputFormat();
-    }
-
-    /** Read in a cube as input of merge. Configure the input file format of mapper. */
-    @SuppressWarnings("rawtypes")
-    public interface IMRStorageInputFormat {
-
-        /** Configure MR mapper class and input file format. */
-        public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException;
-
-        /** Given a mapper context, figure out which segment the mapper reads from. */
-        public CubeSegment findSourceSegment(Mapper.Context context) throws IOException;
-
         /**
-         * Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures.
+         * Add step that saves cuboids from HDFS to storage.
          * 
-         * @return <code>ByteArrayWritable</code> is the cuboid ID (8 bytes) + dimension values in dictionary encoding
-         *         <code>Object[]</code> is the measure values in order of <code>CubeDesc.getMeasures()</code>
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, 
+         * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
          */
-        public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+        /** Add step that does any necessary clean up. */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
     /** Return a helper to participate in batch merge job flow. */
     public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
 
     /**
-     * Participate the batch merge flow as the output side.
+     * Participate in the batch merge flow as the output side. Responsible for saving
+     * the cuboid output to storage at the end of Phase 2.
      * 
      * - Phase 1: Merge Dictionary
-     * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
+     * - Phase 2: Merge Cube
      * - Phase 3: Update Metadata & Cleanup
      */
     public interface IMRBatchMergeOutputSide2 {
 
-        /** Return an input format for Phase 2: Merge Cube MR */
-        public IMRStorageOutputFormat getStorageOutputFormat();
-
         /** Add step that executes after merge dictionary and before merge cube. */
         public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
 
-        /** Add step that executes after merge cube. */
-        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         * 
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn, 
+         * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
+         */
+        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
 
         /** Add step that does any necessary clean up. */
         public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    /** Write out a cube. Configure the output file format of reducer and do the actual K-V output. */
-    @SuppressWarnings("rawtypes")
-    public interface IMRStorageOutputFormat {
-        
-        /** Configure MR reducer class and output file format. */
-        public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
-
-        /**
-         * Write out a row of cuboid. Given the cuboid ID, dimensions, and measures, serialize in whatever
-         * way and output to reducer context.
-         * 
-         * @param key     The cuboid ID (8 bytes) + dimension values in dictionary encoding
-         * @param value   The measure values in order of <code>CubeDesc.getMeasures()</code>
-         * @param context The reducer context output goes to
-         */
-        public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 55fa9e2..41c8b6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -7,12 +7,10 @@ import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
 import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationSegment;
 import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.storage.StorageFactory;
@@ -47,10 +45,6 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
     }
 
-    public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
-        return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg);
-    }
-
     public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index f031f76..748bac9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -72,8 +72,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
 
     protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
-    protected static final Option OPTION_JOB_FLOW_ID = OptionBuilder.withArgName("job flow ID").hasArg().isRequired(true).withDescription("job flow ID").create("jobflowid");
     protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
+    protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName("cubingJobId").hasArg().isRequired(false).withDescription("ID of cubing job executable").create("cubingJobId");
     protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
     protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
     protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
@@ -477,4 +477,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return this.job;
     }
 
+    // tells MapReduceExecutable to skip this job
+    public boolean isSkipped() {
+        return false;
+    }
+
 }