Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/11 13:49:51 UTC

[12/51] [partial] kylin git commit: KYLIN-1416 keep only website in document branch

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index 2813596..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- * 
- */
-public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
-
-    protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
-
-    protected StringBuilder output;
-    protected int exitCode;
-    protected JobStepStatusEnum status;
-
-    public ShellCmdOutput() {
-        init();
-    }
-
-    private void init() {
-        output = new StringBuilder();
-        exitCode = -1;
-        status = JobStepStatusEnum.NEW;
-    }
-
-    @Override
-    public JobStepStatusEnum getStatus() {
-        return status;
-    }
-
-    @Override
-    public void setStatus(JobStepStatusEnum s) {
-        this.status = s;
-    }
-
-    @Override
-    public String getOutput() {
-        return output.toString();
-    }
-
-    @Override
-    public void appendOutput(String message) {
-        output.append(message).append(System.getProperty("line.separator"));
-        log.debug(message);
-    }
-
-    @Override
-    public int getExitCode() {
-        return exitCode;
-    }
-
-    @Override
-    public void setExitCode(int code) {
-        exitCode = code;
-    }
-
-    @Override
-    public void reset() {
-        init();
-    }
-
-}

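For reference, a minimal usage sketch of the removed ShellCmdOutput, based only on the API visible in the diff above; the demo class itself is not part of the commit:

    import org.apache.kylin.job.cmd.ShellCmdOutput;
    import org.apache.kylin.job.constant.JobStepStatusEnum;

    public class ShellCmdOutputDemo {
        public static void main(String[] args) {
            ShellCmdOutput output = new ShellCmdOutput();
            output.setStatus(JobStepStatusEnum.RUNNING);
            output.appendOutput("step started");    // also logged at debug level
            output.appendOutput("step finished");
            output.setExitCode(0);
            // appendOutput() joins messages with the platform line separator
            System.out.println(output.getOutput());
            output.reset();    // back to NEW status, exit code -1, empty buffer
        }
    }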
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java b/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
deleted file mode 100644
index 873607c..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- * 
- */
-public class HadoopCmdOutput {
-
-    protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class);
-
-    private final StringBuilder output;
-    private final Job job;
-
-    public HadoopCmdOutput(Job job, StringBuilder output) {
-        super();
-        this.job = job;
-        this.output = output;
-    }
-
-    public String getMrJobId() {
-        return getInfo().get(ExecutableConstants.MR_JOB_ID);
-    }
-
-    public Map<String, String> getInfo() {
-        if (job != null) {
-            Map<String, String> status = new HashMap<String, String>();
-            if (null != job.getJobID()) {
-                status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString());
-            }
-            if (null != job.getTrackingURL()) {
-                status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL());
-            }
-            return status;
-        } else {
-            return Collections.emptyMap();
-        }
-    }
-
-    private String mapInputRecords;
-    private String hdfsBytesWritten;
-    private String hdfsBytesRead;
-
-    public String getMapInputRecords() {
-        return mapInputRecords;
-    }
-
-    public String getHdfsBytesWritten() {
-        return hdfsBytesWritten;
-    }
-
-    public String getHdfsBytesRead() {
-        return hdfsBytesRead;
-    }
-
-    public void updateJobCounter() {
-        try {
-            Counters counters = job.getCounters();
-            if (counters == null) {
-                String errorMsg = "no counters for job " + getMrJobId();
-                log.warn(errorMsg);
-                output.append(errorMsg);
-                return;
-            }
-            this.output.append(counters.toString()).append("\n");
-            log.debug(counters.toString());
-
-            mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
-            hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
-            hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
-        } catch (Exception e) {
-            log.error(e.getLocalizedMessage(), e);
-            output.append(e.getLocalizedMessage());
-
-            mapInputRecords = "0";
-            hdfsBytesWritten = "0";
-            hdfsBytesRead = "0";
-        }
-    }
-
-}

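HadoopCmdOutput wraps a MapReduce Job plus a log buffer and harvests counters once the job completes. A hedged sketch; obtaining and running the Job is assumed and elided:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.kylin.job.common.HadoopCmdOutput;

    public class HadoopCmdOutputDemo {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();             // placeholder job handle
            StringBuilder buf = new StringBuilder();
            HadoopCmdOutput cmdOutput = new HadoopCmdOutput(job, buf);

            // ... submit the job and wait for it to finish ...

            cmdOutput.updateJobCounter();            // appends all counters to buf
            System.out.println(cmdOutput.getMrJobId());
            System.out.println(cmdOutput.getMapInputRecords());
            System.out.println(cmdOutput.getHdfsBytesRead());
        }
    }

Note that updateJobCounter() swallows any exception and falls back to "0" for all three counter values.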
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
deleted file mode 100644
index dc412ce..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Created by qianzhou on 12/26/14.
- */
-public class HadoopShellExecutable extends AbstractExecutable {
-
-    private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS";
-    private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS";
-
-    public HadoopShellExecutable() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final String mapReduceJobClass = getJobClass();
-        String params = getJobParams();
-        Preconditions.checkNotNull(mapReduceJobClass);
-        Preconditions.checkNotNull(params);
-        try {
-            final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
-            final AbstractHadoopJob job = constructor.newInstance();
-            String[] args = params.trim().split("\\s+");
-            logger.info("parameters of the HadoopShellExecutable:");
-            logger.info(params);
-            int result;
-            StringBuilder log = new StringBuilder();
-            try {
-                result = ToolRunner.run(job, args);
-            } catch (Exception ex) {
-                logger.error("error execute " + this.toString(), ex);
-                StringWriter stringWriter = new StringWriter();
-                ex.printStackTrace(new PrintWriter(stringWriter));
-                log.append(stringWriter.toString()).append("\n");
-                result = 2;
-            }
-            log.append("result code:").append(result);
-            return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
-        } catch (ReflectiveOperationException e) {
-            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } catch (Exception e) {
-            logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setJobClass(Class<? extends AbstractHadoopJob> clazzName) {
-        setParam(KEY_MR_JOB, clazzName.getName());
-    }
-
-    public String getJobClass() throws ExecuteException {
-        return getParam(KEY_MR_JOB);
-    }
-
-    public void setJobParams(String param) {
-        setParam(KEY_PARAMS, param);
-    }
-
-    public String getJobParams() {
-        return getParam(KEY_PARAMS);
-    }
-
-}

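HadoopShellExecutable runs a Hadoop job class in-process through ToolRunner. A configuration sketch; CreateHTableJob is one of the AbstractHadoopJob subclasses imported elsewhere in this commit, and the parameter string is purely illustrative:

    import org.apache.kylin.job.common.HadoopShellExecutable;
    import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;

    public class HadoopShellStepDemo {
        public static void main(String[] args) {
            HadoopShellExecutable step = new HadoopShellExecutable();
            step.setJobClass(CreateHTableJob.class);     // stored by class name
            step.setJobParams("-cubename my_cube");      // illustrative args only
            // doWork() instantiates the class reflectively, splits the params
            // on whitespace, and reports SUCCEED only when ToolRunner.run()
            // returns 0; any exception from the job maps to result code 2.
        }
    }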
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
deleted file mode 100644
index 75c461b..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.util.HiveClient;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.Lists;
-
-/**
- * Created by qianzhou on 1/15/15.
- */
-public class HqlExecutable extends AbstractExecutable {
-
-    private static final String HQL = "hql";
-    private static final String HIVE_CONFIG = "hive-config";
-
-    public HqlExecutable() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        try {
-            Map<String, String> configMap = getConfiguration();
-            HiveClient hiveClient = new HiveClient(configMap);
-
-            for (String hql : getHqls()) {
-                hiveClient.executeHQL(hql);
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED);
-        } catch (Exception e) {
-            logger.error("error run hive query:" + getHqls(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setConfiguration(Map<String, String> configMap) {
-        if (configMap != null) {
-            String configStr = "";
-            try {
-                configStr = JsonUtil.writeValueAsString(configMap);
-            } catch (JsonProcessingException e) {
-                e.printStackTrace();
-            }
-            setParam(HIVE_CONFIG, configStr);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<String, String> getConfiguration() {
-        String configStr = getParam(HIVE_CONFIG);
-        Map<String, String> result = null;
-        if (configStr != null) {
-            try {
-                result = JsonUtil.readValue(configStr, HashMap.class);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        return result;
-    }
-
-    public void setHqls(List<String> hqls) {
-        setParam(HQL, StringUtils.join(hqls, ";"));
-    }
-
-    private List<String> getHqls() {
-        final String hqls = getParam(HQL);
-        if (hqls != null) {
-            return Lists.newArrayList(StringUtils.split(hqls, ";"));
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-}

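HqlExecutable carries its Hive statements and configuration as serialized params. A usage sketch based only on the API above; note that setHqls() joins statements with ';' and getHqls() splits on it, so individual statements must not contain literal semicolons:

    import java.util.Collections;

    import org.apache.kylin.job.common.HqlExecutable;

    import com.google.common.collect.Lists;

    public class HqlStepDemo {
        public static void main(String[] args) {
            HqlExecutable step = new HqlExecutable();
            step.setConfiguration(Collections.singletonMap(
                    "hive.exec.compress.output", "true"));  // stored as JSON
            step.setHqls(Lists.newArrayList(
                    "USE default",
                    "SHOW TABLES"));
            // doWork() builds a HiveClient from the stored configuration and
            // replays each statement through executeHQL().
        }
    }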
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
deleted file mode 100644
index cb6e76c..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.RMHAUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.tools.HadoopStatusChecker;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Created by qianzhou on 12/25/14.
- */
-public class MapReduceExecutable extends AbstractExecutable {
-
-    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-    private static final String KEY_MR_JOB = "MR_JOB_CLASS";
-    private static final String KEY_PARAMS = "MR_JOB_PARAMS";
-
-    public MapReduceExecutable() {
-        super();
-    }
-
-    @Override
-    protected void onExecuteStart(ExecutableContext executableContext) {
-        final Output output = executableManager.getOutput(getId());
-        if (output.getExtra().containsKey(START_TIME)) {
-            final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
-            if (mrJobId == null) {
-                executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
-                return;
-            }
-            try {
-                Configuration conf = HadoopUtil.getCurrentConfiguration();
-                Job job = new Cluster(conf).getJob(JobID.forName(mrJobId));
-                if (job.getJobState() == JobStatus.State.FAILED) {
-                    //remove previous mr job info
-                    super.onExecuteStart(executableContext);
-                } else {
-                    executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
-                }
-            } catch (IOException e) {
-                logger.warn("error get hadoop status");
-                super.onExecuteStart(executableContext);
-            } catch (InterruptedException e) {
-                logger.warn("error get hadoop status");
-                super.onExecuteStart(executableContext);
-            }
-        } else {
-            super.onExecuteStart(executableContext);
-        }
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final String mapReduceJobClass = getMapReduceJobClass();
-        String params = getMapReduceParams();
-        Preconditions.checkNotNull(mapReduceJobClass);
-        Preconditions.checkNotNull(params);
-        try {
-            Job job;
-            final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
-            if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
-                Configuration conf = HadoopUtil.getCurrentConfiguration();
-                job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
-                logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID) + " resumed");
-            } else {
-                final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
-                final AbstractHadoopJob hadoopJob = constructor.newInstance();
-                hadoopJob.setConf(HadoopUtil.getCurrentConfiguration());
-                hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
-                logger.info("parameters of the MapReduceExecutable:");
-                logger.info(params);
-                String[] args = params.trim().split("\\s+");
-                try {
-                    //for async mr job, ToolRunner just return 0;
-                    ToolRunner.run(hadoopJob, args);
-                } catch (Exception ex) {
-                    StringBuilder log = new StringBuilder();
-                    logger.error("error execute " + this.toString(), ex);
-                    StringWriter stringWriter = new StringWriter();
-                    ex.printStackTrace(new PrintWriter(stringWriter));
-                    log.append(stringWriter.toString()).append("\n");
-                    log.append("result code:").append(2);
-                    return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
-                }
-                job = hadoopJob.getJob();
-            }
-            final StringBuilder output = new StringBuilder();
-            final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
-
-            final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
-            if (restStatusCheckUrl == null) {
-                logger.error("restStatusCheckUrl is null");
-                return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
-            }
-            String mrJobId = hadoopCmdOutput.getMrJobId();
-            HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, context.getConfig());
-            JobStepStatusEnum status = JobStepStatusEnum.NEW;
-            while (!isDiscarded()) {
-                JobStepStatusEnum newStatus = statusChecker.checkStatus();
-                if (status == JobStepStatusEnum.KILLED) {
-                    executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String> emptyMap(), "killed by admin");
-                    return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
-                }
-                if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
-                    final long waitTime = System.currentTimeMillis() - getStartTime();
-                    setMapReduceWaitTime(waitTime);
-                }
-                status = newStatus;
-                executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
-                if (status.isComplete()) {
-                    hadoopCmdOutput.updateJobCounter();
-                    final Map<String, String> info = hadoopCmdOutput.getInfo();
-                    info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
-                    info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
-                    info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
-                    executableManager.addJobInfo(getId(), info);
-
-                    if (status == JobStepStatusEnum.FINISHED) {
-                        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-                    } else {
-                        return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
-                    }
-                }
-                Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000);
-            }
-
-            // try to kill running map-reduce job to release resources.
-            if (job != null) {
-                try {
-                    job.killJob();
-                } catch (Exception e) {
-                    logger.warn("failed to kill hadoop job: " + job.getJobID(), e);
-                }
-            }
-            return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
-
-        } catch (ReflectiveOperationException e) {
-            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } catch (Exception e) {
-            logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    private String getRestStatusCheckUrl(Job job, KylinConfig config) {
-        final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
-        if (yarnStatusCheckUrl != null) {
-            return yarnStatusCheckUrl;
-        } else {
-            logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
-        }
-        String rmWebHost = HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, job.getConfiguration());
-        if (HAUtil.isHAEnabled(job.getConfiguration())) {
-            YarnConfiguration conf = new YarnConfiguration(job.getConfiguration());
-            String active = RMHAUtils.findActiveRMHAId(conf);
-            rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf);
-        }
-        if (StringUtils.isEmpty(rmWebHost)) {
-            return null;
-        }
-        if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
-            //do nothing
-        } else {
-            rmWebHost = "http://" + rmWebHost;
-        }
-        logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
-        return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
-    }
-
-    public long getMapReduceWaitTime() {
-        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
-    }
-
-    public void setMapReduceWaitTime(long t) {
-        addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
-    }
-
-    public String getMapReduceJobClass() throws ExecuteException {
-        return getParam(KEY_MR_JOB);
-    }
-
-    public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) {
-        setParam(KEY_MR_JOB, clazzName.getName());
-    }
-
-    public String getMapReduceParams() {
-        return getParam(KEY_PARAMS);
-    }
-
-    public void setMapReduceParams(String param) {
-        setParam(KEY_PARAMS, param);
-    }
-
-}

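MapReduceExecutable submits the job asynchronously and then polls for completion, which is why it can resume a step whose MR job is already running. A minimal configuration sketch; BaseCuboidJob is taken from the imports of CubingJobBuilder below, and the parameters are illustrative:

    import org.apache.kylin.job.common.MapReduceExecutable;
    import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;

    public class MapReduceStepDemo {
        public static void main(String[] args) {
            MapReduceExecutable step = new MapReduceExecutable();
            step.setMapReduceJobClass(BaseCuboidJob.class);  // any AbstractHadoopJob
            step.setMapReduceParams("-cubename my_cube");    // illustrative args only
            // doWork() sets the job async so ToolRunner.run() returns at once,
            // then loops on HadoopStatusChecker until the step finishes, fails,
            // or is discarded (in which case it tries to kill the YARN job).
        }
    }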
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
deleted file mode 100644
index 19aa915..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.google.common.collect.Maps;
-
-/**
- * Created by qianzhou on 12/26/14.
- */
-public class ShellExecutable extends AbstractExecutable {
-
-    private static final String CMD = "cmd";
-
-    public ShellExecutable() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        try {
-            logger.info("executing:" + getCmd());
-            final ShellExecutableLogger logger = new ShellExecutableLogger();
-            final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
-            executableManager.addJobInfo(getId(), logger.getInfo());
-            return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond());
-        } catch (IOException e) {
-            logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setCmd(String cmd) {
-        setParam(CMD, cmd);
-    }
-
-    public String getCmd() {
-        return getParam(CMD);
-    }
-
-    private static class ShellExecutableLogger implements Logger {
-
-        private final Map<String, String> info = Maps.newHashMap();
-
-        private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
-        private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
-        private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
-        private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
-        private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
-        private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
-
-        // hive
-        private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
-        private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
-
-        @Override
-        public void log(String message) {
-            Matcher matcher = PATTERN_APP_ID.matcher(message);
-            if (matcher.find()) {
-                String appId = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_ID, appId);
-            }
-
-            matcher = PATTERN_APP_URL.matcher(message);
-            if (matcher.find()) {
-                String appTrackingUrl = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
-            }
-
-            matcher = PATTERN_JOB_ID.matcher(message);
-            if (matcher.find()) {
-                String mrJobID = matcher.group(1);
-                info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
-            }
-
-            matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
-            if (matcher.find()) {
-                String hdfsWritten = matcher.group(1);
-                info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-            }
-
-            matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
-            if (matcher.find()) {
-                String sourceCount = matcher.group(1);
-                info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
-            }
-
-            matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
-            if (matcher.find()) {
-                String sourceSize = matcher.group(1);
-                info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
-            }
-
-            // hive
-            matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
-            if (matcher.find()) {
-                String jobId = matcher.group(1);
-                String trackingUrl = matcher.group(2);
-                info.put(ExecutableConstants.MR_JOB_ID, jobId);
-                info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
-            }
-
-            matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
-            if (matcher.find()) {
-                // String hdfsRead = matcher.group(1);
-                String hdfsWritten = matcher.group(2);
-                info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-            }
-        }
-
-        Map<String, String> getInfo() {
-            return info;
-        }
-    }
-
-}

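ShellExecutable simply records a command line; the interesting work happens in the inner ShellExecutableLogger, which scrapes the command's console output with the regex patterns above. A sketch with an illustrative command:

    import org.apache.kylin.job.common.ShellExecutable;

    public class ShellStepDemo {
        public static void main(String[] args) {
            ShellExecutable step = new ShellExecutable();
            step.setCmd("hive -e \"SHOW TABLES\"");   // illustrative command only
            // doWork() runs the command through the config's CliCommandExecutor;
            // matched application ids, tracking URLs and counters end up in the
            // job info map keyed by the ExecutableConstants shown earlier.
        }
    }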
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
deleted file mode 100644
index 38f4a87..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public interface BatchConstants {
-
-    char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
-
-    String CFG_CUBE_NAME = "cube.name";
-    String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
-    String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level";
-
-    String CFG_II_NAME = "ii.name";
-    String CFG_II_SEGMENT_NAME = "ii.segment.name";
-
-    String OUTPUT_PATH = "output.path";
-
-    String TABLE_NAME = "table.name";
-    String TABLE_COLUMNS = "table.columns";
-
-    String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
-
-    String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
-
-    String MAPPER_SAMPLE_NUMBER = "mapper.sample.number";
-    String REGION_NUMBER = "region.number";
-    String REGION_NUMBER_MIN = "region.number.min";
-    String REGION_NUMBER_MAX = "region.number.max";
-    String REGION_SPLIT_SIZE = "region.split.size";
-    String CUBE_CAPACITY = "cube.capacity";
-
-    String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
-    String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/";
-
-    int COUNTER_MAX = 100000;
-    int ERROR_RECORD_THRESHOLD = 100;
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
deleted file mode 100644
index 3d98b0b..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-/**
- * Created by qianzhou on 1/5/15.
- */
-public final class ExecutableConstants {
-
-    private ExecutableConstants() {
-    }
-
-    public static final String YARN_APP_ID = "yarn_application_id";
-
-    public static final String YARN_APP_URL = "yarn_application_tracking_url";
-    public static final String MR_JOB_ID = "mr_job_id";
-    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
-    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
-    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
-    public static final String GLOBAL_LISTENER_NAME = "ChainListener";
-
-    public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
-
-    public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
-
-    public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
-    public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
-
-    public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
-    public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
-    public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
-    public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data";
-    public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
-    public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
-    public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
-    public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
-    public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
-    public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
-    public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
-    public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
-
-    public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
-    public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
-
-    public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
-    public static final String PROP_JOB_FLOW = "jobFlow";
-    public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
-    public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
-    public static final String PROP_COMMAND = "command";
-    // public static final String PROP_STORAGE_LOCATION =
-    // "storageLocationIdentifier";
-    public static final String PROP_JOB_ASYNC = "jobAsync";
-    public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
-    public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
-    public static final String PROP_JOB_KILLED = "jobKilled";
-    public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
-
-    public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
deleted file mode 100644
index 9c8f083..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-public enum JobStatusEnum {
-
-    NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
-
-    private final int code;
-
-    private JobStatusEnum(int statusCode) {
-        this.code = statusCode;
-    }
-
-    public static JobStatusEnum getByCode(int statusCode) {
-        for (JobStatusEnum status : values()) {
-            if (status.getCode() == statusCode) {
-                return status;
-            }
-        }
-
-        return null;
-    }
-
-    public int getCode() {
-        return this.code;
-    }
-
-    public boolean isComplete() {
-        return code == JobStatusEnum.FINISHED.getCode() || code == JobStatusEnum.ERROR.getCode() || code == JobStatusEnum.DISCARDED.getCode();
-    }
-
-}

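The status codes are distinct bits (0, 1, 2, 4, 8, 16), and getByCode() does a linear scan over values(); it returns null for an unknown code, so callers should guard. For example:

    import org.apache.kylin.job.constant.JobStatusEnum;

    public class JobStatusDemo {
        public static void main(String[] args) {
            JobStatusEnum status = JobStatusEnum.getByCode(4);   // FINISHED
            if (status != null && status.isComplete()) {
                System.out.println("job is done: " + status);
            }
        }
    }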
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
deleted file mode 100644
index 0e4c18e..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-/**
- * @author xduo, ysong1
- * 
- */
-public enum JobStepCmdTypeEnum {
-    SHELL_CMD, SHELL_CMD_HADOOP, JAVA_CMD_HADOOP_FACTDISTINCT, JAVA_CMD_HADOOP_BASECUBOID, JAVA_CMD_HADOOP_NDCUBOID, JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, JAVA_CMD_HADOOP_CONVERTHFILE, JAVA_CMD_HADOOP_MERGECUBOID, JAVA_CMD_HADOOP_NO_MR_DICTIONARY, JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, JAVA_CMD_HADOOP_NO_MR_BULKLOAD
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
deleted file mode 100644
index fbcfd97..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-public enum JobStepStatusEnum {
-    NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64);
-
-    private final int code;
-
-    private JobStepStatusEnum(int statusCode) {
-        this.code = statusCode;
-    }
-
-    public static JobStepStatusEnum getByCode(int statusCode) {
-        for (JobStepStatusEnum status : values()) {
-            if (status.getCode() == statusCode) {
-                return status;
-            }
-        }
-
-        return null;
-    }
-
-    public int getCode() {
-        return this.code;
-    }
-
-    public boolean isComplete() {
-        return code == JobStepStatusEnum.FINISHED.getCode() || code == JobStepStatusEnum.ERROR.getCode() || code == JobStepStatusEnum.DISCARDED.getCode();
-    }
-
-    public boolean isRunable() {
-        return code == JobStepStatusEnum.PENDING.getCode() || code == JobStepStatusEnum.ERROR.getCode();
-    }
-}

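JobStepStatusEnum extends the same scheme with WAITING(32) and KILLED(64) and adds isRunable() (spelling as in the source), which marks PENDING and ERROR steps as eligible to run. A small illustration:

    import org.apache.kylin.job.constant.JobStepStatusEnum;

    public class StepStatusDemo {
        public static void main(String[] args) {
            for (JobStepStatusEnum s : JobStepStatusEnum.values()) {
                System.out.println(s + " code=" + s.getCode()
                        + " runable=" + s.isRunable()
                        + " complete=" + s.isComplete());
            }
        }
    }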
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
deleted file mode 100644
index 182196c..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-
-/**
- * Created by qianzhou on 12/25/14.
- */
-public class CubingJob extends DefaultChainedExecutable {
-
-    public CubingJob() {
-        super();
-    }
-
-    private static final String CUBE_INSTANCE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-
-    public void setCubeName(String name) {
-        setParam(CUBE_INSTANCE_NAME, name);
-    }
-
-    public String getCubeName() {
-        return getParam(CUBE_INSTANCE_NAME);
-    }
-
-    void setSegmentIds(List<String> segmentIds) {
-        setParam(SEGMENT_ID, StringUtils.join(segmentIds, ","));
-    }
-
-    void setSegmentId(String segmentId) {
-        setParam(SEGMENT_ID, segmentId);
-    }
-
-    public String getSegmentIds() {
-        return getParam(SEGMENT_ID);
-    }
-
-    @Override
-    protected Pair<String, String> formatNotifications(ExecutableState state) {
-        final Output output = jobService.getOutput(getId());
-        String logMsg = "";
-        switch (output.getState()) {
-        case ERROR:
-            logMsg = output.getVerboseMsg();
-            break;
-        case DISCARDED:
-            break;
-        case SUCCEED:
-            break;
-        default:
-            return null;
-        }
-        String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
-        content = content.replaceAll("\\$\\{job_name\\}", getName());
-        content = content.replaceAll("\\$\\{result\\}", state.toString());
-        content = content.replaceAll("\\$\\{cube_name\\}", getCubeName());
-        content = content.replaceAll("\\$\\{source_records_count\\}", StringUtil.noBlank(getSourceRecordCount(), "0"));
-        content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString());
-        content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins");
-        content = content.replaceAll("\\$\\{mr_waiting\\}", getMapReduceWaitTime() / 60000 + "mins");
-        content = content.replaceAll("\\$\\{last_update_time\\}", new Date(getLastModified()).toString());
-        content = content.replaceAll("\\$\\{submitter\\}", StringUtil.noBlank(getSubmitter(), "missing submitter"));
-        content = content.replaceAll("\\$\\{error_log\\}", StringUtil.noBlank(logMsg, "no error log"));
-
-        try {
-            InetAddress inetAddress = InetAddress.getLocalHost();
-            content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
-        } catch (UnknownHostException e) {
-            logger.warn(e.getLocalizedMessage(), e);
-        }
-
-        String title = "[" + state.toString() + "] - [Kylin Cube Build Job]-" + getCubeName();
-        return Pair.of(title, content);
-    }
-
-    @Override
-    protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
-        long time = 0L;
-        for (AbstractExecutable task : getTasks()) {
-            final ExecutableState status = task.getStatus();
-            if (status != ExecutableState.SUCCEED) {
-                break;
-            }
-            if (task instanceof MapReduceExecutable) {
-                time += ((MapReduceExecutable) task).getMapReduceWaitTime();
-            }
-        }
-        setMapReduceWaitTime(time);
-        super.onExecuteFinished(result, executableContext);
-    }
-
-    public long getMapReduceWaitTime() {
-        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
-    }
-
-    public void setMapReduceWaitTime(long t) {
-        addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
-    }
-
-
-    public final String getSourceRecordCount() {
-        for (AbstractExecutable task : getTasks()) {
-            if (ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID.equals(task.getName())) {
-               return getExtraInfo(task.getOutput(), ExecutableConstants.SOURCE_RECORDS_COUNT);
-            }
-        }
-        return "N/A";
-    }
-}

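CubingJob stores its cube name and segment ids as job params; a minimal sketch using only the public setters and getters above (whether the extra-info accessors work outside a running job engine depends on AbstractExecutable internals not shown in this commit):

    import org.apache.kylin.job.cube.CubingJob;

    public class CubingJobDemo {
        public static void main(String[] args) {
            CubingJob job = new CubingJob();
            job.setCubeName("my_cube");              // illustrative name
            System.out.println(job.getCubeName());
            // getSourceRecordCount() scans the sub-tasks for the base cuboid
            // step and returns "N/A" when no such step has produced output.
        }
    }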
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
deleted file mode 100644
index 80c030f..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ /dev/null
@@ -1,476 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.AbstractJobBuilder;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
-import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Created by qianzhou on 12/25/14.
- */
-public final class CubingJobBuilder extends AbstractJobBuilder {
-
-    public CubingJobBuilder(JobEngineConfig engineConfig) {
-        super(engineConfig);
-    }
-
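-    /**
-     * Assembles a full BUILD job for the segment: cubing steps, HTable
-     * conversion, cube info update, and final garbage collection of the
-     * intermediate Hive table and temporary HDFS paths.
-     */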
-    public CubingJob buildJob(CubeSegment seg) {
-        checkPreconditions(seg);
-
-        final CubingJob result = initialJob(seg, "BUILD");
-        final String jobId = result.getId();
-        final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
-        final List<String> toDeletePaths = Lists.newArrayList();
-
-        // cubing
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result, toDeletePaths);
-        String intermediateHiveTableStepId = twoSteps.getFirst().getId();
-        String baseCuboidStepId = twoSteps.getSecond().getId();
-
-        // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, cuboidRootPath, result);
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, convertCuboidToHfileStep.getId(), jobId));
-
-        final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-        final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, toDeletePaths));
-
-        return result;
-    }
-
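-    /**
-     * Assembles a job that first cubes the append segment, then merges it
-     * together with the existing segments into the merge segment.
-     */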
-    public CubingJob buildAndMergeJob(CubeSegment appendSegment, CubeSegment mergeSegment) {
-        checkPreconditions(appendSegment, mergeSegment);
-
-        CubingJob result = initialJob(mergeSegment, "BUILD");
-        result.setSegmentIds(Lists.newArrayList(appendSegment.getUuid(), mergeSegment.getUuid()));
-        final String jobId = result.getId();
-        final String appendRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/append_cuboid/";
-        final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
-        List<String> mergingSegmentIds = Lists.newArrayList();
-        List<String> mergingCuboidPaths = Lists.newArrayList();
-        List<String> mergingHTables = Lists.newArrayList();
-        final List<String> toDeletePaths = Lists.newArrayList();
-
-        // cubing the incremental segment
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result, toDeletePaths);
-        final String intermediateHiveTableStepId = twoSteps.getFirst().getId();
-        final String baseCuboidStepId = twoSteps.getSecond().getId();
-
-        // update the append segment info
-        result.addTask(createUpdateCubeInfoAfterBuildStep(appendSegment, intermediateHiveTableStepId, baseCuboidStepId, null, jobId));
-
-        List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-            if (merging.equals(appendSegment)) {
-                mergingCuboidPaths.add(appendRootPath + "*");
-            } else {
-                mergingCuboidPaths.add(getPathToMerge(merging));
-            }
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
-        }
-
-        // merge cuboid
-        addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
-
-        // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePaths));
-
-        return result;
-    }
-
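-    /**
-     * Assembles a MERGE job: merges dictionaries and cuboid data of all
-     * merging segments, rebuilds the HTable, then updates cube info and
-     * collects garbage.
-     */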
-    public CubingJob mergeJob(CubeSegment seg) {
-        checkPreconditions(seg);
-
-        CubingJob result = initialJob(seg, "MERGE");
-        final String jobId = result.getId();
-        final String mergedCuboidPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
-
-        List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-        List<String> mergingSegmentIds = Lists.newArrayList();
-        List<String> mergingCuboidPaths = Lists.newArrayList();
-        List<String> mergingHTables = Lists.newArrayList();
-        final List<String> toDeletePaths = Lists.newArrayList();
-
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingCuboidPaths.add(getPathToMerge(merging));
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
-        }
-
-        // merge cuboid
-        addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
-
-        // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePaths));
-        return result;
-    }
-
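-    // Merging combines the segment dictionaries first, then the cuboid data.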
-    void addMergeSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingCuboidPaths, String mergedCuboidPath, CubingJob result) {
-
-        result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
-
-        String formattedPath = StringUtils.join(mergingCuboidPaths, ",");
-        result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
-    }
-
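-    // Adds the cubing steps (flat table, fact distinct columns, dictionary,
-    // base cuboid, N-D cuboids) and returns the (intermediate Hive table step,
-    // base cuboid step) pair whose ids the update step needs later.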
-    Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result, List<String> toDeletePaths) {
-        final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
-        final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
-
-        final String jobId = result.getId();
-        final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-        final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        final String intermediateHiveTableLocation = getIntermediateHiveTableLocation(intermediateTableDesc, jobId);
-        final String factDistinctColumnsPath = getFactDistinctColumnsPath(seg, jobId);
-        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
-
-        final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId);
-        result.addTask(intermediateHiveTableStep);
-
-        result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId));
-
-        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-
-        // base cuboid step
-        final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath);
-        result.addTask(baseCuboidStep);
-
-        // n dim cuboid steps
-        for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
-            int dimNum = totalRowkeyColumnsCount - i;
-            result.addTask(createNDimensionCuboidStep(seg, cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
-        }
-
-        toDeletePaths.add(intermediateHiveTableLocation);
-        toDeletePaths.add(factDistinctColumnsPath);
-
-        return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
-    }
-
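-    // Adds the HTable steps (rowkey distribution, create HTable, cuboid to
-    // HFile, bulk load) and returns the HFile conversion step.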
-    AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
-        final String jobId = result.getId();
-        final String cuboidPath = cuboidRootPath + "*";
-
-        result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath, jobId));
-        // create htable step
-        result.addTask(createCreateHTableStep(seg, jobId));
-        // generate hfiles step
-        final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId);
-        result.addTask(convertCuboidToHfileStep);
-        // bulk load step
-        result.addTask(createBulkLoadStep(seg, jobId));
-
-        return convertCuboidToHfileStep;
-    }
-
-    private CubingJob initialJob(CubeSegment seg, String type) {
-        CubingJob result = new CubingJob();
-        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
-        result.setSubmitter(submitter);
-        result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
-        return result;
-    }
-
-    private void checkPreconditions(CubeSegment... segments) {
-        for (CubeSegment seg : segments) {
-            Preconditions.checkNotNull(seg, "segment cannot be null");
-        }
-        Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
-    }
-
-    private void appendMapReduceParameters(StringBuilder builder, CubeSegment seg) {
-        try {
-            String jobConf = engineConfig.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
-            if (jobConf != null && jobConf.length() > 0) {
-                builder.append(" -conf ").append(jobConf);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
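-    // One output directory per cuboid level: index 0 holds the base cuboid,
-    // index i holds the (totalRowkeyColumnCount - i)-dimension cuboids.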
-    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
-        String[] paths = new String[groupRowkeyColumnsCount + 1];
-        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
-            int dimNum = totalRowkeyColumnCount - i;
-            if (dimNum == totalRowkeyColumnCount) {
-                paths[i] = cuboidRootPath + "base_cuboid";
-            } else {
-                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
-            }
-        }
-        return paths;
-    }
-
-    private String getPathToMerge(CubeSegment seg) {
-        return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/cuboid/*";
-    }
-
-    private String getRowkeyDistributionOutputPath(CubeSegment seg, String jobId) {
-        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
-    }
-
-    private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
-        return getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
-    }
-
-    private String getHFilePath(CubeSegment seg, String jobId) {
-        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
-    }
-
-    private MapReduceExecutable createFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(FactDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(seg, jobId));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName);
-
-        result.setMapReduceParams(cmd.toString());
-        return result;
-    }
-
-    private HadoopShellExecutable createBuildDictionaryStep(CubeSegment seg, String factDistinctColumnsPath) {
-        // build dictionary job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    private MapReduceExecutable createBaseCuboidStep(CubeSegment seg, String intermediateHiveTableLocation, String[] cuboidOutputTempPath) {
-        // base cuboid job
-        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-
-        baseCuboidStep.setMapReduceParams(cmd.toString());
-        baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
-        return baseCuboidStep;
-    }
-
-    private MapReduceExecutable createNDimensionCuboidStep(CubeSegment seg, String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
-        // ND cuboid job
-        MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
-
-        ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
-
-        ndCuboidStep.setMapReduceParams(cmd.toString());
-        ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
-        return ndCuboidStep;
-    }
-
-    private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath, String jobId) {
-        MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
-        rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg, jobId));
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
-
-        rowkeyDistributionStep.setMapReduceParams(cmd.toString());
-        rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
-        return rowkeyDistributionStep;
-    }
-
-    private HadoopShellExecutable createCreateHTableStep(CubeSegment seg, String jobId) {
-        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        createHtableStep.setJobParams(cmd.toString());
-        createHtableStep.setJobClass(CreateHTableJob.class);
-
-        return createHtableStep;
-    }
-
-    private MapReduceExecutable createConvertCuboidToHfileStep(CubeSegment seg, String inputPath, String jobId) {
-        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
-
-        createHFilesStep.setMapReduceParams(cmd.toString());
-        createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
-
-        return createHFilesStep;
-    }
-
-    private HadoopShellExecutable createBulkLoadStep(CubeSegment seg, String jobId) {
-        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-
-        bulkLoadStep.setJobParams(cmd.toString());
-        bulkLoadStep.setJobClass(BulkLoadJob.class);
-
-        return bulkLoadStep;
-    }
-
-    private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(CubeSegment seg, String createFlatTableStepId, String baseCuboidStepId, String convertToHFileStepId, String jobId) {
-        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
-        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
-        updateCubeInfoStep.setSegmentId(seg.getUuid());
-        updateCubeInfoStep.setCreateFlatTableStepId(createFlatTableStepId);
-        updateCubeInfoStep.setBaseCuboidStepId(baseCuboidStepId);
-        updateCubeInfoStep.setConvertToHFileStepId(convertToHFileStepId);
-        updateCubeInfoStep.setCubingJobId(jobId);
-        return updateCubeInfoStep;
-    }
-
-    private MergeDictionaryStep createMergeDictionaryStep(CubeSegment seg, List<String> mergingSegmentIds) {
-        MergeDictionaryStep result = new MergeDictionaryStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        return result;
-    }
-
-    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", outputPath);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
-        return mergeCuboidDataStep;
-    }
-
-    private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String convertToHFileStepId, String jobId) {
-        UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
-        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setConvertToHFileStepId(convertToHFileStepId);
-        result.setCubingJobId(jobId);
-        return result;
-    }
-
-    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdfsPaths) {
-        GarbageCollectionStep result = new GarbageCollectionStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        result.setOldHTables(oldHtables);
-        result.setOldHiveTable(hiveIntermediateTable);
-        result.setOldHdfsPaths(oldHdfsPaths);
-        return result;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
deleted file mode 100644
index f2f1fc0..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Drop the resources that are no longer needed, including the intermediate Hive table (after cube build) and HBase tables (after cube merge)
- */
-public class GarbageCollectionStep extends AbstractExecutable {
-
-    private static final String OLD_HTABLES = "oldHTables";
-
-    private static final String OLD_HIVE_TABLE = "oldHiveTable";
-
-    private static final String OLD_HDFS_PATHS = "oldHdfsPaths";
-
-    private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
-    private StringBuilder output;
-
-    public GarbageCollectionStep() {
-        super();
-        output = new StringBuilder();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
-        try {
-            dropHBaseTable(context);
-            dropHiveTable(context);
-            dropHdfsPath(context);
-        } catch (IOException e) {
-            logger.error("job:" + getId() + " execute finished with exception", e);
-            output.append("\n").append(e.getLocalizedMessage());
-            return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
-        }
-
-        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-    }
-
-    private void dropHiveTable(ExecutableContext context) throws IOException {
-        final String hiveTable = this.getOldHiveTable();
-        if (StringUtils.isNotEmpty(hiveTable)) {
-            final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS " + hiveTable + ";";
-            final String dropHiveCMD = "hive -e \"" + dropSQL + "\"";
-            logger.info("executing: " + dropHiveCMD);
-            ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
-            context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
-            logger.debug("Dropped Hive table " + hiveTable + " \n");
-            output.append(shellCmdOutput.getOutput() + " \n");
-            output.append("Dropped Hive table " + hiveTable + " \n");
-        }
-
-    }
-
-    private void dropHBaseTable(ExecutableContext context) throws IOException {
-        List<String> oldTables = getOldHTables();
-        if (oldTables != null && oldTables.size() > 0) {
-            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-            Configuration conf = HBaseConfiguration.create();
-            HBaseAdmin admin = null;
-            try {
-                admin = new HBaseAdmin(conf);
-                for (String table : oldTables) {
-                    if (admin.tableExists(table)) {
-                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
-                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
-                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                            if (admin.isTableEnabled(table)) {
-                                admin.disableTable(table);
-                            }
-                            admin.deleteTable(table);
-                            logger.debug("Dropped HBase table " + table);
-                            output.append("Dropped HBase table " + table + " \n");
-                        } else {
-                            logger.debug("Skipped HBase table " + table);
-                            output.append("Skipped HBase table " + table + " \n");
-                        }
-                    }
-                }
-
-            } finally {
-                if (admin != null)
-                    try {
-                        admin.close();
-                    } catch (IOException e) {
-                        logger.error(e.getLocalizedMessage());
-                    }
-            }
-        }
-    }
-
-    private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
-        if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
-            logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
-            output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
-            for (String path : oldHdfsPaths) {
-                if (path.endsWith("*"))
-                    path = path.substring(0, path.length() - 1);
-
-                Path oldPath = new Path(path);
-                if (fileSystem.exists(oldPath)) {
-                    fileSystem.delete(oldPath, true);
-                    logger.debug("Dropped HDFS path: " + path);
-                    output.append("Dropped HDFS path  \"" + path + "\" \n");
-                } else {
-                    logger.debug("HDFS path does not exist: " + path);
-                    output.append("HDFS path does not exist: \"" + path + "\" \n");
-                }
-            }
-        }
-    }
-
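-    // Deletes the old paths on the default cluster, and again on the HBase
-    // cluster when it is configured as a separate file system.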
-    private void dropHdfsPath(ExecutableContext context) throws IOException {
-        List<String> oldHdfsPaths = this.getOldHdfsPaths();
-        FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentConfiguration());
-        dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
-
-        if (StringUtils.isNotEmpty(context.getConfig().getHBaseClusterFs())) {
-            fileSystem = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration());
-            dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
-        }
-
-    }
-
-    public void setOldHTables(List<String> tables) {
-        setArrayParam(OLD_HTABLES, tables);
-    }
-
-    private List<String> getOldHTables() {
-        return getArrayParam(OLD_HTABLES);
-    }
-
-    public void setOldHdfsPaths(List<String> paths) {
-        setArrayParam(OLD_HDFS_PATHS, paths);
-    }
-
-    private List<String> getOldHdfsPaths() {
-        return getArrayParam(OLD_HDFS_PATHS);
-    }
-
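-    // List-valued params are persisted as a single comma-separated string.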
-    private void setArrayParam(String paramKey, List<String> paramValues) {
-        setParam(paramKey, StringUtils.join(paramValues, ","));
-    }
-
-    private List<String> getArrayParam(String paramKey) {
-        final String ids = getParam(paramKey);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setOldHiveTable(String hiveTable) {
-        setParam(OLD_HIVE_TABLE, hiveTable);
-    }
-
-    private String getOldHiveTable() {
-        return getParam(OLD_HIVE_TABLE);
-    }
-
-}