You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:35 UTC

[33/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
index b00e10e..f8d6e1a 100644
--- a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
+++ b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
@@ -45,23 +45,20 @@ import org.apache.kylin.cube.model.v1.CubeSegment;
 import org.apache.kylin.cube.model.v1.CubeSegmentStatusEnum;
 import org.apache.kylin.cube.model.v1.CubeStatusEnum;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
@@ -72,6 +69,9 @@ import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hbase.steps.BulkLoadJob;
+import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
+import org.apache.kylin.storage.hbase.steps.CubeHFileJob;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JobInstance.java b/job/src/main/java/org/apache/kylin/job/JobInstance.java
deleted file mode 100644
index e7f5772..0000000
--- a/job/src/main/java/org/apache/kylin/job/JobInstance.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonBackReference;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonManagedReference;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.engine.JobEngineConfig;
-
-@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
-
-    public static final String JOB_WORKING_DIR_PREFIX = "kylin-";
-
-    public static final String YARN_APP_ID = "yarn_application_id";
-    public static final String YARN_APP_URL = "yarn_application_tracking_url";
-    public static final String MR_JOB_ID = "mr_job_id";
-    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
-    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
-    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
-
-    public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) {
-        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID();
-    }
-
-    public static String getJobIdentity(JobInstance jobInstance) {
-        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid();
-    }
-
-    public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) {
-        return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory());
-    }
-
-    public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) {
-        if (jobUuid == null || jobUuid.equals("")) {
-            throw new IllegalArgumentException("jobUuid can't be null or empty");
-        }
-        return hdfsWorkdingDir + "/" + JOB_WORKING_DIR_PREFIX + jobUuid;
-    }
-
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("type")
-    private CubeBuildTypeEnum type; // java implementation
-    @JsonProperty("duration")
-    private long duration;
-    @JsonProperty("related_cube")
-    private String relatedCube;
-    @JsonProperty("related_segment")
-    private String relatedSegment;
-    @JsonProperty("exec_start_time")
-    private long execStartTime;
-    @JsonProperty("exec_end_time")
-    private long execEndTime;
-    @JsonProperty("mr_waiting")
-    private long mrWaiting = 0;
-    @JsonManagedReference
-    @JsonProperty("steps")
-    private List<JobStep> steps;
-    @JsonProperty("submitter")
-    private String submitter;
-    @JsonProperty("job_status")
-    private JobStatusEnum status;
-
-    public JobStep getRunningStep() {
-        for (JobStep step : this.getSteps()) {
-            if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
-                return step;
-            }
-        }
-
-        return null;
-    }
-
-    @JsonProperty("progress")
-    public double getProgress() {
-        int completedStepCount = 0;
-        for (JobStep step : this.getSteps()) {
-            if (step.getStatus().equals(JobStepStatusEnum.FINISHED)) {
-                completedStepCount++;
-            }
-        }
-
-        return 100.0 * completedStepCount / steps.size();
-    }
-
-    public JobStatusEnum getStatus() {
-        return this.status;
-    }
-
-    public void setStatus(JobStatusEnum status) {
-        this.status = status;
-    }
-
-//    @JsonProperty("job_status")
-//    public JobStatusEnum getStatus() {
-//
-//        // JobStatusEnum finalJobStatus;
-//        int compositResult = 0;
-//
-//        // if steps status are all NEW, then job status is NEW
-//        // if steps status are all FINISHED, then job status is FINISHED
-//        // if steps status are all PENDING, then job status is PENDING
-//        // if steps status are FINISHED and PENDING, the job status is PENDING
-//        // if one of steps status is RUNNING, then job status is RUNNING
-//        // if one of steps status is ERROR, then job status is ERROR
-//        // if one of steps status is KILLED, then job status is KILLED
-//        // default status is RUNNING
-//
-//        System.out.println(this.getName());
-//
-//        for (JobStep step : this.getSteps()) {
-//            //System.out.println("step: " + step.getSequenceID() + "'s status:" + step.getStatus());
-//            compositResult = compositResult | step.getStatus().getCode();
-//        }
-//
-//        System.out.println();
-//
-//        if (compositResult == JobStatusEnum.FINISHED.getCode()) {
-//            return JobStatusEnum.FINISHED;
-//        } else if (compositResult == JobStatusEnum.NEW.getCode()) {
-//            return JobStatusEnum.NEW;
-//        } else if (compositResult == JobStatusEnum.PENDING.getCode()) {
-//            return JobStatusEnum.PENDING;
-//        } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) {
-//            return JobStatusEnum.PENDING;
-//        } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) {
-//            return JobStatusEnum.ERROR;
-//        } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) {
-//            return JobStatusEnum.DISCARDED;
-//        } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) {
-//            return JobStatusEnum.RUNNING;
-//        }
-//
-//        return JobStatusEnum.RUNNING;
-//    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public CubeBuildTypeEnum getType() {
-        return type;
-    }
-
-    public void setType(CubeBuildTypeEnum type) {
-        this.type = type;
-    }
-
-    public long getDuration() {
-        return duration;
-    }
-
-    public void setDuration(long duration) {
-        this.duration = duration;
-    }
-
-    public String getRelatedCube() {
-        return relatedCube;
-    }
-
-    public void setRelatedCube(String relatedCube) {
-        this.relatedCube = relatedCube;
-    }
-
-    public String getRelatedSegment() {
-        return relatedSegment;
-    }
-
-    public void setRelatedSegment(String relatedSegment) {
-        this.relatedSegment = relatedSegment;
-    }
-
-    /**
-     * @return the execStartTime
-     */
-    public long getExecStartTime() {
-        return execStartTime;
-    }
-
-    /**
-     * @param execStartTime the execStartTime to set
-     */
-    public void setExecStartTime(long execStartTime) {
-        this.execStartTime = execStartTime;
-    }
-
-    /**
-     * @return the execEndTime
-     */
-    public long getExecEndTime() {
-        return execEndTime;
-    }
-
-    /**
-     * @param execEndTime the execEndTime to set
-     */
-    public void setExecEndTime(long execEndTime) {
-        this.execEndTime = execEndTime;
-    }
-
-    public long getMrWaiting() {
-        return this.mrWaiting;
-    }
-
-    public void setMrWaiting(long mrWaiting) {
-        this.mrWaiting = mrWaiting;
-    }
-
-    public List<JobStep> getSteps() {
-        if (steps == null) {
-            steps = Lists.newArrayList();
-        }
-        return steps;
-    }
-
-    public void clearSteps() {
-        getSteps().clear();
-    }
-
-    public void addSteps(Collection<JobStep> steps) {
-        this.getSteps().addAll(steps);
-    }
-
-    public void addStep(JobStep step) {
-        getSteps().add(step);
-    }
-
-    public void addStep(int index, JobStep step) {
-        getSteps().add(index, step);
-    }
-
-    public JobStep findStep(String stepName) {
-        for (JobStep step : getSteps()) {
-            if (stepName.equals(step.getName())) {
-                return step;
-            }
-        }
-        return null;
-    }
-
-        
-    public String getSubmitter() {
-        return submitter;
-    }
-
-    public void setSubmitter(String submitter) {
-        this.submitter = submitter;
-    }
-
-
-
-
-    @JsonIgnoreProperties(ignoreUnknown = true)
-    public static class JobStep implements Comparable<JobStep> {
-
-        @JsonBackReference
-        private JobInstance jobInstance;
-        
-        @JsonProperty("id")
-        private String id;
-
-        @JsonProperty("name")
-        private String name;
-
-        @JsonProperty("sequence_id")
-        private int sequenceID;
-
-        @JsonProperty("exec_cmd")
-        private String execCmd;
-
-        @JsonProperty("interrupt_cmd")
-        private String InterruptCmd;
-
-        @JsonProperty("exec_start_time")
-        private long execStartTime;
-        @JsonProperty("exec_end_time")
-        private long execEndTime;
-        @JsonProperty("exec_wait_time")
-        private long execWaitTime;
-
-        @JsonProperty("step_status")
-        private JobStepStatusEnum status;
-
-        @JsonProperty("cmd_type")
-        private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
-
-        @JsonProperty("info")
-        private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>();
-
-        @JsonProperty("run_async")
-        private boolean runAsync = false;
-
-        private ConcurrentHashMap<String, String> getInfo() {
-            return info;
-        }
-
-        public void putInfo(String key, String value) {
-            getInfo().put(key, value);
-        }
-
-        public String getInfo(String key) {
-            return getInfo().get(key);
-        }
-
-        public void clearInfo() {
-            getInfo().clear();
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        public int getSequenceID() {
-            return sequenceID;
-        }
-
-        public void setSequenceID(int sequenceID) {
-            this.sequenceID = sequenceID;
-        }
-
-        public String getExecCmd() {
-            return execCmd;
-        }
-
-        public void setExecCmd(String execCmd) {
-            this.execCmd = execCmd;
-        }
-
-        public JobStepStatusEnum getStatus() {
-            return status;
-        }
-
-        public void setStatus(JobStepStatusEnum status) {
-            this.status = status;
-        }
-        
-        
-
-        public String getId() {
-            return id;
-        }
-
-        public void setId(String id) {
-            this.id = id;
-        }
-
-        /**
-         * @return the execStartTime
-         */
-        public long getExecStartTime() {
-            return execStartTime;
-        }
-
-        /**
-         * @param execStartTime the execStartTime to set
-         */
-        public void setExecStartTime(long execStartTime) {
-            this.execStartTime = execStartTime;
-        }
-
-        /**
-         * @return the execEndTime
-         */
-        public long getExecEndTime() {
-            return execEndTime;
-        }
-
-        /**
-         * @param execEndTime the execEndTime to set
-         */
-        public void setExecEndTime(long execEndTime) {
-            this.execEndTime = execEndTime;
-        }
-
-        public long getExecWaitTime() {
-            return execWaitTime;
-        }
-
-        public void setExecWaitTime(long execWaitTime) {
-            this.execWaitTime = execWaitTime;
-        }
-
-        public String getInterruptCmd() {
-            return InterruptCmd;
-        }
-
-        public void setInterruptCmd(String interruptCmd) {
-            InterruptCmd = interruptCmd;
-        }
-
-        public JobStepCmdTypeEnum getCmdType() {
-            return cmdType;
-        }
-
-        public void setCmdType(JobStepCmdTypeEnum cmdType) {
-            this.cmdType = cmdType;
-        }
-
-        /**
-         * @return the runAsync
-         */
-        public boolean isRunAsync() {
-            return runAsync;
-        }
-
-        /**
-         * @param runAsync the runAsync to set
-         */
-        public void setRunAsync(boolean runAsync) {
-            this.runAsync = runAsync;
-        }
-
-        /**
-         * @return the jobInstance
-         */
-        public JobInstance getJobInstance() {
-            return jobInstance;
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((name == null) ? 0 : name.hashCode());
-            result = prime * result + sequenceID;
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null)
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            JobStep other = (JobStep) obj;
-            if (name == null) {
-                if (other.name != null)
-                    return false;
-            } else if (!name.equals(other.name))
-                return false;
-            if (sequenceID != other.sequenceID)
-                return false;
-            return true;
-        }
-
-        @Override
-        public int compareTo(JobStep o) {
-            if (this.sequenceID < o.sequenceID) {
-                return -1;
-            } else if (this.sequenceID > o.sequenceID) {
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-    }
-
-    @Override
-    public int compareTo(JobInstance o) {
-        return o.lastModified<this.lastModified?-1:o.lastModified>this.lastModified?1:0;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
deleted file mode 100644
index eba6bd4..0000000
--- a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.hive.SqlHiveDataTypeMapping;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.LookupDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-
-public class JoinedFlatTable {
-
-    public static final String FACT_TABLE_ALIAS = "FACT_TABLE";
-
-    public static final String LOOKUP_TABLE_ALAIS_PREFIX = "LOOKUP_";
-
-    public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
-        return storageDfsDir + "/" + intermediateTableDesc.getTableName();
-    }
-
-    public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
-        StringBuilder ddl = new StringBuilder();
-
-        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
-
-        ddl.append("(" + "\n");
-        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
-            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
-            if (i > 0) {
-                ddl.append(",");
-            }
-            ddl.append(colName(col.getCanonicalName()) + " " + SqlHiveDataTypeMapping.getHiveDataType(col.getDataType()) + "\n");
-        }
-        ddl.append(")" + "\n");
-
-        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
-        ddl.append("STORED AS SEQUENCEFILE" + "\n");
-        ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
-        // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
-        // ";\n");
-        return ddl.toString();
-    }
-
-    public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
-        StringBuilder ddl = new StringBuilder();
-        ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName() + ";").append("\n");
-        return ddl.toString();
-    }
-
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
-        StringBuilder sql = new StringBuilder();
-
-        File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
-
-        if (hadoopPropertiesFile.exists()) {
-            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-            DocumentBuilder builder;
-            Document doc;
-            try {
-                builder = factory.newDocumentBuilder();
-                doc = builder.parse(hadoopPropertiesFile);
-                NodeList nl = doc.getElementsByTagName("property");
-                for (int i = 0; i < nl.getLength(); i++) {
-                    String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
-                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
-                    if (name.equals("tmpjars") == false) {
-                        sql.append("SET " + name + "=" + value + ";").append("\n");
-                    }
-                }
-
-            } catch (ParserConfigurationException e) {
-                throw new IOException(e);
-            } catch (SAXException e) {
-                throw new IOException(e);
-            }
-        }
-
-        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
-
-        return sql.toString();
-    }
-
-    public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
-        StringBuilder sql = new StringBuilder();
-        sql.append("SELECT" + "\n");
-        String tableAlias;
-        Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
-        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
-            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
-            if (i > 0) {
-                sql.append(",");
-            }
-            tableAlias = tableAliasMap.get(col.getTableName());
-            sql.append(tableAlias + "." + col.getColumnName() + "\n");
-        }
-        appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
-        appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
-        return sql.toString();
-    }
-
-    private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
-        Map<String, String> tableAliasMap = new HashMap<String, String>();
-
-        tableAliasMap.put(dataModelDesc.getFactTable().toUpperCase(), FACT_TABLE_ALIAS);
-
-        int i = 1;
-        for (LookupDesc lookupDesc: dataModelDesc.getLookups()) {
-            JoinDesc join = lookupDesc.getJoin();
-            if (join != null) {
-                tableAliasMap.put(lookupDesc.getTable().toUpperCase(), LOOKUP_TABLE_ALAIS_PREFIX + i);
-                i++;
-            }
-
-        }
-        return tableAliasMap;
-    }
-
-    private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        Set<String> dimTableCache = new HashSet<String>();
-
-        DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
-        String factTableName = dataModelDesc.getFactTable();
-        String factTableAlias = tableAliasMap.get(factTableName);
-        sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
-
-        for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
-            JoinDesc join = lookupDesc.getJoin();
-            if (join != null && join.getType().equals("") == false) {
-                String joinType = join.getType().toUpperCase();
-                String dimTableName = lookupDesc.getTable();
-                if (!dimTableCache.contains(dimTableName)) {
-                    TblColRef[] pk = join.getPrimaryKeyColumns();
-                    TblColRef[] fk = join.getForeignKeyColumns();
-                    if (pk.length != fk.length) {
-                        throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
-                    }
-                    sql.append(joinType + " JOIN " + dimTableName + " as " + tableAliasMap.get(dimTableName) + "\n");
-                    sql.append("ON ");
-                    for (int i = 0; i < pk.length; i++) {
-                        if (i > 0) {
-                            sql.append(" AND ");
-                        }
-                        sql.append(factTableAlias + "." + fk[i].getName() + " = " + tableAliasMap.get(dimTableName) + "." + pk[i].getName());
-                    }
-                    sql.append("\n");
-
-                    dimTableCache.add(dimTableName);
-                }
-            }
-        }
-    }
-
-    private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
-            return;//TODO: for now only cube segments support filter and partition
-        }
-        CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
-
-        boolean hasCondition = false;
-        StringBuilder whereBuilder = new StringBuilder();
-        whereBuilder.append("WHERE");
-
-        CubeDesc cubeDesc = desc.getCubeDesc();
-        DataModelDesc model = cubeDesc.getModel();
-        
-        if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) {
-            whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
-            hasCondition = true;
-        }
-
-        CubeSegment cubeSegment = desc.getCubeSegment();
-
-        if (null != cubeSegment) {
-            PartitionDesc partDesc = model.getPartitionDesc();
-            long dateStart = cubeSegment.getDateRangeStart();
-            long dateEnd = cubeSegment.getDateRangeEnd();
-
-            if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
-                whereBuilder.append(hasCondition ? " AND (" : " (");
-                whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd, tableAliasMap));
-                whereBuilder.append(")\n");
-                hasCondition = true;
-            }
-        }
-
-        if (hasCondition) {
-            sql.append(whereBuilder.toString());
-        }
-    }
-
-    private static String colName(String canonicalColName) {
-        return canonicalColName.replace(".", "_");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/Scheduler.java b/job/src/main/java/org/apache/kylin/job/Scheduler.java
deleted file mode 100644
index 2ed2fc2..0000000
--- a/job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.lock.JobLock;
-
-/**
- */
-public interface Scheduler<T extends Executable> {
-
-    void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
-
-    void shutdown() throws SchedulerException;
-
-    boolean stop(T executable) throws SchedulerException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
deleted file mode 100644
index 29b5324..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-/**
- */
-public abstract class BaseCommandOutput implements ICommandOutput {
-
-    @Override
-    public void log(String message) {
-        this.appendOutput(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
deleted file mode 100644
index 6cab6a3..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- * 
- */
-public interface ICommandOutput extends Logger {
-
-    public void setStatus(JobStepStatusEnum status);
-
-    public JobStepStatusEnum getStatus();
-
-    public void appendOutput(String message);
-
-    public String getOutput();
-
-    public void setExitCode(int exitCode);
-
-    public int getExitCode();
-
-    public void reset();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
deleted file mode 100644
index 5a47173..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.exception.JobException;
-
-/**
- * @author xjiang
- * 
- */
-public interface IJobCommand {
-
-    public ICommandOutput execute() throws JobException;
-
-    public void cancel() throws JobException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
deleted file mode 100644
index 6a718fc..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.*;
-
-/**
- * @author xjiang
- * 
- */
-public class ShellCmd implements IJobCommand {
-
-    private static Logger log = LoggerFactory.getLogger(ShellCmd.class);
-
-    private final String executeCommand;
-    private final ICommandOutput output;
-    private final boolean isAsync;
-    private final CliCommandExecutor cliCommandExecutor;
-
-    private FutureTask<Integer> future;
-
-    private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
-        this.executeCommand = executeCmd;
-        this.output = out;
-        this.cliCommandExecutor = new CliCommandExecutor();
-        this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
-        this.isAsync = async;
-    }
-
-    public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
-        this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
-    }
-
-    @Override
-    public ICommandOutput execute() throws JobException {
-
-        final ExecutorService executor = Executors.newSingleThreadExecutor();
-        future = new FutureTask<Integer>(new Callable<Integer>() {
-            public Integer call() throws JobException, IOException {
-                executor.shutdown();
-                return executeCommand(executeCommand);
-            }
-        });
-        executor.execute(future);
-
-        int exitCode = -1;
-        if (!isAsync) {
-            try {
-                exitCode = future.get();
-                log.info("finish executing");
-            } catch (CancellationException e) {
-                log.debug("Command is cancelled");
-                exitCode = -2;
-            } catch (Exception e) {
-                throw new JobException("Error when execute job " + executeCommand, e);
-            } finally {
-                if (exitCode == 0) {
-                    output.setStatus(JobStepStatusEnum.FINISHED);
-                } else if (exitCode == -2) {
-                    output.setStatus(JobStepStatusEnum.DISCARDED);
-                } else {
-                    output.setStatus(JobStepStatusEnum.ERROR);
-                }
-                output.setExitCode(exitCode);
-            }
-        }
-        return output;
-    }
-
-    protected int executeCommand(String command) throws JobException, IOException {
-        output.reset();
-        output.setStatus(JobStepStatusEnum.RUNNING);
-        return cliCommandExecutor.execute(command, output).getFirst();
-    }
-
-    @Override
-    public void cancel() throws JobException {
-        future.cancel(true);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index ebcad47..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- * 
- */
-public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
-
-    protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
-
-    protected StringBuilder output;
-    protected int exitCode;
-    protected JobStepStatusEnum status;
-
-    public ShellCmdOutput() {
-        init();
-    }
-
-    private void init() {
-        output = new StringBuilder();
-        exitCode = -1;
-        status = JobStepStatusEnum.NEW;
-    }
-
-    @Override
-    public JobStepStatusEnum getStatus() {
-        return status;
-    }
-
-    @Override
-    public void setStatus(JobStepStatusEnum s) {
-        this.status = s;
-    }
-
-    @Override
-    public String getOutput() {
-        return output.toString();
-    }
-
-    @Override
-    public void appendOutput(String message) {
-        output.append(message).append(System.getProperty("line.separator"));
-        log.debug(message);
-    }
-
-    @Override
-    public int getExitCode() {
-        return exitCode;
-    }
-
-    @Override
-    public void setExitCode(int code) {
-        exitCode = code;
-    }
-
-    @Override
-    public void reset() {
-        init();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java b/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
deleted file mode 100644
index 288fd31..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author xduo
- * 
- */
-public class HadoopCmdOutput {
-
-    protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class);
-
-    private final StringBuilder output;
-    private final Job job;
-
-    public HadoopCmdOutput(Job job, StringBuilder output) {
-        super();
-        this.job = job;
-        this.output = output;
-    }
-
-    public String getMrJobId() {
-        return getInfo().get(ExecutableConstants.MR_JOB_ID);
-    }
-
-    public Map<String, String> getInfo() {
-        if (job != null) {
-            Map<String, String> status = new HashMap<String, String>();
-            if (null != job.getJobID()) {
-                status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString());
-            }
-            if (null != job.getTrackingURL()) {
-                status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL().toString());
-            }
-            return status;
-        } else {
-            return Collections.emptyMap();
-        }
-    }
-
-    private String mapInputRecords;
-    private String hdfsBytesWritten;
-    private String hdfsBytesRead;
-
-    public String getMapInputRecords() {
-        return mapInputRecords;
-    }
-
-    public String getHdfsBytesWritten() {
-        return hdfsBytesWritten;
-    }
-
-    public String getHdfsBytesRead() {
-        return hdfsBytesRead;
-    }
-    
-    public void updateJobCounter() {
-        try {
-            Counters counters = job.getCounters();
-            if (counters == null) {
-                String errorMsg = "no counters for job " + getMrJobId();
-                log.warn(errorMsg);
-                output.append(errorMsg);
-                return;
-            }
-            this.output.append(counters.toString()).append("\n");
-            log.debug(counters.toString());
-
-            mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
-            hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
-            hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
-        } catch (Exception e) {
-            log.error(e.getLocalizedMessage(), e);
-            output.append(e.getLocalizedMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
deleted file mode 100644
index 2da5b2a..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-
-import org.apache.hadoop.util.ToolRunner;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-/**
- */
-public class HadoopShellExecutable extends AbstractExecutable {
-
-    private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS";
-    private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS";
-
-    public HadoopShellExecutable() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final String mapReduceJobClass = getJobClass();
-        String params = getJobParams();
-        Preconditions.checkNotNull(mapReduceJobClass);
-        Preconditions.checkNotNull(params);
-        try {
-            final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
-            final AbstractHadoopJob job = constructor.newInstance();
-            String[] args = params.trim().split("\\s+");
-            logger.info("parameters of the HadoopShellExecutable:");
-            logger.info(params);
-            int result;
-            StringBuilder log = new StringBuilder();
-            try {
-                result = ToolRunner.run(job, args);
-            } catch (Exception ex) {
-                logger.error("error execute " + this.toString(), ex);
-                StringWriter stringWriter = new StringWriter();
-                ex.printStackTrace(new PrintWriter(stringWriter));
-                log.append(stringWriter.toString()).append("\n");
-                result = 2;
-            }
-            log.append("result code:").append(result);
-            return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()):new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
-        } catch (ReflectiveOperationException e) {
-            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } catch (Exception e) {
-            logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setJobClass(Class<? extends AbstractHadoopJob> clazzName) {
-        setParam(KEY_MR_JOB, clazzName.getName());
-    }
-
-    public String getJobClass() throws ExecuteException {
-        return getParam(KEY_MR_JOB);
-    }
-
-    public void setJobParams(String param) {
-        setParam(KEY_PARAMS, param);
-    }
-
-    public String getJobParams() {
-        return getParam(KEY_PARAMS);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
deleted file mode 100644
index ffe45ed..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.hive.HiveClient;
-import org.datanucleus.store.types.backed.HashMap;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.Lists;
-
-import org.apache.kylin.common.util.JsonUtil;
-
-/**
- */
-public class HqlExecutable extends AbstractExecutable {
-
-    private static final String HQL = "hql";
-    private static final String HIVE_CONFIG = "hive-config";
-
-    public HqlExecutable() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        try {
-            Map<String, String> configMap = getConfiguration();
-            HiveClient hiveClient = new HiveClient(configMap);
-            
-            for (String hql: getHqls()) {
-                hiveClient.executeHQL(hql);
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED);
-        } catch (Exception e) {
-            logger.error("error run hive query:" + getHqls(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-    
-    public void setConfiguration(Map<String, String> configMap) {
-        if(configMap != null) {
-            String configStr = "";
-            try {
-                configStr = JsonUtil.writeValueAsString(configMap);
-            } catch (JsonProcessingException e) {
-                e.printStackTrace();
-            }
-            setParam(HIVE_CONFIG, configStr);
-        }
-    }
-
-
-    @SuppressWarnings("unchecked")
-    private Map<String, String> getConfiguration() {
-        String configStr = getParam(HIVE_CONFIG);
-        Map<String, String> result = null;
-        if(configStr != null) {
-            try {
-                result = JsonUtil.readValue(configStr, HashMap.class);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-        
-        return result;
-    }
-    
-    public void setHqls(List<String> hqls) {
-        setParam(HQL, StringUtils.join(hqls, ";"));
-    }
-
-    private List<String> getHqls() {
-        final String hqls = getParam(HQL);
-        if (hqls != null) {
-            return Lists.newArrayList(StringUtils.split(hqls, ";"));
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
deleted file mode 100644
index f8eab6c..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.tools.HadoopStatusChecker;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class MapReduceExecutable extends AbstractExecutable {
-
-    private static final String KEY_MR_JOB = "MR_JOB_CLASS";
-    private static final String KEY_PARAMS = "MR_JOB_PARAMS";
-    private static final String KEY_COUNTER_SAVEAS = "MR_COUNTER_SAVEAS";
-    
-    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-
-    public MapReduceExecutable() {
-        super();
-    }
-
-    @Override
-    protected void onExecuteStart(ExecutableContext executableContext) {
-        final Output output = executableManager.getOutput(getId());
-        if (output.getExtra().containsKey(START_TIME)) {
-            final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
-            if (mrJobId == null) {
-                executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
-                return;
-            }
-            try {
-                Job job = new Cluster(new Configuration()).getJob(JobID.forName(mrJobId));
-                if (job.getJobState() == JobStatus.State.FAILED) {
-                    //remove previous mr job info
-                    super.onExecuteStart(executableContext);
-                } else {
-                    executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
-                }
-            } catch (IOException e) {
-                logger.warn("error get hadoop status");
-                super.onExecuteStart(executableContext);
-            } catch (InterruptedException e) {
-                logger.warn("error get hadoop status");
-                super.onExecuteStart(executableContext);
-            }
-        } else {
-            super.onExecuteStart(executableContext);
-        }
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final String mapReduceJobClass = getMapReduceJobClass();
-        String params = getMapReduceParams();
-        Preconditions.checkNotNull(mapReduceJobClass);
-        Preconditions.checkNotNull(params);
-        try {
-            Job job;
-            final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
-            if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
-                job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
-                logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed"));
-            } else {
-                final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
-                final AbstractHadoopJob hadoopJob = constructor.newInstance();
-                hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
-                logger.info("parameters of the MapReduceExecutable:");
-                logger.info(params);
-                String[] args = params.trim().split("\\s+");
-                try {
-                    //for async mr job, ToolRunner just return 0;
-                    ToolRunner.run(hadoopJob, args);
-                } catch (Exception ex) {
-                    StringBuilder log = new StringBuilder();
-                    logger.error("error execute " + this.toString(), ex);
-                    StringWriter stringWriter = new StringWriter();
-                    ex.printStackTrace(new PrintWriter(stringWriter));
-                    log.append(stringWriter.toString()).append("\n");
-                    log.append("result code:").append(2);
-                    return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
-                }
-                job = hadoopJob.getJob();
-            }
-            final StringBuilder output = new StringBuilder();
-            final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
-
-            final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
-            if (restStatusCheckUrl == null) {
-                logger.error("restStatusCheckUrl is null");
-                return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
-            }
-            String mrJobId = hadoopCmdOutput.getMrJobId();
-            HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
-            JobStepStatusEnum status = JobStepStatusEnum.NEW;
-            while (!isDiscarded()) {
-                JobStepStatusEnum newStatus = statusChecker.checkStatus();
-                if (status == JobStepStatusEnum.KILLED) {
-                    executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String>emptyMap(), "killed by admin");
-                    return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
-                }
-                if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
-                    final long waitTime = System.currentTimeMillis() - getStartTime();
-                    setMapReduceWaitTime(waitTime);
-                }
-                status = newStatus;
-                executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
-                if (status.isComplete()) {
-                    final Map<String, String> info = hadoopCmdOutput.getInfo();
-                    readCounters(hadoopCmdOutput, info);
-                    executableManager.addJobInfo(getId(), info);
-
-                    if (status == JobStepStatusEnum.FINISHED) {
-                        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-                    } else {
-                        return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
-                    }
-                }
-                Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000);
-            }
-            //TODO kill discarded mr job using "hadoop job -kill " + mrJobId
-
-            return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
-
-        } catch (ReflectiveOperationException e) {
-            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } catch (Exception e) {
-            logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
-        hadoopCmdOutput.updateJobCounter();
-        info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
-        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
-        info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
-        
-        String saveAs = getParam(KEY_COUNTER_SAVEAS);
-        if (saveAs != null) {
-            String[] saveAsNames = saveAs.split(",");
-            saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
-            saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
-            saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
-        }
-    }
-
-    private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
-        if (saveAsNames.length > i && StringUtils.isBlank(saveAsNames[i]) == false) {
-            info.put(saveAsNames[i].trim(), counter);
-        }
-    }
-
-    private String getRestStatusCheckUrl(Job job, KylinConfig config) {
-        final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
-        if (yarnStatusCheckUrl != null) {
-            return yarnStatusCheckUrl;
-        } else {
-            logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
-        }
-        String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
-        if (StringUtils.isEmpty(rmWebHost)) {
-            return null;
-        }
-        if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
-            //do nothing
-        } else {
-            rmWebHost = "http://" + rmWebHost;
-        }
-        logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
-        return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
-    }
-
-    public long getMapReduceWaitTime() {
-        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
-    }
-
-    public void setMapReduceWaitTime(long t) {
-        addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
-    }
-
-    public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) {
-        setParam(KEY_MR_JOB, clazzName.getName());
-    }
-
-    public String getMapReduceJobClass() throws ExecuteException {
-        return getParam(KEY_MR_JOB);
-    }
-
-    public void setMapReduceParams(String param) {
-        setParam(KEY_PARAMS, param);
-    }
-
-    public String getMapReduceParams() {
-        return getParam(KEY_PARAMS);
-    }
-    
-    public String getCounterSaveAs() {
-        return getParam(KEY_COUNTER_SAVEAS);
-    }
-    
-    public void setCounterSaveAs(String value) {
-        setParam(KEY_COUNTER_SAVEAS, value);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
deleted file mode 100644
index 786698e..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.util.Pair;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- */
-public class ShellExecutable extends AbstractExecutable {
-
-    private static final String CMD = "cmd";
-
-    public ShellExecutable() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        try {
-            logger.info("executing:" + getCmd());
-            final ShellExecutableLogger logger = new ShellExecutableLogger();
-            final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
-            executableManager.addJobInfo(getId(), logger.getInfo());
-            return new ExecuteResult(result.getFirst() == 0? ExecuteResult.State.SUCCEED: ExecuteResult.State.FAILED, result.getSecond());
-        } catch (IOException e) {
-            logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setCmd(String cmd) {
-        setParam(CMD, cmd);
-    }
-
-    public String getCmd() {
-        return getParam(CMD);
-    }
-
-    private static class ShellExecutableLogger implements Logger {
-
-        private final Map<String, String> info = Maps.newHashMap();
-
-        private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
-        private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
-        private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
-        private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
-        private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
-        private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
-
-        // hive
-        private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
-        private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
-
-        @Override
-        public void log(String message) {
-            Matcher matcher = PATTERN_APP_ID.matcher(message);
-            if (matcher.find()) {
-                String appId = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_ID, appId);
-            }
-
-            matcher = PATTERN_APP_URL.matcher(message);
-            if (matcher.find()) {
-                String appTrackingUrl = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
-            }
-
-            matcher = PATTERN_JOB_ID.matcher(message);
-            if (matcher.find()) {
-                String mrJobID = matcher.group(1);
-                info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
-            }
-
-            matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
-            if (matcher.find()) {
-                String hdfsWritten = matcher.group(1);
-                info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-            }
-
-            matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
-            if (matcher.find()) {
-                String sourceCount = matcher.group(1);
-                info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
-            }
-
-            matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
-            if (matcher.find()) {
-                String sourceSize = matcher.group(1);
-                info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
-            }
-
-            // hive
-            matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
-            if (matcher.find()) {
-                String jobId = matcher.group(1);
-                String trackingUrl = matcher.group(2);
-                info.put(ExecutableConstants.MR_JOB_ID, jobId);
-                info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
-            }
-
-            matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
-            if (matcher.find()) {
-                // String hdfsRead = matcher.group(1);
-                String hdfsWritten = matcher.group(2);
-                info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-            }
-        }
-
-        Map<String, String> getInfo() {
-            return info;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
deleted file mode 100644
index 3a64d02..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public interface BatchConstants {
-
-    public static final char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
-
-    public static final String CFG_CUBE_NAME = "cube.name";
-    public static final String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
-
-    public static final String CFG_II_NAME = "ii.name";
-    public static final String CFG_II_SEGMENT_NAME = "ii.segment.name";
-
-    public static final String INPUT_DELIM = "input.delim";
-    public static final String OUTPUT_PATH = "output.path";
-
-    public static final String TABLE_NAME = "table.name";
-    public static final String TABLE_COLUMNS = "table.columns";
-
-    public static final String CFG_IS_MERGE = "is.merge";
-    public static final String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
-
-    public static final String MAPREDUCE_COUTNER_GROUP_NAME = "Cube Builder";
-
-    public static final String MAPPER_SAMPLE_NUMBER = "mapper.sample.number";
-    public static final String REGION_NUMBER = "region.number";
-    public static final String CUBE_CAPACITY = "cube.capacity";
-
-    public static final String CFG_STATISTICS_ENABLED = "statistics.enabled";
-    public static final String CFG_STATISTICS_OUTPUT = "statistics.ouput";
-    public static final String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
-    public static final String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
-    public static final String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
-
-    public static final int COUNTER_MAX = 100000;
-    public static final int ERROR_RECORD_THRESHOLD = 100;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
deleted file mode 100644
index fdcfdbe..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- */
-public final class ExecutableConstants {
-
-    private ExecutableConstants(){}
-
-    public static final String YARN_APP_ID = "yarn_application_id";
-
-    public static final String YARN_APP_URL = "yarn_application_tracking_url";
-    public static final String MR_JOB_ID = "mr_job_id";
-    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
-    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
-    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
-    public static final String GLOBAL_LISTENER_NAME = "ChainListener";
-
-
-
-
-    public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
-
-    public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
-
-    public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
-    public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
-
-    public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
-    public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
-    public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
-    public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";
-    public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data";
-    public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
-    public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
-    public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
-    public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
-    public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
-    public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
-    public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
-    public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
-    public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
-    public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
-    
-    public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
-    public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
-
-    public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
-    public static final String PROP_JOB_FLOW = "jobFlow";
-    public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
-    public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
-    public static final String PROP_COMMAND = "command";
-    // public static final String PROP_STORAGE_LOCATION =
-    // "storageLocationIdentifier";
-    public static final String PROP_JOB_ASYNC = "jobAsync";
-    public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
-    public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
-    public static final String PROP_JOB_KILLED = "jobKilled";
-    public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
-
-    public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
deleted file mode 100644
index a4ef564..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-public enum JobStatusEnum {
-
-    NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
-
-    private final int code;
-
-    private JobStatusEnum(int statusCode) {
-        this.code = statusCode;
-    }
-
-    public static JobStatusEnum getByCode(int statusCode) {
-        for (JobStatusEnum status : values()) {
-            if (status.getCode() == statusCode) {
-                return status;
-            }
-        }
-
-        return null;
-    }
-
-    public int getCode() {
-        return this.code;
-    }
-
-    public boolean isComplete() {
-        return code == JobStatusEnum.FINISHED.getCode() || code == JobStatusEnum.ERROR.getCode() || code == JobStatusEnum.DISCARDED.getCode();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
deleted file mode 100644
index 02b40a3..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * @author xduo, ysong1
- * 
- */
-public enum JobStepCmdTypeEnum {
-    SHELL_CMD, SHELL_CMD_HADOOP, JAVA_CMD_HADOOP_FACTDISTINCT, JAVA_CMD_HADOOP_BASECUBOID, JAVA_CMD_HADOOP_NDCUBOID, JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, JAVA_CMD_HADOOP_CONVERTHFILE, JAVA_CMD_HADOOP_MERGECUBOID, JAVA_CMD_HADOOP_NO_MR_DICTIONARY, JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, JAVA_CMD_HADOOP_NO_MR_BULKLOAD
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
deleted file mode 100644
index 08ee79a..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-public enum JobStepStatusEnum {
-    NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64);
-
-    private final int code;
-
-    private JobStepStatusEnum(int statusCode) {
-        this.code = statusCode;
-    }
-
-    public static JobStepStatusEnum getByCode(int statusCode) {
-        for (JobStepStatusEnum status : values()) {
-            if (status.getCode() == statusCode) {
-                return status;
-            }
-        }
-
-        return null;
-    }
-
-    public int getCode() {
-        return this.code;
-    }
-
-    public boolean isComplete() {
-        return code == JobStepStatusEnum.FINISHED.getCode() || code == JobStepStatusEnum.ERROR.getCode() || code == JobStepStatusEnum.DISCARDED.getCode();
-    }
-
-    public boolean isRunable() {
-        return code == JobStepStatusEnum.PENDING.getCode() || code == JobStepStatusEnum.ERROR.getCode();
-    }
-}