Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:27 UTC

[19/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
new file mode 100644
index 0000000..f62df42
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+
+import static org.apache.hadoop.util.StringUtils.formatTime;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("static-access")
+public abstract class AbstractHadoopJob extends Configured implements Tool {
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
+
+    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D").create("jobname");
+    protected static final Option OPTION_JOB_FLOW_ID = OptionBuilder.withArgName("job flow ID").hasArg().isRequired(true).withDescription("job flow ID").create("jobflowid");
+    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For example, flat_item_cube").create("cubename");
+    protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For example, some_ii").create("iiname");
+    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name").create("segmentname");
+    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
+    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
+    protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimiter").create("inputdelim");
+    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
+    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
+    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
+    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
+    protected static final Option OPTION_KEY_COLUMN_PERCENTAGE = OptionBuilder.withArgName("rowkey column percentage").hasArg().isRequired(true).withDescription("Percentage of row key columns").create("columnpercentage");
+    protected static final Option OPTION_KEY_SPLIT_NUMBER = OptionBuilder.withArgName("key split number").hasArg().isRequired(true).withDescription("Number of key split range").create("splitnumber");
+
+    protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");
+    protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName("statisticsoutput").hasArg().isRequired(false).withDescription("Statistics output").create("statisticsoutput");
+    protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName("statisticssamplingpercent").hasArg().isRequired(false).withDescription("Statistics sampling percentage").create("statisticssamplingpercent");
+
+    protected String name;
+    protected String description;
+    protected boolean isAsync = false;
+    protected OptionsHelper optionsHelper = new OptionsHelper();
+
+    protected Job job;
+
+    public AbstractHadoopJob() {
+        super(HadoopUtil.newHadoopJobConfiguration());
+    }
+
+    protected void parseOptions(Options options, String[] args) throws ParseException {
+        optionsHelper.parseOptions(options, args);
+    }
+
+    public void printUsage(Options options) {
+        optionsHelper.printUsage(getClass().getSimpleName(), options);
+    }
+
+    public Option[] getOptions() {
+        return optionsHelper.getOptions();
+    }
+
+    public String getOptionsAsString() {
+        return optionsHelper.getOptionsAsString();
+    }
+
+    protected String getOptionValue(Option option) {
+        return optionsHelper.getOptionValue(option);
+    }
+
+    protected boolean hasOption(Option option) {
+        return optionsHelper.hasOption(option);
+    }
+
+    protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
+        int retVal = 0;
+        long start = System.nanoTime();
+        if (isAsync) {
+            job.submit();
+        } else {
+            job.waitForCompletion(true);
+            retVal = job.isSuccessful() ? 0 : 1;
+            logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures.  Time taken ") + formatTime((System.nanoTime() - start) / 1000000L));
+        }
+        return retVal;
+    }
+
+    protected static void runJob(Tool job, String[] args) {
+        try {
+            int exitCode = ToolRunner.run(job, args);
+            System.exit(exitCode);
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+            System.exit(5);
+        }
+    }
+
+    private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
+
+    protected void setJobClasspath(Job job) {
+        String jarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
+        File jarFile = new File(jarPath);
+        if (jarFile.exists()) {
+            job.setJar(jarPath);
+            logger.info("append job jar: " + jarPath);
+        } else {
+            job.setJarByClass(this.getClass());
+        }
+
+        String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
+        String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
+        logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " and kylin.hive.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
+
+        Configuration jobConf = job.getConfiguration();
+        String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
+        if (classpath == null || classpath.length() == 0) {
+            logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
+            classpath = getDefaultMapRedClasspath();
+            logger.info("The default mapred classpath is: " + classpath);
+        }
+
+        if (kylinHBaseDependency != null) {
+            // yarn classpath is comma separated
+            kylinHBaseDependency = kylinHBaseDependency.replace(":", ",");
+            classpath = classpath + "," + kylinHBaseDependency;
+        }
+
+        if (kylinHiveDependency != null) {
+            // yarn classpath is comma separated
+            kylinHiveDependency = kylinHiveDependency.replace(":", ",");
+            classpath = classpath + "," + kylinHiveDependency;
+        }
+
+        jobConf.set(MAP_REDUCE_CLASSPATH, classpath); // hive/hbase dependencies were already appended above; avoid adding them twice (or appending "null")
+        logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
+    }
+
+    private String getDefaultMapRedClasspath() {
+
+        String classpath = "";
+        try {
+            CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+            ShellCmdOutput output = new ShellCmdOutput();
+            executor.execute("mapred classpath", output);
+
+            classpath = output.getOutput().trim().replace(':', ',');
+        } catch (IOException e) {
+            logger.error("Failed to run: 'mapred classpath'.", e);
+        }
+
+        return classpath;
+    }
+
+    public void addInputDirs(String input, Job job) throws IOException {
+        for (String inp : StringSplitter.split(input, ",")) {
+            inp = inp.trim();
+            if (inp.endsWith("/*")) {
+                inp = inp.substring(0, inp.length() - 2);
+                FileSystem fs = FileSystem.get(job.getConfiguration());
+                Path path = new Path(inp);
+                FileStatus[] fileStatuses = fs.listStatus(path);
+                boolean hasDir = false;
+                for (FileStatus stat : fileStatuses) {
+                    if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+                        hasDir = true;
+                        addInputDirs(stat.getPath().toString(), job);
+                    }
+                }
+                if (fileStatuses.length > 0 && !hasDir) {
+                    addInputDirs(path.toString(), job);
+                }
+            } else {
+                logger.debug("Add input " + inp);
+                FileInputFormat.addInputPath(job, new Path(inp));
+            }
+        }
+    }
+
+    public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
+        File metaDir = new File("meta");
+        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+        logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
+        return kylinConfig;
+    }
+
+    protected void attachKylinPropsAndMetadata(TableDesc table, Configuration conf) throws IOException {
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(table.getResourcePath());
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+    
+    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        
+        // write cube / model_desc / cube_desc / dict / table
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(cube.getResourcePath());
+        dumpList.add(cube.getDescriptor().getModel().getResourcePath());
+        dumpList.add(cube.getDescriptor().getResourcePath());
+        
+        for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
+            TableDesc table = metaMgr.getTableDesc(tableName);
+            dumpList.add(table.getResourcePath());
+        }
+        for (CubeSegment segment : cube.getSegments()) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+        
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+
+    protected void attachKylinPropsAndMetadata(ArrayList<String> dumpList, Configuration conf) throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        tmp.delete(); // we need a directory, so delete the file first
+
+        File metaDir = new File(tmp, "meta");
+        metaDir.mkdirs();
+        metaDir.getParentFile().deleteOnExit();
+
+        // write kylin.properties
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        File kylinPropsFile = new File(metaDir, "kylin.properties");
+        kylinConfig.writeProperties(kylinPropsFile);
+
+        // write resources
+        dumpResources(kylinConfig, metaDir, dumpList);
+
+        // hadoop distributed cache
+        conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
+    }
+
+    private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
+        ResourceStore from = ResourceStore.getStore(kylinConfig);
+        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
+        ResourceStore to = ResourceStore.getStore(localConfig);
+        for (String path : dumpList) {
+            InputStream in = from.getResource(path);
+            if (in == null)
+                throw new IllegalStateException("No resource found at -- " + path);
+            long ts = from.getResourceTimestamp(path);
+            to.putResource(path, in, ts);
+            // the following log would duplicate one already written in ResourceStore
+            //log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
+        }
+    }
+
+    protected void deletePath(Configuration conf, Path path) throws IOException {
+        HadoopUtil.deletePath(conf, path);
+    }
+
+    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        if (job == null) {
+            throw new JobException("Job is null");
+        }
+
+        long mapInputBytes = 0;
+        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+        for (InputSplit split : input.getSplits(job)) {
+            mapInputBytes += split.getLength();
+        }
+        if (mapInputBytes == 0) {
+            throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
+        }
+        double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
+        return totalMapInputMB;
+    }
+
+    protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
+        if (job == null) {
+            throw new JobException("Job is null");
+        }
+        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+        return input.getSplits(job).size();
+    }
+
+    public void kill() throws JobException {
+        if (job != null) {
+            try {
+                job.killJob();
+            } catch (IOException e) {
+                throw new JobException(e);
+            }
+        }
+    }
+
+    public Map<String, String> getInfo() throws JobException {
+        if (job != null) {
+            Map<String, String> status = new HashMap<String, String>();
+            if (null != job.getJobID()) {
+                status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
+            }
+            if (null != job.getTrackingURL()) {
+                status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString());
+            }
+
+            return status;
+        } else {
+            throw new JobException("Job is null");
+        }
+    }
+
+    public Counters getCounters() throws JobException {
+        if (job != null) {
+            try {
+                return job.getCounters();
+            } catch (IOException e) {
+                throw new JobException(e);
+            }
+        } else {
+            throw new JobException("Job is null");
+        }
+    }
+
+    public void setAsync(boolean isAsync) {
+        this.isAsync = isAsync;
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+}
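
For orientation, here is a minimal sketch of how a concrete job extends AbstractHadoopJob; the subclass name and the chosen option set are illustrative, not part of this commit:

    import org.apache.commons.cli.Options;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class ExampleHadoopJob extends AbstractHadoopJob {

        @Override
        public int run(String[] args) throws Exception {
            Options options = new Options();
            options.addOption(OPTION_JOB_NAME);
            options.addOption(OPTION_INPUT_PATH);
            options.addOption(OPTION_OUTPUT_PATH);
            parseOptions(options, args);

            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
            setJobClasspath(job);   // ship the kylin job jar plus hive/hbase dependencies
            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
            FileOutputFormat.setOutputPath(job, new Path(getOptionValue(OPTION_OUTPUT_PATH)));

            return waitForCompletion(job);   // submit-and-wait, or submit-only when isAsync
        }

        public static void main(String[] args) {
            runJob(new ExampleHadoopJob(), args);   // ToolRunner.run + System.exit
        }
    }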

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
new file mode 100644
index 0000000..3a40ce2
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+/**
+ * @author George Song (ysong1)
+ * 
+ */
+public interface BatchConstants {
+
+    public static final char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
+
+    public static final String CFG_CUBE_NAME = "cube.name";
+    public static final String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
+
+    public static final String CFG_II_NAME = "ii.name";
+    public static final String CFG_II_SEGMENT_NAME = "ii.segment.name";
+
+    public static final String INPUT_DELIM = "input.delim";
+    public static final String OUTPUT_PATH = "output.path";
+
+    public static final String TABLE_NAME = "table.name";
+    public static final String TABLE_COLUMNS = "table.columns";
+
+    public static final String CFG_IS_MERGE = "is.merge";
+    public static final String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
+
+    public static final String MAPREDUCE_COUTNER_GROUP_NAME = "Cube Builder";
+
+    public static final String MAPPER_SAMPLE_NUMBER = "mapper.sample.number";
+    public static final String REGION_NUMBER = "region.number";
+    public static final String CUBE_CAPACITY = "cube.capacity";
+
+    public static final String CFG_STATISTICS_ENABLED = "statistics.enabled";
+    public static final String CFG_STATISTICS_OUTPUT = "statistics.ouput";
+    public static final String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
+    public static final String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
+    public static final String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
+
+    public static final int COUNTER_MAX = 100000;
+    public static final int ERROR_RECORD_THRESHOLD = 100;
+}
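
These keys travel through the MR job's Configuration. A sketch of the typical round trip (variable names are illustrative):

    // driver side: stamp the cube identity into the job configuration
    Configuration conf = job.getConfiguration();
    conf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
    conf.set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);

    // mapper/reducer side: read it back, typically in setup(Context)
    String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);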

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
new file mode 100644
index 0000000..c8e74f6
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+
+import org.apache.commons.httpclient.ConnectTimeoutException;
+import org.apache.commons.httpclient.HttpClientError;
+import org.apache.commons.httpclient.params.HttpConnectionParams;
+import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
+import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xduo
+ * 
+ */
+public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
+    /** Log object for this class. */
+    private static Logger LOG = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
+    private SSLContext sslcontext = null;
+
+    /**
+     * Constructor for DefaultSslProtocolSocketFactory.
+     */
+    public DefaultSslProtocolSocketFactory() {
+        super();
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
+     */
+    public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
+    }
+
+    /**
+     * Attempts to get a new socket connection to the given host within the
+     * given time limit.
+     * 
+     * <p>
+     * To circumvent the limitations of older JREs that do not support connect
+     * timeout, a controller thread is executed. The controller thread attempts
+     * to create a new socket within the given limit of time. If the socket
+     * constructor does not return before the timeout expires, the controller
+     * terminates and throws a {@link ConnectTimeoutException}.
+     * </p>
+     * 
+     * @param host
+     *            the host name/IP
+     * @param port
+     *            the port on the host
+     * @param localAddress
+     *            the local host name/IP to bind the socket to
+     * @param localPort
+     *            the port on the local machine
+     * @param params
+     *            {@link HttpConnectionParams Http connection parameters}
+     * 
+     * @return Socket a new socket
+     * 
+     * @throws IOException
+     *             if an I/O error occurs while creating the socket
+     * @throws UnknownHostException
+     *             if the IP address of the host cannot be determined
+     * @throws ConnectTimeoutException
+     *             if the socket cannot be connected within the given time limit
+     * @throws IllegalArgumentException
+     *             if params is null
+     */
+    public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
+        if (params == null) {
+            throw new IllegalArgumentException("Parameters may not be null");
+        }
+
+        int timeout = params.getConnectionTimeout();
+
+        if (timeout == 0) {
+            return createSocket(host, port, localAddress, localPort);
+        } else {
+            // To be eventually deprecated when migrated to Java 1.4 or above
+            return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
+        }
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
+     */
+    public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(host, port);
+    }
+
+    /**
+     * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
+     */
+    public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
+        return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
+    }
+
+    public boolean equals(Object obj) {
+        return ((obj != null) && obj.getClass().equals(DefaultSslProtocolSocketFactory.class));
+    }
+
+    public int hashCode() {
+        return DefaultSslProtocolSocketFactory.class.hashCode();
+    }
+
+    private static SSLContext createEasySSLContext() {
+        try {
+            SSLContext context = SSLContext.getInstance("TLS");
+            context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
+
+            return context;
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            throw new HttpClientError(e.toString());
+        }
+    }
+
+    private SSLContext getSSLContext() {
+        if (this.sslcontext == null) {
+            this.sslcontext = createEasySSLContext();
+        }
+
+        return this.sslcontext;
+    }
+}
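
The factory is meant to be installed into commons-httpclient's protocol registry, after which every https connection made through HttpClient skips certificate validation; this is the same pattern HadoopStatusGetter.registerEasyHttps() uses later in this commit:

    Protocol easyHttps = new Protocol("https",
            (ProtocolSocketFactory) new DefaultSslProtocolSocketFactory(), 443);
    Protocol.registerProtocol("https", easyHttps);
    // subsequent GetMethod calls against https URLs now trust any certificate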

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultX509TrustManager.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultX509TrustManager.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultX509TrustManager.java
new file mode 100644
index 0000000..d7901e5
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultX509TrustManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xduo
+ * 
+ */
+public class DefaultX509TrustManager implements X509TrustManager {
+
+    /** Log object for this class. */
+    private static Logger LOG = LoggerFactory.getLogger(DefaultX509TrustManager.class);
+    private X509TrustManager standardTrustManager = null;
+
+    /**
+     * Constructor for DefaultX509TrustManager.
+     * 
+     */
+    public DefaultX509TrustManager(KeyStore keystore) throws NoSuchAlgorithmException, KeyStoreException {
+        super();
+
+        TrustManagerFactory factory = TrustManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        factory.init(keystore);
+
+        TrustManager[] trustmanagers = factory.getTrustManagers();
+
+        if (trustmanagers.length == 0) {
+            throw new NoSuchAlgorithmException("SunX509 trust manager not supported");
+        }
+
+        this.standardTrustManager = (X509TrustManager) trustmanagers[0];
+    }
+
+    public X509Certificate[] getAcceptedIssuers() {
+        return this.standardTrustManager.getAcceptedIssuers();
+    }
+
+    public boolean isClientTrusted(X509Certificate[] certificates) {
+        return true;
+        // return this.standardTrustManager.isClientTrusted(certificates);
+    }
+
+    public boolean isServerTrusted(X509Certificate[] certificates) {
+        if ((certificates != null) && LOG.isDebugEnabled()) {
+            LOG.debug("Server certificate chain:");
+
+            for (int i = 0; i < certificates.length; i++) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("X509Certificate[" + i + "]=" + certificates[i]);
+                }
+            }
+        }
+
+        if ((certificates != null) && (certificates.length == 1)) {
+            X509Certificate certificate = certificates[0];
+
+            try {
+                certificate.checkValidity();
+            } catch (CertificateException e) {
+                LOG.error(e.toString());
+
+                return false;
+            }
+
+            return true;
+        } else {
+            return true;
+            // return this.standardTrustManager.isServerTrusted(certificates);
+        }
+    }
+
+    @Override
+    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+        // intentionally empty: accept any client certificate (trust-all, consistent with isClientTrusted above)
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+        // intentionally empty: accept any server certificate (trust-all bypass)
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
new file mode 100644
index 0000000..294c957
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author xduo
+ * 
+ */
+public class HadoopCmdOutput {
+
+    protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class);
+
+    private final StringBuilder output;
+    private final Job job;
+
+    public HadoopCmdOutput(Job job, StringBuilder output) {
+        super();
+        this.job = job;
+        this.output = output;
+    }
+
+    public String getMrJobId() {
+        return getInfo().get(ExecutableConstants.MR_JOB_ID);
+    }
+
+    public Map<String, String> getInfo() {
+        if (job != null) {
+            Map<String, String> status = new HashMap<String, String>();
+            if (null != job.getJobID()) {
+                status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString());
+            }
+            if (null != job.getTrackingURL()) {
+                status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL().toString());
+            }
+            return status;
+        } else {
+            return Collections.emptyMap();
+        }
+    }
+
+    private String mapInputRecords;
+    private String hdfsBytesWritten;
+    private String hdfsBytesRead;
+
+    public String getMapInputRecords() {
+        return mapInputRecords;
+    }
+
+    public String getHdfsBytesWritten() {
+        return hdfsBytesWritten;
+    }
+
+    public String getHdfsBytesRead() {
+        return hdfsBytesRead;
+    }
+    
+    public void updateJobCounter() {
+        try {
+            Counters counters = job.getCounters();
+            if (counters == null) {
+                String errorMsg = "no counters for job " + getMrJobId();
+                log.warn(errorMsg);
+                output.append(errorMsg);
+                return;
+            }
+            this.output.append(counters.toString()).append("\n");
+            log.debug(counters.toString());
+
+            mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
+            hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
+            hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
+        } catch (Exception e) {
+            log.error(e.getLocalizedMessage(), e);
+            output.append(e.getLocalizedMessage());
+        }
+    }
+
+}
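
A sketch of the intended call sequence once the wrapped job has completed, mirroring how MapReduceExecutable below consumes this class:

    StringBuilder output = new StringBuilder();
    HadoopCmdOutput cmdOutput = new HadoopCmdOutput(job, output);
    // ... after the job finishes ...
    cmdOutput.updateJobCounter();                      // appends the counter dump to 'output'
    String records = cmdOutput.getMapInputRecords();   // TaskCounter.MAP_INPUT_RECORDS
    String read = cmdOutput.getHdfsBytesRead();        // FileSystemCounters / HDFS_BYTES_READ
    String written = cmdOutput.getHdfsBytesWritten();  // FileSystemCounters / HDFS_BYTES_WRITTEN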

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
new file mode 100644
index 0000000..089df5f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import com.google.common.base.Preconditions;
+
+/**
+ */
+public class HadoopShellExecutable extends AbstractExecutable {
+
+    private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS";
+    private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS";
+
+    public HadoopShellExecutable() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final String mapReduceJobClass = getJobClass();
+        String params = getJobParams();
+        Preconditions.checkNotNull(mapReduceJobClass);
+        Preconditions.checkNotNull(params);
+        try {
+            final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
+            final AbstractHadoopJob job = constructor.newInstance();
+            String[] args = params.trim().split("\\s+");
+            logger.info("parameters of the HadoopShellExecutable:");
+            logger.info(params);
+            int result;
+            StringBuilder log = new StringBuilder();
+            try {
+                result = ToolRunner.run(job, args);
+            } catch (Exception ex) {
+                logger.error("error execute " + this.toString(), ex);
+                StringWriter stringWriter = new StringWriter();
+                ex.printStackTrace(new PrintWriter(stringWriter));
+                log.append(stringWriter.toString()).append("\n");
+                result = 2;
+            }
+            log.append("result code:").append(result);
+            return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
+        } catch (ReflectiveOperationException e) {
+            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        } catch (Exception e) {
+            logger.error("error execute " + this.toString(), e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+    public void setJobClass(Class<? extends AbstractHadoopJob> clazzName) {
+        setParam(KEY_MR_JOB, clazzName.getName());
+    }
+
+    public String getJobClass() throws ExecuteException {
+        return getParam(KEY_MR_JOB);
+    }
+
+    public void setJobParams(String param) {
+        setParam(KEY_PARAMS, param);
+    }
+
+    public String getJobParams() {
+        return getParam(KEY_PARAMS);
+    }
+
+}
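
A sketch of how a job flow would configure one of these steps; the job class and argument string are placeholders:

    HadoopShellExecutable step = new HadoopShellExecutable();
    step.setJobClass(SomeHadoopJob.class);   // hypothetical subclass of AbstractHadoopJob
    step.setJobParams("-input /tmp/example/in -output /tmp/example/out");
    // doWork() later splits the params on whitespace and hands them to ToolRunner.run()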

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
new file mode 100644
index 0000000..1b71b92
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xduo
+ * 
+ */
+public class HadoopStatusChecker {
+
+    protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusChecker.class);
+
+    private final String yarnUrl;
+    private final String mrJobID;
+    private final StringBuilder output;
+
+    public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output) {
+        this.yarnUrl = yarnUrl;
+        this.mrJobID = mrJobID;
+        this.output = output;
+    }
+
+    public JobStepStatusEnum checkStatus() {
+        if (null == mrJobID) {
+            this.output.append("Skip status check with empty job id..\n");
+            return JobStepStatusEnum.WAITING;
+        }
+        JobStepStatusEnum status = null;
+        try {
+            final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get();
+            logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight());
+            output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n");
+
+            switch (result.getRight()) {
+            case SUCCEEDED:
+                status = JobStepStatusEnum.FINISHED;
+                break;
+            case FAILED:
+                status = JobStepStatusEnum.ERROR;
+                break;
+            case KILLED:
+                status = JobStepStatusEnum.KILLED;
+                break;
+            case UNDEFINED:
+                switch (result.getLeft()) {
+                case NEW:
+                case NEW_SAVING:
+                case SUBMITTED:
+                case ACCEPTED:
+                    status = JobStepStatusEnum.WAITING;
+                    break;
+                case RUNNING:
+                    status = JobStepStatusEnum.RUNNING;
+                    break;
+                case FINAL_SAVING:
+                case FINISHING:
+                case FINISHED:
+                case FAILED:
+                case KILLING:
+                case KILLED:
+                    // final status not reported yet; keep polling rather than return null
+                    status = JobStepStatusEnum.RUNNING;
+                    break;
+                }
+                break;
+            }
+        } catch (Exception e) {
+            logger.error("error check status", e);
+            output.append("Exception: " + e.getLocalizedMessage() + "\n");
+            status = JobStepStatusEnum.ERROR;
+        }
+
+        return status;
+    }
+
+}
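
A sketch of the polling loop this class is built for; MapReduceExecutable below follows the same shape:

    HadoopStatusChecker checker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
    JobStepStatusEnum status = JobStepStatusEnum.NEW;
    while (!status.isComplete()) {
        status = checker.checkStatus();
        Thread.sleep(10 * 1000L);   // illustrative interval; Kylin reads it from config
    }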

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
new file mode 100644
index 0000000..53b8850
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+
+import org.apache.commons.httpclient.Header;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.protocol.Protocol;
+import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class HadoopStatusGetter {
+
+    private final String mrJobId;
+    private final String yarnUrl;
+
+    protected static final Logger log = LoggerFactory.getLogger(HadoopStatusGetter.class);
+
+    public HadoopStatusGetter(String yarnUrl, String mrJobId) {
+        this.yarnUrl = yarnUrl;
+        this.mrJobId = mrJobId;
+    }
+
+    public Pair<RMAppState, FinalApplicationStatus> get() throws IOException {
+        String applicationId = mrJobId.replace("job", "application");
+        String url = yarnUrl.replace("${job_id}", applicationId);
+        JsonNode root = new ObjectMapper().readTree(getHttpResponse(url));
+        RMAppState state = RMAppState.valueOf(root.findValue("state").getTextValue());
+        FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue());
+        return Pair.of(state, finalStatus);
+    }
+
+    private String getHttpResponse(String url) throws IOException {
+        HttpClient client = new HttpClient();
+
+        String response = null;
+        while (response == null) { // follow redirects via 'refresh'
+            if (url.startsWith("https://")) {
+                registerEasyHttps();
+            }
+            if (url.contains("anonymous=true") == false) {
+                url += url.contains("?") ? "&" : "?";
+                url += "anonymous=true";
+            }
+
+            HttpMethod get = new GetMethod(url);
+            get.addRequestHeader("accept", "application/json");
+
+            try {
+                client.executeMethod(get);
+
+                String redirect = null;
+                Header h = get.getResponseHeader("Refresh");
+                if (h != null) {
+                    String s = h.getValue();
+                    int cut = s.indexOf("url=");
+                    if (cut >= 0) {
+                        redirect = s.substring(cut + 4);
+                    }
+                }
+
+                if (redirect == null) {
+                    response = get.getResponseBodyAsString();
+                    log.debug("Job " + mrJobId + " status check result received.\n");
+                } else {
+                    url = redirect;
+                    log.debug("Job " + mrJobId + " status check redirected to " + url + ".\n");
+                }
+            } finally {
+                get.releaseConnection();
+            }
+        }
+
+        return response;
+    }
+
+    private static Protocol EASY_HTTPS = null;
+
+    private static void registerEasyHttps() {
+        // bypass all https certificate issues
+        if (EASY_HTTPS == null) {
+            EASY_HTTPS = new Protocol("https", (ProtocolSocketFactory) new DefaultSslProtocolSocketFactory(), 443);
+            Protocol.registerProtocol("https", EASY_HTTPS);
+        }
+    }
+
+}
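
The yarnUrl is expected to carry a ${job_id} placeholder that get() fills with the YARN application id derived from the MR job id. A sketch, with a hypothetical resource manager address and job id:

    String yarnUrl = "http://rm-host:8088/ws/v1/cluster/apps/${job_id}";
    Pair<RMAppState, FinalApplicationStatus> result =
            new HadoopStatusGetter(yarnUrl, "job_1438932493026_0012").get();
    // internally: "job_..." becomes "application_...", then an HTTP GET with accept: application/json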

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
new file mode 100644
index 0000000..b62fd21
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.Output;
+
+import com.google.common.base.Preconditions;
+
+/**
+ */
+public class MapReduceExecutable extends AbstractExecutable {
+
+    private static final String KEY_MR_JOB = "MR_JOB_CLASS";
+    private static final String KEY_PARAMS = "MR_JOB_PARAMS";
+    private static final String KEY_COUNTER_SAVEAS = "MR_COUNTER_SAVEAS";
+    
+    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
+
+    public MapReduceExecutable() {
+        super();
+    }
+
+    @Override
+    protected void onExecuteStart(ExecutableContext executableContext) {
+        final Output output = executableManager.getOutput(getId());
+        if (output.getExtra().containsKey(START_TIME)) {
+            final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
+            if (mrJobId == null) {
+                executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                return;
+            }
+            try {
+                Job job = new Cluster(new Configuration()).getJob(JobID.forName(mrJobId));
+                if (job.getJobState() == JobStatus.State.FAILED) {
+                    //remove previous mr job info
+                    super.onExecuteStart(executableContext);
+                } else {
+                    executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                }
+            } catch (IOException | InterruptedException e) {
+                logger.warn("error getting hadoop job status", e);
+                super.onExecuteStart(executableContext);
+            }
+        } else {
+            super.onExecuteStart(executableContext);
+        }
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final String mapReduceJobClass = getMapReduceJobClass();
+        String params = getMapReduceParams();
+        Preconditions.checkNotNull(mapReduceJobClass);
+        Preconditions.checkNotNull(params);
+        try {
+            Job job;
+            final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
+            if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
+                job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
+                logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed"));
+            } else {
+                final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
+                final AbstractHadoopJob hadoopJob = constructor.newInstance();
+                hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
+                logger.info("parameters of the MapReduceExecutable:");
+                logger.info(params);
+                String[] args = params.trim().split("\\s+");
+                try {
+                    // for an async MR job, ToolRunner.run() returns 0 right away
+                    ToolRunner.run(hadoopJob, args);
+                } catch (Exception ex) {
+                    StringBuilder log = new StringBuilder();
+                    logger.error("error execute " + this.toString(), ex);
+                    StringWriter stringWriter = new StringWriter();
+                    ex.printStackTrace(new PrintWriter(stringWriter));
+                    log.append(stringWriter.toString()).append("\n");
+                    log.append("result code:").append(2);
+                    return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
+                }
+                job = hadoopJob.getJob();
+            }
+            final StringBuilder output = new StringBuilder();
+            final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
+
+            final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
+            if (restStatusCheckUrl == null) {
+                logger.error("restStatusCheckUrl is null");
+                return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
+            }
+            String mrJobId = hadoopCmdOutput.getMrJobId();
+            HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
+            JobStepStatusEnum status = JobStepStatusEnum.NEW;
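+            // Poll the job through the YARN REST API until it completes or this
+            // executable is discarded by the user.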
+            while (!isDiscarded()) {
+                JobStepStatusEnum newStatus = statusChecker.checkStatus();
+                if (newStatus == JobStepStatusEnum.KILLED) {
+                    executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String>emptyMap(), "killed by admin");
+                    return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
+                }
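+                // The map-reduce wait time is the span from step start until the job
+                // first leaves the WAITING state, i.e. its time spent queued in YARN.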
+                if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
+                    final long waitTime = System.currentTimeMillis() - getStartTime();
+                    setMapReduceWaitTime(waitTime);
+                }
+                status = newStatus;
+                executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
+                if (status.isComplete()) {
+                    final Map<String, String> info = hadoopCmdOutput.getInfo();
+                    readCounters(hadoopCmdOutput, info);
+                    executableManager.addJobInfo(getId(), info);
+
+                    if (status == JobStepStatusEnum.FINISHED) {
+                        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+                    } else {
+                        return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
+                    }
+                }
+                Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000L);
+            }
+            //TODO kill discarded mr job using "hadoop job -kill " + mrJobId
+
+            return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
+
+        } catch (ReflectiveOperationException e) {
+            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        } catch (Exception e) {
+            logger.error("error execute " + this.toString(), e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+    private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
+        hadoopCmdOutput.updateJobCounter();
+        info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
+        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
+        info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
+        
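+        // KEY_COUNTER_SAVEAS may hold up to three comma-separated names that map, in
+        // order, to map input records, HDFS bytes read and HDFS bytes written.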
+        String saveAs = getParam(KEY_COUNTER_SAVEAS);
+        if (saveAs != null) {
+            String[] saveAsNames = saveAs.split(",");
+            saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
+            saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
+            saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
+        }
+    }
+
+    private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
+        if (saveAsNames.length > i && StringUtils.isNotBlank(saveAsNames[i])) {
+            info.put(saveAsNames[i].trim(), counter);
+        }
+    }
+
+    private String getRestStatusCheckUrl(Job job, KylinConfig config) {
+        final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
+        if (yarnStatusCheckUrl != null) {
+            return yarnStatusCheckUrl;
+        } else {
+            logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, falling back to the job configuration");
+        }
+        String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
+        if (StringUtils.isEmpty(rmWebHost)) {
+            return null;
+        }
+        if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
+            //do nothing
+        } else {
+            rmWebHost = "http://" + rmWebHost;
+        }
+        logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
+        return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
+    }
+
+    public long getMapReduceWaitTime() {
+        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
+    }
+
+    public void setMapReduceWaitTime(long t) {
+        addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
+    }
+
+    public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazz) {
+        setParam(KEY_MR_JOB, clazz.getName());
+    }
+
+    public String getMapReduceJobClass() throws ExecuteException {
+        return getParam(KEY_MR_JOB);
+    }
+
+    public void setMapReduceParams(String param) {
+        setParam(KEY_PARAMS, param);
+    }
+
+    public String getMapReduceParams() {
+        return getParam(KEY_PARAMS);
+    }
+    
+    public String getCounterSaveAs() {
+        return getParam(KEY_COUNTER_SAVEAS);
+    }
+    
+    public void setCounterSaveAs(String value) {
+        setParam(KEY_COUNTER_SAVEAS, value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java
new file mode 100644
index 0000000..2e716eb
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * @author honma
+ * 
+ */
+
+public class BaseCuboidJob extends CuboidJob {
+    public BaseCuboidJob() {
+        this.setMapperClass(HiveToBaseCuboidMapper.class);
+    }
+
+    public static void main(String[] args) throws Exception {
+        CuboidJob job = new BaseCuboidJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
new file mode 100644
index 0000000..9bc84ff
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Base class for mappers that encode flat-table rows into base cuboid key/value pairs.
+ */
+public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
+    protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class);
+    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
+    public static final byte[] ONE = Bytes.toBytes("1");
+    protected String cubeName;
+    protected String segmentName;
+    protected Cuboid baseCuboid;
+    protected CubeInstance cube;
+    protected CubeDesc cubeDesc;
+    protected CubeSegment cubeSegment;
+    protected List<byte[]> nullBytes;
+    protected CubeJoinedFlatTableDesc intermediateTableDesc;
+    protected String intermediateTableRowDelimiter;
+    protected byte byteRowDelimiter;
+    protected int counter;
+    protected Object[] measures;
+    protected byte[][] keyBytesBuf;
+    protected BytesSplitter bytesSplitter;
+    protected AbstractRowKeyEncoder rowKeyEncoder;
+    protected MeasureCodec measureCodec;
+    private int errorRecordCounter;
+    private Text outputKey = new Text();
+    private Text outputValue = new Text();
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
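+        // Job parameters arrive through the Hadoop configuration; cube metadata is
+        // read from the copy that AbstractHadoopJob ships via the distributed cache.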
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
+        if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
+            throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
+        }
+
+        byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+
+        bytesSplitter = new BytesSplitter(200, 4096);
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
+
+        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+        measures = new Object[cubeDesc.getMeasures().size()];
+
+        int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+        keyBytesBuf = new byte[colCount][];
+
+        initNullBytes();
+    }
+
+    private void initNullBytes() {
+        nullBytes = Lists.newArrayList();
+        nullBytes.add(HIVE_NULL);
+        String[] nullStrings = cubeDesc.getNullStrings();
+        if (nullStrings != null) {
+            for (String s : nullStrings) {
+                nullBytes.add(Bytes.toBytes(s));
+            }
+        }
+    }
+
+    private boolean isNull(byte[] v) {
+        for (byte[] nullByte : nullBytes) {
+            if (Bytes.equals(v, nullByte))
+                return true;
+        }
+        return false;
+    }
+
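+    // Copy each rowkey column out of the split flat-table row in cuboid column
+    // order, normalize recognized null representations, then encode the row key.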
+    private byte[] buildKey(SplittedBytes[] splitBuffers) {
+        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+        for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
+            int index = rowKeyColumnIndexes[i];
+            keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
+            if (isNull(keyBytesBuf[i])) {
+                keyBytesBuf[i] = null;
+            }
+        }
+        return rowKeyEncoder.encode(keyBytesBuf);
+    }
+
+    private void buildValue(SplittedBytes[] splitBuffers) {
+
+        for (int i = 0; i < measures.length; i++) {
+            byte[] valueBytes = getValueBytes(splitBuffers, i);
+            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
+        }
+
+        valueBuf.clear();
+        measureCodec.encode(measures, valueBuf);
+    }
+
+    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
+        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
+        FunctionDesc func = desc.getFunction();
+        ParameterDesc paramDesc = func.getParameter();
+        int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
+
+        byte[] result = null;
+
+        // constant
+        if (flatTableIdx == null) {
+            result = Bytes.toBytes(paramDesc.getValue());
+        }
+        // column values
+        else {
+            // for multiple columns, their values are joined
+            for (int i = 0; i < flatTableIdx.length; i++) {
+                SplittedBytes split = splitBuffers[flatTableIdx[i]];
+                if (result == null) {
+                    result = Arrays.copyOf(split.value, split.length);
+                } else {
+                    byte[] newResult = new byte[result.length + split.length];
+                    System.arraycopy(result, 0, newResult, 0, result.length);
+                    System.arraycopy(split.value, 0, newResult, result.length, split.length);
+                    result = newResult;
+                }
+            }
+        }
+
+        if (func.isCount() || func.isHolisticCountDistinct()) {
+            // note for holistic count distinct, this value will be ignored
+            result = ONE;
+        }
+
+        if (isNull(result)) {
+            result = null;
+        }
+
+        return result;
+    }
+
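+    // Sanity-check the split row, then emit the encoded rowkey and the measure
+    // bytes accumulated in valueBuf as one key/value pair.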
+    protected void outputKV(Context context) throws IOException, InterruptedException {
+        intermediateTableDesc.sanityCheck(bytesSplitter);
+
+        byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
+        outputKey.set(rowKey, 0, rowKey.length);
+
+        buildValue(bytesSplitter.getSplitBuffers());
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        context.write(outputKey, outputValue);
+    }
+
+    protected void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
+
+        System.err.println("Insane record: " + bytesSplitter);
+        ex.printStackTrace(System.err);
+
+        errorRecordCounter++;
+        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+            if (ex instanceof IOException)
+                throw (IOException) ex;
+            else if (ex instanceof RuntimeException)
+                throw (RuntimeException) ex;
+            else
+                throw new RuntimeException("", ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
new file mode 100644
index 0000000..4b4c815
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.engine.mr.DFSFileTable;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * @author ysong1
+ * 
+ */
+
+public class CreateDictionaryJob extends AbstractHadoopJob {
+
+    private int returnCode = 0;
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            parseOptions(options, args);
+
+            final String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            final String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+            final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+
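+            // The distinct values of each fact column were extracted to one file per
+            // column under factColumnsInputPath; expose each as a ReadableTable so
+            // the dictionary builder can consume them.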
+            DictionaryGeneratorCLI.processSegment(config, cubeName, segmentName, new DistinctColumnValuesProvider() {
+                @Override
+                public ReadableTable getDistinctValuesFor(TblColRef col) {
+                    return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
+                }
+            });
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+
+        return returnCode;
+    }
+    
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new CreateDictionaryJob(), args);
+        System.exit(exitCode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
new file mode 100644
index 0000000..88edfe1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidCLI;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * @author ysong1
+ */
+public class CuboidJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(CuboidJob.class);
+    private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
+
+    @SuppressWarnings("rawtypes")
+    private Class<? extends Mapper> mapperClass;
+
+    @Override
+    public int run(String[] args) throws Exception {
+        if (this.mapperClass == null)
+            throw new IllegalStateException("Mapper class is not set!");
+        
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_NCUBOID_LEVEL);
+            options.addOption(OPTION_INPUT_FORMAT);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job);
+
+            // Mapper
+            configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+            job.setMapperClass(this.mapperClass);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(Text.class);
+            job.setCombinerClass(CuboidReducer.class); // eases base cuboid shuffle skew: some rowkeys aggregate far more records than others
+
+            // Reducer
+            job.setReducerClass(CuboidReducer.class);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            FileOutputFormat.setOutputPath(job, output);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            // add metadata to distributed cache
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            setReduceTaskNum(job, config, cubeName, nCuboidLevel);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in CuboidJob", e);
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    private void configureMapperInputFormat(CubeSegment cubeSeg) throws IOException {
+        String input = getOptionValue(OPTION_INPUT_PATH);
+        
+        if (StringUtils.isBlank(input)) {
+            // base cuboid case
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
+        }
+        else {
+            // n-dimension cuboid case
+            FileInputFormat.setInputPaths(job, new Path(input));
+            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
+                job.setInputFormatClass(TextInputFormat.class);
+            } else {
+                job.setInputFormatClass(SequenceFileInputFormat.class);
+            }
+        }
+    }
+
+    protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        Configuration jobConf = job.getConfiguration();
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+
+        // total map input MB
+        double totalMapInputMB = this.getTotalMapInputMB();
+
+        // output / input ratio
+        int preLevelCuboids, thisLevelCuboids;
+        if (level == 0) { // base cuboid
+            preLevelCuboids = thisLevelCuboids = 1;
+        } else { // n-cuboid
+            int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc);
+            preLevelCuboids = allLevelCount[level - 1];
+            thisLevelCuboids = allLevelCount[level];
+        }
+
+        // total reduce input MB
+        double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
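+        // Worked example (illustrative numbers): 1000 MB of map input at a level with
+        // 6 pre-level and 4 this-level cuboids gives ~667 MB of reduce input; at
+        // 500 MB per reducer and ratio 1.0 this rounds to 1 reducer.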
+
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for
+        // better performance
+        if (cubeDesc.hasHolisticCountDistinctMeasures()) {
+            numReduceTasks = numReduceTasks * 4;
+        }
+
+        // at least 1 reducer
+        numReduceTasks = Math.max(1, numReduceTasks);
+        // no more than 5000 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks);
+
+        logger.info("Having total map input MB " + Math.round(totalMapInputMB));
+        logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids);
+        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio);
+        logger.info("Setting " + MAPRED_REDUCE_TASKS + "=" + numReduceTasks);
+    }
+
+    /**
+     * @param mapperClass
+     *            the mapperClass to set
+     */
+    @SuppressWarnings("rawtypes")
+    public void setMapperClass(Class<? extends Mapper> mapperClass) {
+        this.mapperClass = mapperClass;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
new file mode 100644
index 0000000..eab967e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * @author George Song (ysong1)
+ * 
+ */
+public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(CuboidReducer.class);
+
+    private String cubeName;
+    private CubeDesc cubeDesc;
+    private List<MeasureDesc> measuresDescs;
+
+    private MeasureCodec codec;
+    private MeasureAggregators aggs;
+
+    private int counter;
+    private Object[] input;
+    private Object[] result;
+
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private Text outputValue = new Text();
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
+        measuresDescs = cubeDesc.getMeasures();
+
+        codec = new MeasureCodec(measuresDescs);
+        aggs = new MeasureAggregators(measuresDescs);
+
+        input = new Object[measuresDescs.size()];
+        result = new Object[measuresDescs.size()];
+    }
+
+    @Override
+    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
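+        // All values sharing this rowkey are partial measure states: decode each one,
+        // merge it into the per-measure aggregators, then re-encode the combined state.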
+        aggs.reset();
+
+        for (Text value : values) {
+            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
+            aggs.aggregate(input);
+        }
+        aggs.collectStates(result);
+
+        valueBuf.clear();
+        codec.encode(result, valueBuf);
+
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        context.write(key, outputValue);
+
+        counter++;
+        if (counter % BatchConstants.COUNTER_MAX == 0) {
+            logger.info("Handled " + counter + " records!");
+        }
+    }
+
+}