You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:34 UTC

[32/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
deleted file mode 100644
index 4862bb1..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- */
-public class ExecutableDao {
-
-    private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
-    private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
-    private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
-    private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
-    public static final String JOB_PATH_ROOT = "/execute";
-    public static final String JOB_OUTPUT_ROOT = "/execute_output";
-
-    private ResourceStore store;
-
-    public static ExecutableDao getInstance(KylinConfig config) {
-        ExecutableDao r = CACHE.get(config);
-        if (r == null) {
-            r = new ExecutableDao(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one singleton exist");
-            }
-
-        }
-        return r;
-    }
-
-    private ExecutableDao(KylinConfig config) {
-        logger.info("Using metadata url: " + config);
-        this.store = MetadataManager.getInstance(config).getStore();
-    }
-
-    private String pathOfJob(ExecutablePO job) {
-        return pathOfJob(job.getUuid());
-    }
-    private String pathOfJob(String uuid) {
-        return JOB_PATH_ROOT + "/" + uuid;
-    }
-
-    private String pathOfJobOutput(String uuid) {
-        return JOB_OUTPUT_ROOT + "/" + uuid;
-    }
-
-    private ExecutablePO readJobResource(String path) throws IOException {
-        return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER);
-    }
-
-    private void writeJobResource(String path, ExecutablePO job) throws IOException {
-        store.putResource(path, job, JOB_SERIALIZER);
-    }
-
-    private ExecutableOutputPO readJobOutputResource(String path) throws IOException {
-        return store.getResource(path, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
-    }
-
-    private long writeJobOutputResource(String path, ExecutableOutputPO output) throws IOException {
-        return store.putResource(path, output, JOB_OUTPUT_SERIALIZER);
-    }
-
-    public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
-        try {
-            ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT);
-            if (resources == null || resources.isEmpty()) {
-                return Collections.emptyList();
-            }
-            Collections.sort(resources);
-            String rangeStart = resources.get(0);
-            String rangeEnd = resources.get(resources.size() - 1);
-            return store.getAllResources(rangeStart, rangeEnd, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get all Jobs:", e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public List<ExecutablePO> getJobs() throws PersistentException {
-        try {
-            final List<String> jobIds = store.listResources(JOB_PATH_ROOT);
-            if (jobIds == null || jobIds.isEmpty()) {
-                return Collections.emptyList();
-            }
-            Collections.sort(jobIds);
-            String rangeStart = jobIds.get(0);
-            String rangeEnd = jobIds.get(jobIds.size() - 1);
-            return store.getAllResources(rangeStart, rangeEnd, ExecutablePO.class, JOB_SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get all Jobs:", e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public List<String> getJobIds() throws PersistentException {
-        try {
-            ArrayList<String> resources = store.listResources(JOB_PATH_ROOT);
-            if (resources == null) {
-                return Collections.emptyList();
-            }
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(resources.size());
-            for (String path : resources) {
-                result.add(path.substring(path.lastIndexOf("/") + 1));
-            }
-            return result;
-        } catch (IOException e) {
-            logger.error("error get all Jobs:", e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public ExecutablePO getJob(String uuid) throws PersistentException {
-        try {
-            return readJobResource(pathOfJob(uuid));
-        } catch (IOException e) {
-            logger.error("error get job:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public ExecutablePO addJob(ExecutablePO job) throws PersistentException {
-        try {
-            if (getJob(job.getUuid()) != null) {
-                throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists");
-            }
-            writeJobResource(pathOfJob(job), job);
-            return job;
-        } catch (IOException e) {
-            logger.error("error save job:" + job.getUuid(), e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void deleteJob(String uuid) throws PersistentException {
-        try {
-            store.deleteResource(pathOfJob(uuid));
-        } catch (IOException e) {
-            logger.error("error delete job:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
-        try {
-            ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid));
-            if (result == null) {
-                result = new ExecutableOutputPO();
-                result.setUuid(uuid);
-                return result;
-            }
-            return result;
-        } catch (IOException e) {
-            logger.error("error get job output id:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void addJobOutput(ExecutableOutputPO output) throws PersistentException {
-        try {
-            output.setLastModified(0);
-            writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
-        } catch (IOException e) {
-            logger.error("error update job output id:" + output.getUuid(), e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void updateJobOutput(ExecutableOutputPO output) throws PersistentException {
-        try {
-            final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
-            output.setLastModified(ts);
-        } catch (IOException e) {
-            logger.error("error update job output id:" + output.getUuid(), e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void deleteJobOutput(String uuid) throws PersistentException {
-        try {
-            store.deleteResource(pathOfJobOutput(uuid));
-        } catch (IOException e) {
-            logger.error("error delete job:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
deleted file mode 100644
index 4dacd8a..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutableOutputPO extends RootPersistentEntity {
-
-    @JsonProperty("content")
-    private String content;
-
-    @JsonProperty("status")
-    private String status = "READY";
-
-    @JsonProperty("info")
-    private Map<String, String> info = Maps.newHashMap();
-
-    public String getContent() {
-        return content;
-    }
-
-    public void setContent(String content) {
-        this.content = content;
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-    public Map<String, String> getInfo() {
-        return info;
-    }
-
-    public void setInfo(Map<String, String> info) {
-        this.info = info;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
deleted file mode 100644
index 6a17b29..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import java.util.List;
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutablePO extends RootPersistentEntity {
-
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("tasks")
-    private List<ExecutablePO> tasks;
-
-    @JsonProperty("type")
-    private String type;
-
-    @JsonProperty("params")
-    private Map<String, String> params = Maps.newHashMap();
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public List<ExecutablePO> getTasks() {
-        return tasks;
-    }
-
-    public void setTasks(List<ExecutablePO> tasks) {
-        this.tasks = tasks;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public Map<String, String> getParams() {
-        return params;
-    }
-
-    public void setParams(Map<String, String> params) {
-        this.params = params;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
index 1aeb50f..b03cb5f 100644
--- a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
@@ -27,8 +27,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-
-import org.apache.kylin.job.tools.LZOSupportnessChecker;
+import org.apache.kylin.storage.hbase.util.LZOSupportnessChecker;
 
 /**
  * <p/>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
deleted file mode 100644
index 2eb9b31..0000000
--- a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.engine;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-/**
- * @author ysong1
- */
-public class JobEngineConfig {
-    private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
-    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
-    public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
-
-    private static File getJobConfig(String fileName) {
-        String path = System.getProperty(KylinConfig.KYLIN_CONF);
-        if (StringUtils.isNotEmpty(path)) {
-            return new File(path, fileName);
-        }
-
-        path = KylinConfig.getKylinHome();
-        if (StringUtils.isNotEmpty(path)) {
-            return new File(path + File.separator + "conf", fileName);
-        }
-        return null;
-    }
-
-    private String getHadoopJobConfFilePath(RealizationCapacity capaticy, boolean appendSuffix) throws IOException {
-        String hadoopJobConfFile;
-        if (appendSuffix) {
-            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase() + ".xml");
-        } else {
-            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
-        }
-
-        File jobConfig = getJobConfig(hadoopJobConfFile);
-        if (jobConfig == null || !jobConfig.exists()) {
-            logger.warn("fail to locate " + hadoopJobConfFile + ", trying to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
-            jobConfig = getJobConfig(HADOOP_JOB_CONF_FILENAME + ".xml");
-            if (jobConfig == null || !jobConfig.exists()) {
-                logger.error("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
-                throw new RuntimeException("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
-            }
-        }
-        return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
-    }
-
-    public String getHadoopJobConfFilePath(RealizationCapacity capaticy) throws IOException {
-        String path = getHadoopJobConfFilePath(capaticy, true);
-        if (!StringUtils.isEmpty(path)) {
-            logger.info("Chosen job conf is : " + path);
-            return path;
-        } else {
-            path = getHadoopJobConfFilePath(capaticy, false);
-            if (!StringUtils.isEmpty(path)) {
-                logger.info("Chosen job conf is : " + path);
-                return path;
-            }
-        }
-        return "";
-    }
-
-
-    public String getHiveConfFilePath() throws IOException {
-        String hiveConfFile = (HIVE_CONF_FILENAME + ".xml");
-
-        File jobConfig = getJobConfig(hiveConfFile);
-        if (jobConfig == null || !jobConfig.exists()) {
-
-            logger.error("fail to locate " + HIVE_CONF_FILENAME + ".xml");
-            throw new RuntimeException("fail to locate " + HIVE_CONF_FILENAME + ".xml");
-        }
-        return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
-    }
-
-    // there should be no setters
-    private final KylinConfig config;
-
-    public JobEngineConfig(KylinConfig config) {
-        this.config = config;
-    }
-
-    public KylinConfig getConfig() {
-        return config;
-    }
-
-    public String getHdfsWorkingDirectory() {
-        return config.getHdfsWorkingDirectory();
-    }
-    
-    /**
-     * @return the maxConcurrentJobLimit
-     */
-    public int getMaxConcurrentJobLimit() {
-        return config.getMaxConcurrentJobLimit();
-    }
-
-    /**
-     * @return the timeZone
-     */
-    public String getTimeZone() {
-        return config.getTimeZone();
-    }
-
-    /**
-     * @return the adminDls
-     */
-    public String getAdminDls() {
-        return config.getAdminDls();
-    }
-
-    /**
-     * @return the jobStepTimeout
-     */
-    public long getJobStepTimeout() {
-        return config.getJobStepTimeout();
-    }
-
-    /**
-     * @return the asyncJobCheckInterval
-     */
-    public int getAsyncJobCheckInterval() {
-        return config.getYarnStatusCheckIntervalSeconds();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Object#hashCode()
-     */
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((config == null) ? 0 : config.hashCode());
-        return result;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        JobEngineConfig other = (JobEngineConfig) obj;
-        if (config == null) {
-            if (other.config != null)
-                return false;
-        } else if (!config.equals(other.config))
-            return false;
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java b/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
deleted file mode 100644
index 8544fff..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class ExecuteException extends Exception {
-
-    private static final long serialVersionUID = 5677121412192984281L;
-
-    public ExecuteException() {
-    }
-
-    public ExecuteException(String message) {
-        super(message);
-    }
-
-    public ExecuteException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ExecuteException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java b/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
deleted file mode 100644
index f19b0ca..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class IllegalStateTranferException extends RuntimeException {
-
-    private static final long serialVersionUID = 8466551519300132702L;
-
-    public IllegalStateTranferException() {
-    }
-
-    public IllegalStateTranferException(String message) {
-        super(message);
-    }
-
-    public IllegalStateTranferException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public IllegalStateTranferException(Throwable cause) {
-        super(cause);
-    }
-
-    public IllegalStateTranferException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/JobException.java b/job/src/main/java/org/apache/kylin/job/exception/JobException.java
deleted file mode 100644
index ba4c52a..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/JobException.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- * @author xduo
- * 
- */
-public class JobException extends Exception {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * 
-     */
-    public JobException() {
-        super();
-    }
-
-    /**
-     * @param message
-     * @param cause
-     */
-    public JobException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param message
-     */
-    public JobException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param cause
-     */
-    public JobException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/LockException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/LockException.java b/job/src/main/java/org/apache/kylin/job/exception/LockException.java
deleted file mode 100644
index cf43ac9..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/LockException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class LockException extends Exception {
-    private static final long serialVersionUID = 2072745879281754945L;
-
-    public LockException() {
-    }
-
-    public LockException(String message) {
-        super(message);
-    }
-
-    public LockException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public LockException(Throwable cause) {
-        super(cause);
-    }
-
-    public LockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java b/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
deleted file mode 100644
index 8507a53..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class PersistentException extends Exception {
-    private static final long serialVersionUID = -4239863858506718998L;
-
-    public PersistentException() {
-    }
-
-    public PersistentException(String message) {
-        super(message);
-    }
-
-    public PersistentException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public PersistentException(Throwable cause) {
-        super(cause);
-    }
-
-    public PersistentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java b/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
deleted file mode 100644
index 057bd4a..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class SchedulerException extends Exception {
-    private static final long serialVersionUID = 349041244824274861L;
-
-    public SchedulerException() {
-    }
-
-    public SchedulerException(String message) {
-        super(message);
-    }
-
-    public SchedulerException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public SchedulerException(Throwable cause) {
-        super(cause);
-    }
-
-    public SchedulerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
deleted file mode 100644
index be82b3a..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LogTitlePrinter;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.impl.threadpool.DefaultContext;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- */
-public abstract class AbstractExecutable implements Executable, Idempotent {
-
-    protected static final String SUBMITTER = "submitter";
-    protected static final String NOTIFY_LIST = "notify_list";
-    protected static final String START_TIME = "startTime";
-    protected static final String END_TIME = "endTime";
-
-    protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
-
-    private String name;
-    private String id;
-    private Map<String, String> params = Maps.newHashMap();
-
-    protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    public AbstractExecutable() {
-        setId(UUID.randomUUID().toString());
-    }
-
-    protected void onExecuteStart(ExecutableContext executableContext) {
-        Map<String, String> info = Maps.newHashMap();
-        info.put(START_TIME, Long.toString(System.currentTimeMillis()));
-        executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
-    }
-
-    protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
-        setEndTime(System.currentTimeMillis());
-        if (!isDiscarded()) {
-            if (result.succeed()) {
-                executableManager.updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
-            } else {
-                executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
-            }
-        } else {
-        }
-    }
-
-    protected void onExecuteError(Throwable exception, ExecutableContext executableContext) {
-        if (!isDiscarded()) {
-            executableManager.addJobInfo(getId(), END_TIME, Long.toString(System.currentTimeMillis()));
-            String output = null;
-            if (exception != null) {
-                final StringWriter out = new StringWriter();
-                exception.printStackTrace(new PrintWriter(out));
-                output = out.toString();
-            }
-            executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, output);
-        } else {
-        }
-    }
-
-    @Override
-    public final ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException {
-
-        //print a eye-catching title in log
-        LogTitlePrinter.printTitle(this.getName());
-
-        Preconditions.checkArgument(executableContext instanceof DefaultContext);
-        ExecuteResult result;
-        try {
-            onExecuteStart(executableContext);
-            result = doWork(executableContext);
-        } catch (Throwable e) {
-            logger.error("error running Executable", e);
-            onExecuteError(e, executableContext);
-            throw new ExecuteException(e);
-        }
-        onExecuteFinished(result, executableContext);
-        return result;
-    }
-
-    protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException;
-
-    @Override
-    public void cleanup() throws ExecuteException {
-
-    }
-
-    @Override
-    public boolean isRunnable() {
-        return this.getStatus() == ExecutableState.READY;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    @Override
-    public final String getId() {
-        return this.id;
-    }
-
-    public final void setId(String id) {
-        this.id = id;
-    }
-
-    @Override
-    public final ExecutableState getStatus() {
-        return executableManager.getOutput(this.getId()).getState();
-    }
-
-    @Override
-    public final Map<String, String> getParams() {
-        return Collections.unmodifiableMap(this.params);
-    }
-
-    public final String getParam(String key) {
-        return this.params.get(key);
-    }
-
-    public final void setParam(String key, String value) {
-        this.params.put(key, value);
-    }
-
-    public final void setParams(Map<String, String> params) {
-        this.params.putAll(params);
-    }
-
-    public final long getLastModified() {
-        return executableManager.getOutput(getId()).getLastModified();
-    }
-
-    public final void setSubmitter(String submitter) {
-        setParam(SUBMITTER, submitter);
-    }
-
-    public final List<String> getNotifyList() {
-        final String str = getParam(NOTIFY_LIST);
-        if (str != null) {
-            return Lists.newArrayList(StringUtils.split(str, ","));
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public final void setNotifyList(String notifications) {
-        setParam(NOTIFY_LIST, notifications);
-    }
-
-    public final void setNotifyList(List<String> notifications) {
-        setNotifyList(StringUtils.join(notifications, ","));
-    }
-
-    protected Pair<String, String> formatNotifications(ExecutableContext executableContext, ExecutableState state) {
-        return null;
-    }
-
-    protected final void notifyUserStatusChange(ExecutableContext context, ExecutableState state) {
-        try {
-            List<String> users = Lists.newArrayList();
-            users.addAll(getNotifyList());
-            final String adminDls = KylinConfig.getInstanceFromEnv().getAdminDls();
-            if (null != adminDls) {
-                for (String adminDl : adminDls.split(",")) {
-                    users.add(adminDl);
-                }
-            }
-            if (users.isEmpty()) {
-                return;
-            }
-            final Pair<String, String> email = formatNotifications(context, state);
-            if (email == null) {
-                return;
-            }
-            logger.info("prepare to send email to:" + users);
-            logger.info("job name:" + getName());
-            logger.info("submitter:" + getSubmitter());
-            logger.info("notify list:" + users);
-            new MailService().sendMail(users, email.getLeft(), email.getRight());
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage(), e);
-        }
-    }
-
-    public final String getSubmitter() {
-        return getParam(SUBMITTER);
-    }
-
-    @Override
-    public final Output getOutput() {
-        return executableManager.getOutput(getId());
-    }
-
-    protected long getExtraInfoAsLong(String key, long defaultValue) {
-        return getExtraInfoAsLong(executableManager.getOutput(getId()), key, defaultValue);
-    }
-
-    public static long getStartTime(Output output) {
-        return getExtraInfoAsLong(output, START_TIME, 0L);
-    }
-
-    public static long getEndTime(Output output) {
-        return getExtraInfoAsLong(output, END_TIME, 0L);
-    }
-
-    public static long getDuration(long startTime, long endTime) {
-        if (startTime == 0) {
-            return 0;
-        }
-        if (endTime == 0) {
-            return System.currentTimeMillis() - startTime;
-        } else {
-            return endTime - startTime;
-        }
-    }
-
-    public static long getExtraInfoAsLong(Output output, String key, long defaultValue) {
-        final String str = output.getExtra().get(key);
-        if (str != null) {
-            return Long.parseLong(str);
-        } else {
-            return defaultValue;
-        }
-    }
-
-    protected final void addExtraInfo(String key, String value) {
-        executableManager.addJobInfo(getId(), key, value);
-    }
-
-    public final void setStartTime(long time) {
-        addExtraInfo(START_TIME, time + "");
-    }
-
-    public final void setEndTime(long time) {
-        addExtraInfo(END_TIME, time + "");
-    }
-
-    public final long getStartTime() {
-        return getExtraInfoAsLong(START_TIME, 0L);
-    }
-
-    public final long getEndTime() {
-        return getExtraInfoAsLong(END_TIME, 0L);
-    }
-
-    public final long getDuration() {
-        return getDuration(getStartTime(), getEndTime());
-    }
-
-    /*
-    * discarded is triggered by JobService, the Scheduler is not awake of that
-    *
-    * */
-    protected final boolean isDiscarded() {
-        final ExecutableState status = executableManager.getOutput(getId()).getState();
-        return status == ExecutableState.DISCARDED;
-    }
-
-    @Override
-    public String toString() {
-        return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
deleted file mode 100644
index d5a7aae..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.List;
-
-/**
- */
-public interface ChainedExecutable extends Executable {
-
-    List<? extends AbstractExecutable> getTasks();
-    
-    void addTask(AbstractExecutable executable);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
deleted file mode 100644
index 6443762..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.manager.ExecutableManager;
-
-/**
- */
-public class DefaultChainedExecutable extends AbstractExecutable implements ChainedExecutable {
-
-    private final List<AbstractExecutable> subTasks = Lists.newArrayList();
-
-    protected final ExecutableManager jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    public DefaultChainedExecutable(){
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        List<? extends Executable> executables = getTasks();
-        final int size = executables.size();
-        for (int i = 0; i < size; ++i) {
-            Executable subTask = executables.get(i);
-            if (subTask.isRunnable()) {
-                return subTask.execute(context);
-            }
-        }
-        return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
-    }
-
-    @Override
-    protected void onExecuteStart(ExecutableContext executableContext) {
-        Map<String, String> info = Maps.newHashMap();
-        info.put(START_TIME, Long.toString(System.currentTimeMillis()));
-        final long startTime = getStartTime();
-        if (startTime > 0) {
-            jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
-        } else {
-            jobService.updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
-        }
-    }
-
-    @Override
-    protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
-        if (isDiscarded()) {
-            setEndTime(System.currentTimeMillis());
-            notifyUserStatusChange(executableContext, ExecutableState.DISCARDED);
-        } else if (result.succeed()) {
-            List<? extends Executable> jobs = getTasks();
-            boolean allSucceed = true;
-            boolean hasError = false;
-            for (Executable task: jobs) {
-                final ExecutableState status = task.getStatus();
-                if (status == ExecutableState.ERROR) {
-                    hasError = true;
-                }
-                if (status != ExecutableState.SUCCEED) {
-                    allSucceed = false;
-                }
-            }
-            if (allSucceed) {
-                setEndTime(System.currentTimeMillis());
-                jobService.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
-                notifyUserStatusChange(executableContext, ExecutableState.SUCCEED);
-            } else if (hasError) {
-                setEndTime(System.currentTimeMillis());
-                jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
-                notifyUserStatusChange(executableContext, ExecutableState.ERROR);
-            } else {
-                jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
-            }
-        } else {
-            setEndTime(System.currentTimeMillis());
-            jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
-            notifyUserStatusChange(executableContext, ExecutableState.ERROR);
-        }
-    }
-
-    @Override
-    public List<AbstractExecutable> getTasks() {
-        return subTasks;
-    }
-
-    public final AbstractExecutable getTaskByName(String name) {
-        for (AbstractExecutable task : subTasks) {
-            if (task.getName() != null && task.getName().equalsIgnoreCase(name)) {
-                return task;
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public void addTask(AbstractExecutable executable) {
-        executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
-        this.subTasks.add(executable);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java
deleted file mode 100644
index 6bc3281..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Map;
-
-/**
- */
-public class DefaultOutput implements Output {
-
-    private ExecutableState state;
-    private Map<String, String> extra;
-    private String verboseMsg;
-    private long lastModified;
-
-    @Override
-    public Map<String, String> getExtra() {
-        return extra;
-    }
-
-    @Override
-    public String getVerboseMsg() {
-        return verboseMsg;
-    }
-
-    @Override
-    public ExecutableState getState() {
-        return state;
-    }
-
-    @Override
-    public long getLastModified() {
-        return lastModified;
-    }
-
-    public void setState(ExecutableState state) {
-        this.state = state;
-    }
-
-    public void setExtra(Map<String, String> extra) {
-        this.extra = extra;
-    }
-
-    public void setVerboseMsg(String verboseMsg) {
-        this.verboseMsg = verboseMsg;
-    }
-
-    public void setLastModified(long lastModified) {
-        this.lastModified = lastModified;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int hashCode = state.hashCode();
-        hashCode = hashCode * prime + extra.hashCode();
-        hashCode = hashCode * prime + verboseMsg.hashCode();
-        hashCode = hashCode * prime + Long.valueOf(lastModified).hashCode();
-        return hashCode;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof DefaultOutput)) {
-            return false;
-        }
-        DefaultOutput another = ((DefaultOutput) obj);
-        if (this.state != another.state) {
-            return false;
-        }
-        if (!extra.equals(another.extra)) {
-            return false;
-        }
-        if (this.lastModified != another.lastModified) {
-            return false;
-        }
-        return StringUtils.equals(verboseMsg, another.verboseMsg);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/Executable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/Executable.java b/job/src/main/java/org/apache/kylin/job/execution/Executable.java
deleted file mode 100644
index a7f1358..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/Executable.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import org.apache.kylin.job.exception.ExecuteException;
-
-import java.util.Map;
-
-/**
- */
-public interface Executable {
-
-    String getId();
-
-    String getName();
-
-    ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException;
-
-    ExecutableState getStatus();
-
-    Output getOutput();
-
-    boolean isRunnable();
-
-    Map<String, String> getParams();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java b/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
deleted file mode 100644
index e3f99ca..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- */
-public interface ExecutableContext {
-
-    Object getSchedulerContext();
-
-    KylinConfig getConfig();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
deleted file mode 100644
index 5dad4b3..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
-/**
- */
-public enum ExecutableState {
-
-    READY,
-    RUNNING,
-    ERROR,
-    STOPPED,
-    DISCARDED,
-    SUCCEED;
-
-    private static Multimap<ExecutableState, ExecutableState> VALID_STATE_TRANSFER;
-
-    static {
-        VALID_STATE_TRANSFER = Multimaps.newSetMultimap(Maps.<ExecutableState, Collection<ExecutableState>>newEnumMap(ExecutableState.class), new Supplier<Set<ExecutableState>>() {
-            @Override
-            public Set<ExecutableState> get() {
-                return new CopyOnWriteArraySet<ExecutableState>();
-            }
-        });
-
-        //scheduler
-        VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.RUNNING);
-        VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.ERROR);
-        //user
-        VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.DISCARDED);
-
-        //job
-        VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.READY);
-        //job
-        VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.SUCCEED);
-        //user
-        VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.DISCARDED);
-        //scheduler,job
-        VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.ERROR);
-
-
-        VALID_STATE_TRANSFER.put(ExecutableState.STOPPED, ExecutableState.DISCARDED);
-        VALID_STATE_TRANSFER.put(ExecutableState.STOPPED, ExecutableState.READY);
-
-        VALID_STATE_TRANSFER.put(ExecutableState.ERROR, ExecutableState.DISCARDED);
-        VALID_STATE_TRANSFER.put(ExecutableState.ERROR, ExecutableState.READY);
-    }
-
-    public boolean isFinalState() {
-        return this == SUCCEED || this == DISCARDED;
-    }
-
-    public static boolean isValidStateTransfer(ExecutableState from, ExecutableState to) {
-        return VALID_STATE_TRANSFER.containsEntry(from, to);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
deleted file mode 100644
index cddc0f7..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public final class ExecuteResult {
-
-    public static enum State {SUCCEED, FAILED, ERROR, DISCARDED, STOPPED}
-
-    private final State state;
-    private final String output;
-
-    public ExecuteResult(State state) {
-        this(state, "");
-    }
-
-    public ExecuteResult(State state, String output) {
-        Preconditions.checkArgument(state != null, "state cannot be null");
-        this.state = state;
-        this.output = output;
-    }
-
-    public State state() {
-        return state;
-    }
-
-    public boolean succeed() {
-        return state == State.SUCCEED;
-    }
-
-
-    public String output() {
-        return output;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java b/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
deleted file mode 100644
index 98c950e..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import org.apache.kylin.job.exception.ExecuteException;
-
-/**
- */
-public interface Idempotent {
-
-    void cleanup() throws ExecuteException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/execution/Output.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/Output.java b/job/src/main/java/org/apache/kylin/job/execution/Output.java
deleted file mode 100644
index 4d93132..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/Output.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.execution;
-
-import java.util.Map;
-
-/**
- */
-public interface Output {
-
-    Map<String, String> getExtra();
-
-    String getVerboseMsg();
-
-    ExecutableState getState();
-
-    long getLastModified();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
deleted file mode 100644
index c10d4e0..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop;
-
-/**
- * @author George Song (ysong1)
- *
- */
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.hadoop.util.StringUtils.formatTime;
-
-@SuppressWarnings("static-access")
-public abstract class AbstractHadoopJob extends Configured implements Tool {
-    protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
-
-    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
-    protected static final Option OPTION_JOB_FLOW_ID = OptionBuilder.withArgName("job flow ID").hasArg().isRequired(true).withDescription("job flow ID").create("jobflowid");
-    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
-    protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
-    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
-    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
-    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
-    protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
-    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
-    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
-    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
-    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
-    protected static final Option OPTION_KEY_COLUMN_PERCENTAGE = OptionBuilder.withArgName("rowkey column percentage").hasArg().isRequired(true).withDescription("Percentage of row key columns").create("columnpercentage");
-    protected static final Option OPTION_KEY_SPLIT_NUMBER = OptionBuilder.withArgName("key split number").hasArg().isRequired(true).withDescription("Number of key split range").create("splitnumber");
-
-    protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");
-    protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName("statisticsoutput").hasArg().isRequired(false).withDescription("Statistics output").create("statisticsoutput");
-    protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName("statisticssamplingpercent").hasArg().isRequired(false).withDescription("Statistics sampling percentage").create("statisticssamplingpercent");
-
-    protected String name;
-    protected String description;
-    protected boolean isAsync = false;
-    protected OptionsHelper optionsHelper = new OptionsHelper();
-
-    protected Job job;
-
-    public AbstractHadoopJob() {
-        super(HadoopUtil.newHadoopJobConfiguration());
-    }
-
-    protected void parseOptions(Options options, String[] args) throws ParseException {
-        optionsHelper.parseOptions(options, args);
-    }
-
-    public void printUsage(Options options) {
-        optionsHelper.printUsage(getClass().getSimpleName(), options);
-    }
-
-    public Option[] getOptions() {
-        return optionsHelper.getOptions();
-    }
-
-    public String getOptionsAsString() {
-        return optionsHelper.getOptionsAsString();
-    }
-
-    protected String getOptionValue(Option option) {
-        return optionsHelper.getOptionValue(option);
-    }
-
-    protected boolean hasOption(Option option) {
-        return optionsHelper.hasOption(option);
-    }
-
-    protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
-        int retVal = 0;
-        long start = System.nanoTime();
-        if (isAsync) {
-            job.submit();
-        } else {
-            job.waitForCompletion(true);
-            retVal = job.isSuccessful() ? 0 : 1;
-            logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures.  Time taken ") + formatTime((System.nanoTime() - start) / 1000000L));
-        }
-        return retVal;
-    }
-
-    protected static void runJob(Tool job, String[] args) {
-        try {
-            int exitCode = ToolRunner.run(job, args);
-            System.exit(exitCode);
-        } catch (Exception e) {
-            e.printStackTrace(System.err);
-            System.exit(5);
-        }
-    }
-
-    private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
-
-    protected void setJobClasspath(Job job) {
-        String jarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
-        File jarFile = new File(jarPath);
-        if (jarFile.exists()) {
-            job.setJar(jarPath);
-            logger.info("append job jar: " + jarPath);
-        } else {
-            job.setJarByClass(this.getClass());
-        }
-
-        String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
-        String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
-        logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " and kylin.hive.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
-
-        Configuration jobConf = job.getConfiguration();
-        String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
-        if (classpath == null || classpath.length() == 0) {
-            logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
-            classpath = getDefaultMapRedClasspath();
-            logger.info("The default mapred classpath is: " + classpath);
-        }
-
-
-        if (kylinHBaseDependency != null) {
-            // yarn classpath is comma separated
-            kylinHBaseDependency = kylinHBaseDependency.replace(":", ",");
-            classpath = classpath + "," + kylinHBaseDependency;
-        }
-
-        if (kylinHiveDependency != null) {
-            // yarn classpath is comma separated
-            kylinHiveDependency = kylinHiveDependency.replace(":", ",");
-            classpath = classpath + "," + kylinHiveDependency;
-        }
-
-        jobConf.set(MAP_REDUCE_CLASSPATH, classpath + "," + kylinHiveDependency);
-        logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
-    }
-
-
-    private String getDefaultMapRedClasspath() {
-
-        String classpath = "";
-        try {
-            CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
-            ShellCmdOutput output = new ShellCmdOutput();
-            executor.execute("mapred classpath", output);
-
-            classpath = output.getOutput().trim().replace(':', ',');
-        } catch (IOException e) {
-            logger.error("Failed to run: 'mapred classpath'.", e);
-        }
-
-        return classpath;
-    }
-
-
-    public void addInputDirs(String input, Job job) throws IOException {
-        for (String inp : StringSplitter.split(input, ",")) {
-            inp = inp.trim();
-            if (inp.endsWith("/*")) {
-                inp = inp.substring(0, inp.length() - 2);
-                FileSystem fs = FileSystem.get(job.getConfiguration());
-                Path path = new Path(inp);
-                FileStatus[] fileStatuses = fs.listStatus(path);
-                boolean hasDir = false;
-                for (FileStatus stat : fileStatuses) {
-                    if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
-                        hasDir = true;
-                        addInputDirs(stat.getPath().toString(), job);
-                    }
-                }
-                if (fileStatuses.length > 0 && !hasDir) {
-                    addInputDirs(path.toString(), job);
-                }
-            } else {
-                logger.debug("Add input " + inp);
-                FileInputFormat.addInputPath(job, new Path(inp));
-            }
-        }
-    }
-
-    public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
-        File metaDir = new File("meta");
-        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
-        logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
-        return kylinConfig;
-    }
-
-    protected void attachKylinPropsAndMetadata(TableDesc table, Configuration conf) throws IOException {
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(table.getResourcePath());
-        attachKylinPropsAndMetadata(dumpList, conf);
-    }
-    
-    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        
-        // write cube / model_desc / cube_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(cube.getResourcePath());
-        dumpList.add(cube.getDescriptor().getModel().getResourcePath());
-        dumpList.add(cube.getDescriptor().getResourcePath());
-        
-        for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = metaMgr.getTableDesc(tableName);
-            dumpList.add(table.getResourcePath());
-        }
-        for (CubeSegment segment : cube.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-        
-        attachKylinPropsAndMetadata(dumpList, conf);
-    }
-
-    protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        
-        // write II / model_desc / II_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(ii.getResourcePath());
-        dumpList.add(ii.getDescriptor().getModel().getResourcePath());
-        dumpList.add(ii.getDescriptor().getResourcePath());
-
-        for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = metaMgr.getTableDesc(tableName);
-            dumpList.add(table.getResourcePath());
-        }
-        for (IISegment segment : ii.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-
-        attachKylinPropsAndMetadata(dumpList, conf);
-    }
-
-    private void attachKylinPropsAndMetadata(ArrayList<String> dumpList, Configuration conf) throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        tmp.delete(); // we need a directory, so delete the file first
-
-        File metaDir = new File(tmp, "meta");
-        metaDir.mkdirs();
-        metaDir.getParentFile().deleteOnExit();
-
-        // write kylin.properties
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        File kylinPropsFile = new File(metaDir, "kylin.properties");
-        kylinConfig.writeProperties(kylinPropsFile);
-
-        // write resources
-        dumpResources(kylinConfig, metaDir, dumpList);
-
-        // hadoop distributed cache
-        conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
-    }
-
-    private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
-        ResourceStore from = ResourceStore.getStore(kylinConfig);
-        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
-        ResourceStore to = ResourceStore.getStore(localConfig);
-        for (String path : dumpList) {
-            InputStream in = from.getResource(path);
-            if (in == null)
-                throw new IllegalStateException("No resource found at -- " + path);
-            long ts = from.getResourceTimestamp(path);
-            to.putResource(path, in, ts);
-            //The following log is duplicate with in ResourceStore
-            //log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
-        }
-    }
-
-    protected void deletePath(Configuration conf, Path path) throws IOException {
-        HadoopUtil.deletePath(conf, path);
-    }
-
-    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-
-        long mapInputBytes = 0;
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        for (InputSplit split : input.getSplits(job)) {
-            mapInputBytes += split.getLength();
-        }
-        if (mapInputBytes == 0) {
-            throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
-        }
-        double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
-        return totalMapInputMB;
-    }
-
-    protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        return input.getSplits(job).size();
-    }
-
-    public void kill() throws JobException {
-        if (job != null) {
-            try {
-                job.killJob();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        }
-    }
-
-    public Map<String, String> getInfo() throws JobException {
-        if (job != null) {
-            Map<String, String> status = new HashMap<String, String>();
-            if (null != job.getJobID()) {
-                status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
-            }
-            if (null != job.getTrackingURL()) {
-                status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString());
-            }
-
-            return status;
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    public Counters getCounters() throws JobException {
-        if (job != null) {
-            try {
-                return job.getCounters();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    public void setAsync(boolean isAsync) {
-        this.isAsync = isAsync;
-    }
-
-    public Job getJob() {
-        return this.job;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index 7f549d0..2e3cdb0 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -32,10 +32,10 @@ import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
index 0670178..de10e65 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 
 /**
  * This hadoop job will scan all rows of the hive table and then calculate the cardinality on each column.