You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/23 13:21:49 UTC

[12/23] incubator-kylin git commit: KYLIN-875 half way

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/Scheduler.java b/job/src/main/java/org/apache/kylin/job/Scheduler.java
deleted file mode 100644
index 2ed2fc2..0000000
--- a/job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.lock.JobLock;
-
-/**
- */
-public interface Scheduler<T extends Executable> {
-
-    void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
-
-    void shutdown() throws SchedulerException;
-
-    boolean stop(T executable) throws SchedulerException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
deleted file mode 100644
index 29b5324..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-/**
- */
-public abstract class BaseCommandOutput implements ICommandOutput {
-
-    @Override
-    public void log(String message) {
-        this.appendOutput(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
deleted file mode 100644
index 6cab6a3..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- * 
- */
-public interface ICommandOutput extends Logger {
-
-    public void setStatus(JobStepStatusEnum status);
-
-    public JobStepStatusEnum getStatus();
-
-    public void appendOutput(String message);
-
-    public String getOutput();
-
-    public void setExitCode(int exitCode);
-
-    public int getExitCode();
-
-    public void reset();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
deleted file mode 100644
index 5a47173..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.exception.JobException;
-
-/**
- * @author xjiang
- * 
- */
-public interface IJobCommand {
-
-    public ICommandOutput execute() throws JobException;
-
-    public void cancel() throws JobException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
deleted file mode 100644
index 6a718fc..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.*;
-
-/**
- * @author xjiang
- * 
- */
-public class ShellCmd implements IJobCommand {
-
-    private static Logger log = LoggerFactory.getLogger(ShellCmd.class);
-
-    private final String executeCommand;
-    private final ICommandOutput output;
-    private final boolean isAsync;
-    private final CliCommandExecutor cliCommandExecutor;
-
-    private FutureTask<Integer> future;
-
-    private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
-        this.executeCommand = executeCmd;
-        this.output = out;
-        this.cliCommandExecutor = new CliCommandExecutor();
-        this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
-        this.isAsync = async;
-    }
-
-    public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
-        this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
-    }
-
-    @Override
-    public ICommandOutput execute() throws JobException {
-
-        final ExecutorService executor = Executors.newSingleThreadExecutor();
-        future = new FutureTask<Integer>(new Callable<Integer>() {
-            public Integer call() throws JobException, IOException {
-                executor.shutdown();
-                return executeCommand(executeCommand);
-            }
-        });
-        executor.execute(future);
-
-        int exitCode = -1;
-        if (!isAsync) {
-            try {
-                exitCode = future.get();
-                log.info("finish executing");
-            } catch (CancellationException e) {
-                log.debug("Command is cancelled");
-                exitCode = -2;
-            } catch (Exception e) {
-                throw new JobException("Error when execute job " + executeCommand, e);
-            } finally {
-                if (exitCode == 0) {
-                    output.setStatus(JobStepStatusEnum.FINISHED);
-                } else if (exitCode == -2) {
-                    output.setStatus(JobStepStatusEnum.DISCARDED);
-                } else {
-                    output.setStatus(JobStepStatusEnum.ERROR);
-                }
-                output.setExitCode(exitCode);
-            }
-        }
-        return output;
-    }
-
-    protected int executeCommand(String command) throws JobException, IOException {
-        output.reset();
-        output.setStatus(JobStepStatusEnum.RUNNING);
-        return cliCommandExecutor.execute(command, output).getFirst();
-    }
-
-    @Override
-    public void cancel() throws JobException {
-        future.cancel(true);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index ebcad47..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- * 
- */
-public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
-
-    protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
-
-    protected StringBuilder output;
-    protected int exitCode;
-    protected JobStepStatusEnum status;
-
-    public ShellCmdOutput() {
-        init();
-    }
-
-    private void init() {
-        output = new StringBuilder();
-        exitCode = -1;
-        status = JobStepStatusEnum.NEW;
-    }
-
-    @Override
-    public JobStepStatusEnum getStatus() {
-        return status;
-    }
-
-    @Override
-    public void setStatus(JobStepStatusEnum s) {
-        this.status = s;
-    }
-
-    @Override
-    public String getOutput() {
-        return output.toString();
-    }
-
-    @Override
-    public void appendOutput(String message) {
-        output.append(message).append(System.getProperty("line.separator"));
-        log.debug(message);
-    }
-
-    @Override
-    public int getExitCode() {
-        return exitCode;
-    }
-
-    @Override
-    public void setExitCode(int code) {
-        exitCode = code;
-    }
-
-    @Override
-    public void reset() {
-        init();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java b/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
deleted file mode 100644
index 288fd31..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author xduo
- * 
- */
-public class HadoopCmdOutput {
-
-    protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class);
-
-    private final StringBuilder output;
-    private final Job job;
-
-    public HadoopCmdOutput(Job job, StringBuilder output) {
-        super();
-        this.job = job;
-        this.output = output;
-    }
-
-    public String getMrJobId() {
-        return getInfo().get(ExecutableConstants.MR_JOB_ID);
-    }
-
-    public Map<String, String> getInfo() {
-        if (job != null) {
-            Map<String, String> status = new HashMap<String, String>();
-            if (null != job.getJobID()) {
-                status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString());
-            }
-            if (null != job.getTrackingURL()) {
-                status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL().toString());
-            }
-            return status;
-        } else {
-            return Collections.emptyMap();
-        }
-    }
-
-    private String mapInputRecords;
-    private String hdfsBytesWritten;
-    private String hdfsBytesRead;
-
-    public String getMapInputRecords() {
-        return mapInputRecords;
-    }
-
-    public String getHdfsBytesWritten() {
-        return hdfsBytesWritten;
-    }
-
-    public String getHdfsBytesRead() {
-        return hdfsBytesRead;
-    }
-    
-    public void updateJobCounter() {
-        try {
-            Counters counters = job.getCounters();
-            if (counters == null) {
-                String errorMsg = "no counters for job " + getMrJobId();
-                log.warn(errorMsg);
-                output.append(errorMsg);
-                return;
-            }
-            this.output.append(counters.toString()).append("\n");
-            log.debug(counters.toString());
-
-            mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
-            hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
-            hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
-        } catch (Exception e) {
-            log.error(e.getLocalizedMessage(), e);
-            output.append(e.getLocalizedMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
deleted file mode 100644
index 2da5b2a..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-
-import org.apache.hadoop.util.ToolRunner;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-/**
- */
-public class HadoopShellExecutable extends AbstractExecutable {
-
-    private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS";
-    private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS";
-
-    public HadoopShellExecutable() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final String mapReduceJobClass = getJobClass();
-        String params = getJobParams();
-        Preconditions.checkNotNull(mapReduceJobClass);
-        Preconditions.checkNotNull(params);
-        try {
-            final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
-            final AbstractHadoopJob job = constructor.newInstance();
-            String[] args = params.trim().split("\\s+");
-            logger.info("parameters of the HadoopShellExecutable:");
-            logger.info(params);
-            int result;
-            StringBuilder log = new StringBuilder();
-            try {
-                result = ToolRunner.run(job, args);
-            } catch (Exception ex) {
-                logger.error("error execute " + this.toString(), ex);
-                StringWriter stringWriter = new StringWriter();
-                ex.printStackTrace(new PrintWriter(stringWriter));
-                log.append(stringWriter.toString()).append("\n");
-                result = 2;
-            }
-            log.append("result code:").append(result);
-            return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()):new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
-        } catch (ReflectiveOperationException e) {
-            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } catch (Exception e) {
-            logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setJobClass(Class<? extends AbstractHadoopJob> clazzName) {
-        setParam(KEY_MR_JOB, clazzName.getName());
-    }
-
-    public String getJobClass() throws ExecuteException {
-        return getParam(KEY_MR_JOB);
-    }
-
-    public void setJobParams(String param) {
-        setParam(KEY_PARAMS, param);
-    }
-
-    public String getJobParams() {
-        return getParam(KEY_PARAMS);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
deleted file mode 100644
index ffe45ed..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.hive.HiveClient;
-import org.datanucleus.store.types.backed.HashMap;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.Lists;
-
-import org.apache.kylin.common.util.JsonUtil;
-
-/**
- */
-public class HqlExecutable extends AbstractExecutable {
-
-    private static final String HQL = "hql";
-    private static final String HIVE_CONFIG = "hive-config";
-
-    public HqlExecutable() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        try {
-            Map<String, String> configMap = getConfiguration();
-            HiveClient hiveClient = new HiveClient(configMap);
-            
-            for (String hql: getHqls()) {
-                hiveClient.executeHQL(hql);
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED);
-        } catch (Exception e) {
-            logger.error("error run hive query:" + getHqls(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-    
-    public void setConfiguration(Map<String, String> configMap) {
-        if(configMap != null) {
-            String configStr = "";
-            try {
-                configStr = JsonUtil.writeValueAsString(configMap);
-            } catch (JsonProcessingException e) {
-                e.printStackTrace();
-            }
-            setParam(HIVE_CONFIG, configStr);
-        }
-    }
-
-
-    @SuppressWarnings("unchecked")
-    private Map<String, String> getConfiguration() {
-        String configStr = getParam(HIVE_CONFIG);
-        Map<String, String> result = null;
-        if(configStr != null) {
-            try {
-                result = JsonUtil.readValue(configStr, HashMap.class);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-        
-        return result;
-    }
-    
-    public void setHqls(List<String> hqls) {
-        setParam(HQL, StringUtils.join(hqls, ";"));
-    }
-
-    private List<String> getHqls() {
-        final String hqls = getParam(HQL);
-        if (hqls != null) {
-            return Lists.newArrayList(StringUtils.split(hqls, ";"));
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
deleted file mode 100644
index f8eab6c..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.tools.HadoopStatusChecker;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Executable step that runs a Hadoop MapReduce job.
- * <p>
- * The job class name and its command-line parameters are stored as step
- * parameters. {@link #doWork} either starts the job asynchronously through
- * {@code ToolRunner} or, when an MR job id is already recorded in the step
- * output, re-attaches to that job. It then polls the job status until the job
- * completes or this step is discarded.
- */
-public class MapReduceExecutable extends AbstractExecutable {
-
-    private static final String KEY_MR_JOB = "MR_JOB_CLASS";
-    private static final String KEY_PARAMS = "MR_JOB_PARAMS";
-    private static final String KEY_COUNTER_SAVEAS = "MR_COUNTER_SAVEAS";
-
-    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-
-    public MapReduceExecutable() {
-        super();
-    }
-
-    /**
-     * Decides whether this step resumes a previously submitted MR job or starts over.
-     * <p>
-     * When the step output already has a start time and an MR job id, the job is
-     * looked up on the cluster. If it is still known and has not failed, the step
-     * is only marked RUNNING so that {@link #doWork} can re-attach to it. In every
-     * other case (no job id lookup result, failed job, lookup error) the default
-     * start handling of the superclass is used.
-     */
-    @Override
-    protected void onExecuteStart(ExecutableContext executableContext) {
-        final Output output = executableManager.getOutput(getId());
-        if (!output.getExtra().containsKey(START_TIME)) {
-            super.onExecuteStart(executableContext);
-            return;
-        }
-        final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
-        if (mrJobId == null) {
-            executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
-            return;
-        }
-        try {
-            Job job = new Cluster(new Configuration()).getJob(JobID.forName(mrJobId));
-            // Cluster.getJob() yields null when the cluster no longer knows the job;
-            // treat that like a failed job instead of hitting a NullPointerException
-            if (job == null || job.getJobState() == JobStatus.State.FAILED) {
-                //remove previous mr job info
-                super.onExecuteStart(executableContext);
-            } else {
-                executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
-            }
-        } catch (IOException | InterruptedException e) {
-            logger.warn("error get hadoop status", e);
-            super.onExecuteStart(executableContext);
-        }
-    }
-
-    /**
-     * Starts (or re-attaches to) the MapReduce job and polls its status until it
-     * completes or this step is discarded.
-     *
-     * @return SUCCEED when the job finished, FAILED when it completed otherwise or
-     *         was killed, DISCARDED when the step was discarded while polling, and
-     *         ERROR when the job could not be started or monitored
-     */
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final String mapReduceJobClass = getMapReduceJobClass();
-        String params = getMapReduceParams();
-        Preconditions.checkNotNull(mapReduceJobClass);
-        Preconditions.checkNotNull(params);
-        try {
-            Job job;
-            final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
-            if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
-                final String resumedJobId = extra.get(ExecutableConstants.MR_JOB_ID);
-                job = new Cluster(new Configuration()).getJob(JobID.forName(resumedJobId));
-                if (job == null) {
-                    // the cluster no longer knows this job; fail with a clear message
-                    final String message = "mr_job_id:" + resumedJobId + " can not be found on the cluster";
-                    logger.error(message);
-                    return new ExecuteResult(ExecuteResult.State.ERROR, message);
-                }
-                logger.info("mr_job_id:" + resumedJobId + " resumed");
-            } else {
-                final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
-                final AbstractHadoopJob hadoopJob = constructor.newInstance();
-                hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
-                logger.info("parameters of the MapReduceExecutable:");
-                logger.info(params);
-                String[] args = params.trim().split("\\s+");
-                try {
-                    //for async mr job, ToolRunner just return 0;
-                    ToolRunner.run(hadoopJob, args);
-                } catch (Exception ex) {
-                    logger.error("error execute " + this.toString(), ex);
-                    StringWriter stringWriter = new StringWriter();
-                    ex.printStackTrace(new PrintWriter(stringWriter));
-                    StringBuilder log = new StringBuilder();
-                    log.append(stringWriter.toString()).append("\n");
-                    log.append("result code:").append(2);
-                    return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
-                }
-                job = hadoopJob.getJob();
-            }
-            final StringBuilder output = new StringBuilder();
-            final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
-
-            final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
-            if (restStatusCheckUrl == null) {
-                logger.error("restStatusCheckUrl is null");
-                return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
-            }
-            String mrJobId = hadoopCmdOutput.getMrJobId();
-            HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
-            JobStepStatusEnum status = JobStepStatusEnum.NEW;
-            while (!isDiscarded()) {
-                JobStepStatusEnum newStatus = statusChecker.checkStatus();
-                // NOTE(review): this tests the status seen on the PREVIOUS poll, so a kill
-                // is acted on one poll interval after it is first reported - confirm intent
-                if (status == JobStepStatusEnum.KILLED) {
-                    executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String>emptyMap(), "killed by admin");
-                    return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
-                }
-                // first transition out of WAITING: record how long the job waited
-                if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
-                    final long waitTime = System.currentTimeMillis() - getStartTime();
-                    setMapReduceWaitTime(waitTime);
-                }
-                status = newStatus;
-                executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
-                if (status.isComplete()) {
-                    final Map<String, String> info = hadoopCmdOutput.getInfo();
-                    readCounters(hadoopCmdOutput, info);
-                    executableManager.addJobInfo(getId(), info);
-
-                    if (status == JobStepStatusEnum.FINISHED) {
-                        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-                    } else {
-                        return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
-                    }
-                }
-                // 1000L forces long arithmetic so a large interval cannot overflow int
-                Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000L);
-            }
-            //TODO kill discarded mr job using "hadoop job -kill " + mrJobId
-
-            return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
-
-        } catch (ReflectiveOperationException e) {
-            logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } catch (Exception e) {
-            logger.error("error execute " + this.toString(), e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    /**
-     * Refreshes the job counters and copies them into {@code info}, both under the
-     * standard keys and under the optional names configured via {@link #setCounterSaveAs}.
-     */
-    private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
-        hadoopCmdOutput.updateJobCounter();
-        info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
-        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
-        info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
-
-        String saveAs = getParam(KEY_COUNTER_SAVEAS);
-        if (saveAs != null) {
-            // comma separated, positional: input records, bytes read, bytes written
-            String[] saveAsNames = saveAs.split(",");
-            saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
-            saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
-            saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
-        }
-    }
-
-    /** Stores {@code counter} under the i-th save-as name when that name exists and is not blank. */
-    private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
-        if (saveAsNames.length > i && !StringUtils.isBlank(saveAsNames[i])) {
-            info.put(saveAsNames[i].trim(), counter);
-        }
-    }
-
-    /**
-     * Resolves the REST URL used to poll the job status: the configured URL when
-     * set, otherwise one built from the job's {@code yarn.resourcemanager.webapp.address}.
-     *
-     * @return the URL, or {@code null} when neither source provides an address
-     */
-    private String getRestStatusCheckUrl(Job job, KylinConfig config) {
-        final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
-        if (yarnStatusCheckUrl != null) {
-            return yarnStatusCheckUrl;
-        }
-        logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
-        String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
-        if (StringUtils.isEmpty(rmWebHost)) {
-            return null;
-        }
-        if (!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) {
-            rmWebHost = "http://" + rmWebHost;
-        }
-        logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
-        return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
-    }
-
-    public long getMapReduceWaitTime() {
-        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
-    }
-
-    public void setMapReduceWaitTime(long t) {
-        addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
-    }
-
-    public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) {
-        setParam(KEY_MR_JOB, clazzName.getName());
-    }
-
-    public String getMapReduceJobClass() throws ExecuteException {
-        return getParam(KEY_MR_JOB);
-    }
-
-    public void setMapReduceParams(String param) {
-        setParam(KEY_PARAMS, param);
-    }
-
-    public String getMapReduceParams() {
-        return getParam(KEY_PARAMS);
-    }
-
-    public String getCounterSaveAs() {
-        return getParam(KEY_COUNTER_SAVEAS);
-    }
-
-    public void setCounterSaveAs(String value) {
-        setParam(KEY_COUNTER_SAVEAS, value);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
deleted file mode 100644
index 786698e..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.util.Pair;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- * Executable step that runs a shell command through the configured CLI command
- * executor and records job information parsed from the command's output.
- */
-public class ShellExecutable extends AbstractExecutable {
-
-    private static final String CMD = "cmd";
-
-    public ShellExecutable() {
-        super();
-    }
-
-    /**
-     * Runs the stored command. The step succeeds when the command's exit code is 0
-     * and fails otherwise; an {@link IOException} yields an ERROR result.
-     */
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        try {
-            logger.info("executing:" + getCmd());
-            final ShellExecutableLogger outputCollector = new ShellExecutableLogger();
-            final Pair<Integer, String> exitAndOutput = context.getConfig().getCliCommandExecutor().execute(getCmd(), outputCollector);
-            executableManager.addJobInfo(getId(), outputCollector.getInfo());
-            final ExecuteResult.State state;
-            if (exitAndOutput.getFirst() == 0) {
-                state = ExecuteResult.State.SUCCEED;
-            } else {
-                state = ExecuteResult.State.FAILED;
-            }
-            return new ExecuteResult(state, exitAndOutput.getSecond());
-        } catch (IOException e) {
-            logger.error("job:" + getId() + " execute finished with exception", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setCmd(String cmd) {
-        setParam(CMD, cmd);
-    }
-
-    public String getCmd() {
-        return getParam(CMD);
-    }
-
-    /**
-     * Scans each logged message for known patterns and collects the captured
-     * values into a map keyed by the {@link ExecutableConstants} info keys.
-     */
-    private static class ShellExecutableLogger implements Logger {
-
-        private final Map<String, String> info = Maps.newHashMap();
-
-        private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
-        private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
-        private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
-        private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
-        private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
-        private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
-
-        // hive
-        private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
-        private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
-
-        @Override
-        public void log(String message) {
-            // the order matters: later matches overwrite earlier values for the same key
-            captureFirstGroup(PATTERN_APP_ID, message, ExecutableConstants.YARN_APP_ID);
-            captureFirstGroup(PATTERN_APP_URL, message, ExecutableConstants.YARN_APP_URL);
-            captureFirstGroup(PATTERN_JOB_ID, message, ExecutableConstants.MR_JOB_ID);
-            captureFirstGroup(PATTERN_HDFS_BYTES_WRITTEN, message, ExecutableConstants.HDFS_BYTES_WRITTEN);
-            captureFirstGroup(PATTERN_SOURCE_RECORDS_COUNT, message, ExecutableConstants.SOURCE_RECORDS_COUNT);
-            captureFirstGroup(PATTERN_SOURCE_RECORDS_SIZE, message, ExecutableConstants.SOURCE_RECORDS_SIZE);
-
-            // hive
-            final Matcher hiveJob = PATTERN_HIVE_APP_ID_URL.matcher(message);
-            if (hiveJob.find()) {
-                info.put(ExecutableConstants.MR_JOB_ID, hiveJob.group(1));
-                info.put(ExecutableConstants.YARN_APP_URL, hiveJob.group(2));
-            }
-
-            final Matcher hiveBytes = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
-            if (hiveBytes.find()) {
-                // group 1 is the bytes read, which is not recorded here
-                info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hiveBytes.group(2));
-            }
-        }
-
-        /** Stores the first capture group under {@code key} when {@code pattern} is found in {@code message}. */
-        private void captureFirstGroup(Pattern pattern, String message, String key) {
-            final Matcher found = pattern.matcher(message);
-            if (found.find()) {
-                info.put(key, found.group(1));
-            }
-        }
-
-        Map<String, String> getInfo() {
-            return info;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
deleted file mode 100644
index 3a64d02..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * String and numeric constants shared by the batch jobs.
- * <p>
- * Fields of an interface are implicitly {@code public static final}, so the
- * modifiers are not repeated on each declaration.
- *
- * @author George Song (ysong1)
- */
-public interface BatchConstants {
-
-    char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
-
-    String CFG_CUBE_NAME = "cube.name";
-    String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
-
-    String CFG_II_NAME = "ii.name";
-    String CFG_II_SEGMENT_NAME = "ii.segment.name";
-
-    String INPUT_DELIM = "input.delim";
-    String OUTPUT_PATH = "output.path";
-
-    String TABLE_NAME = "table.name";
-    String TABLE_COLUMNS = "table.columns";
-
-    String CFG_IS_MERGE = "is.merge";
-    String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
-
-    String MAPREDUCE_COUTNER_GROUP_NAME = "Cube Builder";
-
-    String MAPPER_SAMPLE_NUMBER = "mapper.sample.number";
-    String REGION_NUMBER = "region.number";
-    String CUBE_CAPACITY = "cube.capacity";
-
-    String CFG_STATISTICS_ENABLED = "statistics.enabled";
-    String CFG_STATISTICS_OUTPUT = "statistics.ouput";
-    String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
-    String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
-    String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
-
-    int COUNTER_MAX = 100000;
-    int ERROR_RECORD_THRESHOLD = 100;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
deleted file mode 100644
index fdcfdbe..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * Constants used by executable jobs: keys under which job information is
- * recorded, step names, property names and the notification e-mail template.
- * This class is not instantiable.
- */
-public final class ExecutableConstants {
-
-    private ExecutableConstants(){}
-
-    // keys of the job info / extra map
-    public static final String YARN_APP_ID = "yarn_application_id";
-    public static final String YARN_APP_URL = "yarn_application_tracking_url";
-    public static final String MR_JOB_ID = "mr_job_id";
-    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
-    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
-    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
-    public static final String GLOBAL_LISTENER_NAME = "ChainListener";
-
-    public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
-
-    public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
-    public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
-
-    // step names
-    public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
-    public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
-    public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
-    public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
-    public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";
-    public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data";
-    public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
-    public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
-    public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
-    public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
-    public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
-    public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
-    public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
-    public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
-    public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
-    public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
-
-    public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
-    public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
-
-    // property names
-    public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
-    public static final String PROP_JOB_FLOW = "jobFlow";
-    public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
-    public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
-    public static final String PROP_COMMAND = "command";
-    // public static final String PROP_STORAGE_LOCATION =
-    // "storageLocationIdentifier";
-    public static final String PROP_JOB_ASYNC = "jobAsync";
-    public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
-    public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
-    public static final String PROP_JOB_KILLED = "jobKilled";
-    public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
-
-    // HTML template of the notification e-mail; ${...} tokens are placeholders.
-    // The outer <div> is closed with </div> (it previously ended in "<div/>",
-    // which left the element unclosed).
-    public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" //
-            + "<li>Build Result: <b>${result}</b></li>" //
-            + "<li>Job Engine: ${job_engine}</li>" //
-            + "<li>Cube Name: ${cube_name}</li>" //
-            + "<li>Start Time: ${start_time}</li>" //
-            + "<li>Duration: ${duration}</li>" //
-            + "<li>MR Waiting: ${mr_waiting}</li>" //
-            + "<li>Last Update Time: ${last_update_time}</li>" //
-            + "<li>Submitter: ${submitter}</li>" //
-            + "<li>Error Log: ${error_log}</li>" //
-            + "</ul></pre></div>";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
deleted file mode 100644
index a4ef564..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * Job status values, each carrying a distinct integer code.
- */
-public enum JobStatusEnum {
-
-    NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
-
-    private final int code;
-
-    private JobStatusEnum(int statusCode) {
-        this.code = statusCode;
-    }
-
-    /**
-     * Looks up the status with the given code.
-     *
-     * @return the matching status, or {@code null} when no status has that code
-     */
-    public static JobStatusEnum getByCode(int statusCode) {
-        JobStatusEnum match = null;
-        final JobStatusEnum[] all = values();
-        for (int i = 0; i < all.length && match == null; i++) {
-            if (all[i].code == statusCode) {
-                match = all[i];
-            }
-        }
-        return match;
-    }
-
-    public int getCode() {
-        return this.code;
-    }
-
-    /** True for FINISHED, ERROR and DISCARDED. */
-    public boolean isComplete() {
-        switch (this) {
-        case FINISHED:
-        case ERROR:
-        case DISCARDED:
-            return true;
-        default:
-            return false;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
deleted file mode 100644
index 02b40a3..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * Command types of a job step.
- * <p>
- * The constant names (including the spelling of
- * {@code JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE}) and their order are kept as they are.
- *
- * @author xduo, ysong1
- */
-public enum JobStepCmdTypeEnum {
-    SHELL_CMD, //
-    SHELL_CMD_HADOOP, //
-    JAVA_CMD_HADOOP_FACTDISTINCT, //
-    JAVA_CMD_HADOOP_BASECUBOID, //
-    JAVA_CMD_HADOOP_NDCUBOID, //
-    JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, //
-    JAVA_CMD_HADOOP_CONVERTHFILE, //
-    JAVA_CMD_HADOOP_MERGECUBOID, //
-    JAVA_CMD_HADOOP_NO_MR_DICTIONARY, //
-    JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, //
-    JAVA_CMD_HADOOP_NO_MR_BULKLOAD
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
deleted file mode 100644
index 08ee79a..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * Job step status values, each carrying a distinct integer code.
- */
-public enum JobStepStatusEnum {
-    NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64);
-
-    private final int code;
-
-    private JobStepStatusEnum(int statusCode) {
-        this.code = statusCode;
-    }
-
-    /**
-     * Looks up the status with the given code.
-     *
-     * @return the matching status, or {@code null} when no status has that code
-     */
-    public static JobStepStatusEnum getByCode(int statusCode) {
-        JobStepStatusEnum match = null;
-        final JobStepStatusEnum[] all = values();
-        for (int i = 0; i < all.length && match == null; i++) {
-            if (all[i].code == statusCode) {
-                match = all[i];
-            }
-        }
-        return match;
-    }
-
-    public int getCode() {
-        return this.code;
-    }
-
-    /** True for FINISHED, ERROR and DISCARDED; WAITING and KILLED are not included. */
-    public boolean isComplete() {
-        switch (this) {
-        case FINISHED:
-        case ERROR:
-        case DISCARDED:
-            return true;
-        default:
-            return false;
-        }
-    }
-
-    /** True for PENDING and ERROR. */
-    public boolean isRunable() {
-        return this == PENDING || this == ERROR;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
deleted file mode 100644
index 4862bb1..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- */
-public class ExecutableDao {
-
-    private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
-    private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
-    private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
-    private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
-    public static final String JOB_PATH_ROOT = "/execute";
-    public static final String JOB_OUTPUT_ROOT = "/execute_output";
-
-    private ResourceStore store;
-
-    public static ExecutableDao getInstance(KylinConfig config) {
-        ExecutableDao r = CACHE.get(config);
-        if (r == null) {
-            r = new ExecutableDao(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one singleton exist");
-            }
-
-        }
-        return r;
-    }
-
-    private ExecutableDao(KylinConfig config) {
-        logger.info("Using metadata url: " + config);
-        this.store = MetadataManager.getInstance(config).getStore();
-    }
-
-    private String pathOfJob(ExecutablePO job) {
-        return pathOfJob(job.getUuid());
-    }
-    private String pathOfJob(String uuid) {
-        return JOB_PATH_ROOT + "/" + uuid;
-    }
-
-    private String pathOfJobOutput(String uuid) {
-        return JOB_OUTPUT_ROOT + "/" + uuid;
-    }
-
-    private ExecutablePO readJobResource(String path) throws IOException {
-        return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER);
-    }
-
-    private void writeJobResource(String path, ExecutablePO job) throws IOException {
-        store.putResource(path, job, JOB_SERIALIZER);
-    }
-
-    private ExecutableOutputPO readJobOutputResource(String path) throws IOException {
-        return store.getResource(path, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
-    }
-
-    private long writeJobOutputResource(String path, ExecutableOutputPO output) throws IOException {
-        return store.putResource(path, output, JOB_OUTPUT_SERIALIZER);
-    }
-
-    public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
-        try {
-            ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT);
-            if (resources == null || resources.isEmpty()) {
-                return Collections.emptyList();
-            }
-            Collections.sort(resources);
-            String rangeStart = resources.get(0);
-            String rangeEnd = resources.get(resources.size() - 1);
-            return store.getAllResources(rangeStart, rangeEnd, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get all Jobs:", e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public List<ExecutablePO> getJobs() throws PersistentException {
-        try {
-            final List<String> jobIds = store.listResources(JOB_PATH_ROOT);
-            if (jobIds == null || jobIds.isEmpty()) {
-                return Collections.emptyList();
-            }
-            Collections.sort(jobIds);
-            String rangeStart = jobIds.get(0);
-            String rangeEnd = jobIds.get(jobIds.size() - 1);
-            return store.getAllResources(rangeStart, rangeEnd, ExecutablePO.class, JOB_SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get all Jobs:", e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public List<String> getJobIds() throws PersistentException {
-        try {
-            ArrayList<String> resources = store.listResources(JOB_PATH_ROOT);
-            if (resources == null) {
-                return Collections.emptyList();
-            }
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(resources.size());
-            for (String path : resources) {
-                result.add(path.substring(path.lastIndexOf("/") + 1));
-            }
-            return result;
-        } catch (IOException e) {
-            logger.error("error get all Jobs:", e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public ExecutablePO getJob(String uuid) throws PersistentException {
-        try {
-            return readJobResource(pathOfJob(uuid));
-        } catch (IOException e) {
-            logger.error("error get job:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public ExecutablePO addJob(ExecutablePO job) throws PersistentException {
-        try {
-            if (getJob(job.getUuid()) != null) {
-                throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists");
-            }
-            writeJobResource(pathOfJob(job), job);
-            return job;
-        } catch (IOException e) {
-            logger.error("error save job:" + job.getUuid(), e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void deleteJob(String uuid) throws PersistentException {
-        try {
-            store.deleteResource(pathOfJob(uuid));
-        } catch (IOException e) {
-            logger.error("error delete job:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
-        try {
-            ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid));
-            if (result == null) {
-                result = new ExecutableOutputPO();
-                result.setUuid(uuid);
-                return result;
-            }
-            return result;
-        } catch (IOException e) {
-            logger.error("error get job output id:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void addJobOutput(ExecutableOutputPO output) throws PersistentException {
-        try {
-            output.setLastModified(0);
-            writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
-        } catch (IOException e) {
-            logger.error("error update job output id:" + output.getUuid(), e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void updateJobOutput(ExecutableOutputPO output) throws PersistentException {
-        try {
-            final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
-            output.setLastModified(ts);
-        } catch (IOException e) {
-            logger.error("error update job output id:" + output.getUuid(), e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void deleteJobOutput(String uuid) throws PersistentException {
-        try {
-            store.deleteResource(pathOfJobOutput(uuid));
-        } catch (IOException e) {
-            logger.error("error delete job:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
deleted file mode 100644
index 4dacd8a..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutableOutputPO extends RootPersistentEntity {
-
-    @JsonProperty("content")
-    private String content;
-
-    @JsonProperty("status")
-    private String status = "READY";
-
-    @JsonProperty("info")
-    private Map<String, String> info = Maps.newHashMap();
-
-    public String getContent() {
-        return content;
-    }
-
-    public void setContent(String content) {
-        this.content = content;
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-    public Map<String, String> getInfo() {
-        return info;
-    }
-
-    public void setInfo(Map<String, String> info) {
-        this.info = info;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
deleted file mode 100644
index 6a17b29..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import java.util.List;
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutablePO extends RootPersistentEntity {
-
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("tasks")
-    private List<ExecutablePO> tasks;
-
-    @JsonProperty("type")
-    private String type;
-
-    @JsonProperty("params")
-    private Map<String, String> params = Maps.newHashMap();
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public List<ExecutablePO> getTasks() {
-        return tasks;
-    }
-
-    public void setTasks(List<ExecutablePO> tasks) {
-        this.tasks = tasks;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public Map<String, String> getParams() {
-        return params;
-    }
-
-    public void setParams(Map<String, String> params) {
-        this.params = params;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
index 1aeb50f..b03cb5f 100644
--- a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
@@ -27,8 +27,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-
-import org.apache.kylin.job.tools.LZOSupportnessChecker;
+import org.apache.kylin.storage.hbase.util.LZOSupportnessChecker;
 
 /**
  * <p/>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
deleted file mode 100644
index 2eb9b31..0000000
--- a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.engine;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-/**
- * @author ysong1
- */
-public class JobEngineConfig {
-    private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
-    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
-    public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
-
-    private static File getJobConfig(String fileName) {
-        String path = System.getProperty(KylinConfig.KYLIN_CONF);
-        if (StringUtils.isNotEmpty(path)) {
-            return new File(path, fileName);
-        }
-
-        path = KylinConfig.getKylinHome();
-        if (StringUtils.isNotEmpty(path)) {
-            return new File(path + File.separator + "conf", fileName);
-        }
-        return null;
-    }
-
-    private String getHadoopJobConfFilePath(RealizationCapacity capaticy, boolean appendSuffix) throws IOException {
-        String hadoopJobConfFile;
-        if (appendSuffix) {
-            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase() + ".xml");
-        } else {
-            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
-        }
-
-        File jobConfig = getJobConfig(hadoopJobConfFile);
-        if (jobConfig == null || !jobConfig.exists()) {
-            logger.warn("fail to locate " + hadoopJobConfFile + ", trying to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
-            jobConfig = getJobConfig(HADOOP_JOB_CONF_FILENAME + ".xml");
-            if (jobConfig == null || !jobConfig.exists()) {
-                logger.error("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
-                throw new RuntimeException("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
-            }
-        }
-        return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
-    }
-
-    public String getHadoopJobConfFilePath(RealizationCapacity capaticy) throws IOException {
-        String path = getHadoopJobConfFilePath(capaticy, true);
-        if (!StringUtils.isEmpty(path)) {
-            logger.info("Chosen job conf is : " + path);
-            return path;
-        } else {
-            path = getHadoopJobConfFilePath(capaticy, false);
-            if (!StringUtils.isEmpty(path)) {
-                logger.info("Chosen job conf is : " + path);
-                return path;
-            }
-        }
-        return "";
-    }
-
-
-    public String getHiveConfFilePath() throws IOException {
-        String hiveConfFile = (HIVE_CONF_FILENAME + ".xml");
-
-        File jobConfig = getJobConfig(hiveConfFile);
-        if (jobConfig == null || !jobConfig.exists()) {
-
-            logger.error("fail to locate " + HIVE_CONF_FILENAME + ".xml");
-            throw new RuntimeException("fail to locate " + HIVE_CONF_FILENAME + ".xml");
-        }
-        return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
-    }
-
-    // there should be no setters
-    private final KylinConfig config;
-
-    public JobEngineConfig(KylinConfig config) {
-        this.config = config;
-    }
-
-    public KylinConfig getConfig() {
-        return config;
-    }
-
-    public String getHdfsWorkingDirectory() {
-        return config.getHdfsWorkingDirectory();
-    }
-    
-    /**
-     * @return the maxConcurrentJobLimit
-     */
-    public int getMaxConcurrentJobLimit() {
-        return config.getMaxConcurrentJobLimit();
-    }
-
-    /**
-     * @return the timeZone
-     */
-    public String getTimeZone() {
-        return config.getTimeZone();
-    }
-
-    /**
-     * @return the adminDls
-     */
-    public String getAdminDls() {
-        return config.getAdminDls();
-    }
-
-    /**
-     * @return the jobStepTimeout
-     */
-    public long getJobStepTimeout() {
-        return config.getJobStepTimeout();
-    }
-
-    /**
-     * @return the asyncJobCheckInterval
-     */
-    public int getAsyncJobCheckInterval() {
-        return config.getYarnStatusCheckIntervalSeconds();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Object#hashCode()
-     */
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((config == null) ? 0 : config.hashCode());
-        return result;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        JobEngineConfig other = (JobEngineConfig) obj;
-        if (config == null) {
-            if (other.config != null)
-                return false;
-        } else if (!config.equals(other.config))
-            return false;
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java b/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
deleted file mode 100644
index 8544fff..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class ExecuteException extends Exception {
-
-    private static final long serialVersionUID = 5677121412192984281L;
-
-    public ExecuteException() {
-    }
-
-    public ExecuteException(String message) {
-        super(message);
-    }
-
-    public ExecuteException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ExecuteException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java b/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
deleted file mode 100644
index f19b0ca..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class IllegalStateTranferException extends RuntimeException {
-
-    private static final long serialVersionUID = 8466551519300132702L;
-
-    public IllegalStateTranferException() {
-    }
-
-    public IllegalStateTranferException(String message) {
-        super(message);
-    }
-
-    public IllegalStateTranferException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public IllegalStateTranferException(Throwable cause) {
-        super(cause);
-    }
-
-    public IllegalStateTranferException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/JobException.java b/job/src/main/java/org/apache/kylin/job/exception/JobException.java
deleted file mode 100644
index ba4c52a..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/JobException.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- * @author xduo
- * 
- */
-public class JobException extends Exception {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * 
-     */
-    public JobException() {
-        super();
-    }
-
-    /**
-     * @param message
-     * @param cause
-     */
-    public JobException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param message
-     */
-    public JobException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param cause
-     */
-    public JobException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/exception/LockException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/LockException.java b/job/src/main/java/org/apache/kylin/job/exception/LockException.java
deleted file mode 100644
index cf43ac9..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/LockException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class LockException extends Exception {
-    private static final long serialVersionUID = 2072745879281754945L;
-
-    public LockException() {
-    }
-
-    public LockException(String message) {
-        super(message);
-    }
-
-    public LockException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public LockException(Throwable cause) {
-        super(cause);
-    }
-
-    public LockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}