Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 09:05:15 UTC

kylin git commit: KYLIN-2169 Refactor AbstractExecutable to respect KylinConfig

Repository: kylin
Updated Branches:
  refs/heads/master 637581fb7 -> 9615b4ea6


KYLIN-2169 Refactor AbstractExecutable to respect KylinConfig
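The gist of the change: AbstractExecutable used to reach its ExecutableManager through a static field bound once to KylinConfig.getInstanceFromEnv(). After this refactor each executable carries the KylinConfig it was added or loaded with (set via initConfig) and resolves the manager on demand through getManager(), and ExecutableManager itself moves from org.apache.kylin.job.manager to org.apache.kylin.job.execution. A minimal standalone sketch of the pattern, with illustrative placeholder class names rather than Kylin's:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Illustrative stand-ins, not Kylin classes.
    class Config {
    }

    class Manager {
        // One Manager per Config, cached by config identity,
        // mirroring ExecutableManager.getInstance(config).
        private static final ConcurrentMap<Config, Manager> CACHE = new ConcurrentHashMap<>();

        static Manager getInstance(Config config) {
            return CACHE.computeIfAbsent(config, Manager::new);
        }

        private final Config config;

        private Manager(Config config) {
            this.config = config;
        }
    }

    abstract class Task {
        private Config config;

        // Set once when the task is added to or loaded by a manager.
        void initConfig(Config config) {
            if (this.config != null && this.config != config) {
                throw new IllegalStateException("config already set to a different instance");
            }
            this.config = config;
        }

        // Resolved per call instead of through a static field bound to one global config.
        protected Manager getManager() {
            return Manager.getInstance(config);
        }
    }

In the actual code the cache lookup uses double-checked locking and logs a warning when more than one instance exists, as the new ExecutableManager in the diff below shows.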


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9615b4ea
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9615b4ea
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9615b4ea

Branch: refs/heads/master
Commit: 9615b4ea6f817606c93df73dcafdcb151f4e8632
Parents: 637581f
Author: Li Yang <li...@apache.org>
Authored: Tue Nov 8 15:47:33 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Tue Nov 8 17:02:38 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/common/ShellExecutable.java       |   2 +-
 .../kylin/job/execution/AbstractExecutable.java |  37 +-
 .../job/execution/DefaultChainedExecutable.java |   1 -
 .../kylin/job/execution/ExecutableManager.java  | 372 ++++++++++++++++++
 .../job/impl/threadpool/DefaultScheduler.java   |   2 +-
 .../kylin/job/manager/ExecutableManager.java    | 377 -------------------
 .../apache/kylin/job/ExecutableManagerTest.java |   2 +-
 .../job/impl/threadpool/BaseSchedulerTest.java  |   2 +-
 .../org/apache/kylin/engine/mr/CubingJob.java   |   2 +-
 .../engine/mr/common/MapReduceExecutable.java   |  16 +-
 .../apache/kylin/engine/mr/steps/CuboidJob.java |   2 +-
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |   2 +-
 .../engine/mr/steps/SaveStatisticsStep.java     |   2 +-
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |   2 +-
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java  |   2 +-
 .../kylin/provision/BuildCubeWithEngine.java    |   2 +-
 .../kylin/provision/BuildCubeWithStream.java    |   2 +-
 .../apache/kylin/rest/service/BasicService.java |   2 +-
 .../storage/hbase/util/StorageCleanupJob.java   |   2 +-
 .../apache/kylin/tool/JobInstanceExtractor.java |   2 +-
 .../apache/kylin/tool/StorageCleanupJob.java    |   2 +-
 21 files changed, 419 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
index 111c1ba..a68f242 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -52,7 +52,7 @@ public class ShellExecutable extends AbstractExecutable {
             logger.info("executing:" + getCmd());
             final ShellExecutableLogger logger = new ShellExecutableLogger();
             final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
-            executableManager.addJobInfo(getId(), logger.getInfo());
+            getManager().addJobInfo(getId(), logger.getInfo());
             return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond());
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 90e4d3c..f7b8a7c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -32,7 +32,6 @@ import org.apache.kylin.common.util.MailService;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.impl.threadpool.DefaultContext;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,43 +52,51 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
     protected int retry = 0;
 
+    private KylinConfig config;
     private String name;
     private String id;
     private Map<String, String> params = Maps.newHashMap();
 
-    protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
     public AbstractExecutable() {
         setId(UUID.randomUUID().toString());
     }
+    
+    void initConfig(KylinConfig config) {
+        Preconditions.checkState(this.config == null || this.config == config);
+        this.config = config;
+    }
+    
+    protected ExecutableManager getManager() {
+        return ExecutableManager.getInstance(config);
+    }
 
     protected void onExecuteStart(ExecutableContext executableContext) {
         Map<String, String> info = Maps.newHashMap();
         info.put(START_TIME, Long.toString(System.currentTimeMillis()));
-        executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+        getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
     }
 
     protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
         setEndTime(System.currentTimeMillis());
         if (!isDiscarded()) {
             if (result.succeed()) {
-                executableManager.updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
+                getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
             } else {
-                executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
+                getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
             }
         }
     }
 
     protected void onExecuteError(Throwable exception, ExecutableContext executableContext) {
         if (!isDiscarded()) {
-            executableManager.addJobInfo(getId(), END_TIME, Long.toString(System.currentTimeMillis()));
+            getManager().addJobInfo(getId(), END_TIME, Long.toString(System.currentTimeMillis()));
             String output = null;
             if (exception != null) {
                 final StringWriter out = new StringWriter();
                 exception.printStackTrace(new PrintWriter(out));
                 output = out.toString();
             }
-            executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, output);
+            getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, output);
         }
     }
 
@@ -190,7 +197,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     @Override
     public final ExecutableState getStatus() {
-        return executableManager.getOutput(this.getId()).getState();
+        return getManager().getOutput(this.getId()).getState();
     }
 
     @Override
@@ -211,7 +218,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     }
 
     public final long getLastModified() {
-        return executableManager.getOutput(getId()).getLastModified();
+        return getOutput().getLastModified();
     }
 
     public final void setSubmitter(String submitter) {
@@ -298,11 +305,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     @Override
     public final Output getOutput() {
-        return executableManager.getOutput(getId());
+        return getManager().getOutput(getId());
     }
 
     protected long getExtraInfoAsLong(String key, long defaultValue) {
-        return getExtraInfoAsLong(executableManager.getOutput(getId()), key, defaultValue);
+        return getExtraInfoAsLong(getOutput(), key, defaultValue);
     }
 
     public static long getStartTime(Output output) {
@@ -334,11 +341,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     }
 
     protected final void addExtraInfo(String key, String value) {
-        executableManager.addJobInfo(getId(), key, value);
+        getManager().addJobInfo(getId(), key, value);
     }
 
     protected final Map<String, String> getExtraInfo() {
-        return executableManager.getOutput(getId()).getExtra();
+        return getOutput().getExtra();
     }
 
     public final void setStartTime(long time) {
@@ -366,7 +373,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     *
     * */
     protected final boolean isDiscarded() {
-        final ExecutableState status = executableManager.getOutput(getId()).getState();
+        final ExecutableState status = getOutput().getState();
         return status == ExecutableState.DISCARDED;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 39a5f4f..edc8189 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.manager.ExecutableManager;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
new file mode 100644
index 0000000..0901443
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.execution;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.exception.IllegalStateTranferException;
+import org.apache.kylin.job.exception.PersistentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class ExecutableManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
+    private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
+    
+    private final KylinConfig config;
+    private final ExecutableDao executableDao;
+
+    public static ExecutableManager getInstance(KylinConfig config) {
+        ExecutableManager r = CACHE.get(config);
+        if (r == null) {
+            synchronized (ExecutableManager.class) {
+                r = CACHE.get(config);
+                if (r == null) {
+                    r = new ExecutableManager(config);
+                    CACHE.put(config, r);
+                    if (CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return r;
+    }
+
+    private ExecutableManager(KylinConfig config) {
+        logger.info("Using metadata url: " + config);
+        this.config = config;
+        this.executableDao = ExecutableDao.getInstance(config);
+    }
+
+    public void addJob(AbstractExecutable executable) {
+        try {
+            executable.initConfig(config);
+            executableDao.addJob(parse(executable));
+            addJobOutput(executable);
+        } catch (PersistentException e) {
+            logger.error("fail to submit job:" + executable.getId(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void addJobOutput(AbstractExecutable executable) throws PersistentException {
+        ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
+        executableOutputPO.setUuid(executable.getId());
+        executableDao.addJobOutput(executableOutputPO);
+        if (executable instanceof DefaultChainedExecutable) {
+            for (AbstractExecutable subTask : ((DefaultChainedExecutable) executable).getTasks()) {
+                addJobOutput(subTask);
+            }
+        }
+    }
+
+    //for ut
+    public void deleteJob(String jobId) {
+        try {
+            executableDao.deleteJob(jobId);
+        } catch (PersistentException e) {
+            logger.error("fail to delete job:" + jobId, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public AbstractExecutable getJob(String uuid) {
+        try {
+            return parseTo(executableDao.getJob(uuid));
+        } catch (PersistentException e) {
+            logger.error("fail to get job:" + uuid, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Output getOutput(String uuid) {
+        try {
+            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid);
+            Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid);
+            return parseOutput(jobOutput);
+        } catch (PersistentException e) {
+            logger.error("fail to get job output:" + uuid, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
+        final DefaultOutput result = new DefaultOutput();
+        result.setExtra(jobOutput.getInfo());
+        result.setState(ExecutableState.valueOf(jobOutput.getStatus()));
+        result.setVerboseMsg(jobOutput.getContent());
+        result.setLastModified(jobOutput.getLastModified());
+        return result;
+    }
+
+    public Map<String, Output> getAllOutputs() {
+        try {
+            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
+            HashMap<String, Output> result = Maps.newHashMap();
+            for (ExecutableOutputPO jobOutput : jobOutputs) {
+                result.put(jobOutput.getId(), parseOutput(jobOutput));
+            }
+            return result;
+        } catch (PersistentException e) {
+            logger.error("fail to get all job output:", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Map<String, Output> getAllOutputs(long timeStartInMillis, long timeEndInMillis) {
+        try {
+            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(timeStartInMillis, timeEndInMillis);
+            HashMap<String, Output> result = Maps.newHashMap();
+            for (ExecutableOutputPO jobOutput : jobOutputs) {
+                result.put(jobOutput.getId(), parseOutput(jobOutput));
+            }
+            return result;
+        } catch (PersistentException e) {
+            logger.error("fail to get all job output:", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<AbstractExecutable> getAllExecutables() {
+        try {
+            List<AbstractExecutable> ret = Lists.newArrayList();
+            for (ExecutablePO po : executableDao.getJobs()) {
+                try {
+                    AbstractExecutable ae = parseTo(po);
+                    ret.add(ae);
+                } catch (IllegalArgumentException e) {
+                    logger.error("error parsing one executabePO: ", e);
+                }
+            }
+            return ret;
+        } catch (PersistentException e) {
+            logger.error("error get All Jobs", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<AbstractExecutable> getAllExecutables(long timeStartInMillis, long timeEndInMillis) {
+        try {
+            List<AbstractExecutable> ret = Lists.newArrayList();
+            for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) {
+                try {
+                    AbstractExecutable ae = parseTo(po);
+                    ret.add(ae);
+                } catch (IllegalArgumentException e) {
+                    logger.error("error parsing one executabePO: ", e);
+                }
+            }
+            return ret;
+        } catch (PersistentException e) {
+            logger.error("error get All Jobs", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<String> getAllJobIds() {
+        try {
+            return executableDao.getJobIds();
+        } catch (PersistentException e) {
+            logger.error("error get All Job Ids", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void updateAllRunningJobsToError() {
+        try {
+            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
+            for (ExecutableOutputPO executableOutputPO : jobOutputs) {
+                if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
+                    executableOutputPO.setStatus(ExecutableState.ERROR.toString());
+                    executableDao.updateJobOutput(executableOutputPO);
+                }
+            }
+        } catch (PersistentException e) {
+            logger.error("error reset job status from RUNNING to ERROR", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void resumeAllRunningJobs() {
+        try {
+            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
+            for (ExecutableOutputPO executableOutputPO : jobOutputs) {
+                if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
+                    executableOutputPO.setStatus(ExecutableState.READY.toString());
+                    executableDao.updateJobOutput(executableOutputPO);
+                }
+            }
+        } catch (PersistentException e) {
+            logger.error("error reset job status from RUNNING to READY", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void resumeJob(String jobId) {
+        AbstractExecutable job = getJob(jobId);
+        if (job == null) {
+            return;
+        }
+        if (job instanceof DefaultChainedExecutable) {
+            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+            for (AbstractExecutable task : tasks) {
+                if (task.getStatus() == ExecutableState.ERROR) {
+                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+                    break;
+                }
+            }
+        }
+        updateJobOutput(jobId, ExecutableState.READY, null, null);
+    }
+
+    public void discardJob(String jobId) {
+        AbstractExecutable job = getJob(jobId);
+        if (job instanceof DefaultChainedExecutable) {
+            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+            for (AbstractExecutable task : tasks) {
+                if (!task.getStatus().isFinalState()) {
+                    updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null);
+                }
+            }
+        }
+        updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
+    }
+
+    public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) {
+        try {
+            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
+            Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId);
+            ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
+            if (newStatus != null && oldStatus != newStatus) {
+                if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
+                    throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId);
+                }
+                jobOutput.setStatus(newStatus.toString());
+            }
+            if (info != null) {
+                jobOutput.setInfo(info);
+            }
+            if (output != null) {
+                jobOutput.setContent(output);
+            }
+            executableDao.updateJobOutput(jobOutput);
+            logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
+        } catch (PersistentException e) {
+            logger.error("error change job:" + jobId + " to " + newStatus.toString());
+            throw new RuntimeException(e);
+        }
+    }
+
+    //for migration only
+    //TODO delete when migration finished
+    public void resetJobOutput(String jobId, ExecutableState state, String output) {
+        try {
+            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
+            jobOutput.setStatus(state.toString());
+            if (output != null) {
+                jobOutput.setContent(output);
+            }
+            executableDao.updateJobOutput(jobOutput);
+        } catch (PersistentException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void addJobInfo(String id, Map<String, String> info) {
+        if (info == null) {
+            return;
+        }
+        try {
+            ExecutableOutputPO output = executableDao.getJobOutput(id);
+            Preconditions.checkArgument(output != null, "there is no related output for job id:" + id);
+            output.getInfo().putAll(info);
+            executableDao.updateJobOutput(output);
+        } catch (PersistentException e) {
+            logger.error("error update job info, id:" + id + "  info:" + info.toString());
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void addJobInfo(String id, String key, String value) {
+        Map<String, String> info = Maps.newHashMap();
+        info.put(key, value);
+        addJobInfo(id, info);
+    }
+
+    private static ExecutablePO parse(AbstractExecutable executable) {
+        ExecutablePO result = new ExecutablePO();
+        result.setName(executable.getName());
+        result.setUuid(executable.getId());
+        result.setType(executable.getClass().getName());
+        result.setParams(executable.getParams());
+        if (executable instanceof ChainedExecutable) {
+            List<ExecutablePO> tasks = Lists.newArrayList();
+            for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
+                tasks.add(parse(task));
+            }
+            result.setTasks(tasks);
+        }
+        return result;
+    }
+
+    private AbstractExecutable parseTo(ExecutablePO executablePO) {
+        if (executablePO == null) {
+            logger.warn("executablePO is null");
+            return null;
+        }
+        String type = executablePO.getType();
+        try {
+            Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class);
+            Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor();
+            AbstractExecutable result = constructor.newInstance();
+            result.initConfig(config);
+            result.setId(executablePO.getUuid());
+            result.setName(executablePO.getName());
+            result.setParams(executablePO.getParams());
+            List<ExecutablePO> tasks = executablePO.getTasks();
+            if (tasks != null && !tasks.isEmpty()) {
+                Preconditions.checkArgument(result instanceof ChainedExecutable);
+                for (ExecutablePO subTask : tasks) {
+                    ((ChainedExecutable) result).addTask(parseTo(subTask));
+                }
+            }
+            return result;
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e);
+        }
+    }
+
+}
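
For call sites, the visible change is the package move: ExecutableManager now lives in org.apache.kylin.job.execution and is obtained per KylinConfig, typically as follows (jobId here is just a placeholder for a real job UUID):

    ExecutableManager mgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
    Output output = mgr.getOutput(jobId);

The class body is otherwise carried over from the old org.apache.kylin.job.manager version; the notable differences are that addJob() and parseTo() now call initConfig(config) on each executable, which is why parseTo() became an instance method.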

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 1ea3be0..9d5f7ba 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -36,10 +36,10 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
deleted file mode 100644
index d42b924..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.manager;
-
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutableOutputPO;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.exception.IllegalStateTranferException;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ChainedExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.DefaultOutput;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.Output;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class ExecutableManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
-    private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
-    @SuppressWarnings("unused")
-    private final KylinConfig config;
-
-    private ExecutableDao executableDao;
-
-    public static ExecutableManager getInstance(KylinConfig config) {
-        ExecutableManager r = CACHE.get(config);
-        if (r == null) {
-            synchronized (ExecutableManager.class) {
-                r = CACHE.get(config);
-                if (r == null) {
-                    r = new ExecutableManager(config);
-                    CACHE.put(config, r);
-                    if (CACHE.size() > 1) {
-                        logger.warn("More than one singleton exist");
-                    }
-                }
-            }
-        }
-        return r;
-    }
-
-    private ExecutableManager(KylinConfig config) {
-        logger.info("Using metadata url: " + config);
-        this.config = config;
-        this.executableDao = ExecutableDao.getInstance(config);
-    }
-
-    public void addJob(AbstractExecutable executable) {
-        try {
-            executableDao.addJob(parse(executable));
-            addJobOutput(executable);
-        } catch (PersistentException e) {
-            logger.error("fail to submit job:" + executable.getId(), e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void addJobOutput(AbstractExecutable executable) throws PersistentException {
-        ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
-        executableOutputPO.setUuid(executable.getId());
-        executableDao.addJobOutput(executableOutputPO);
-        if (executable instanceof DefaultChainedExecutable) {
-            for (AbstractExecutable subTask : ((DefaultChainedExecutable) executable).getTasks()) {
-                addJobOutput(subTask);
-            }
-        }
-    }
-
-    //for ut
-    public void deleteJob(String jobId) {
-        try {
-            executableDao.deleteJob(jobId);
-        } catch (PersistentException e) {
-            logger.error("fail to delete job:" + jobId, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public AbstractExecutable getJob(String uuid) {
-        try {
-            return parseTo(executableDao.getJob(uuid));
-        } catch (PersistentException e) {
-            logger.error("fail to get job:" + uuid, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Output getOutput(String uuid) {
-        try {
-            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid);
-            Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid);
-            return parseOutput(jobOutput);
-        } catch (PersistentException e) {
-            logger.error("fail to get job output:" + uuid, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
-        final DefaultOutput result = new DefaultOutput();
-        result.setExtra(jobOutput.getInfo());
-        result.setState(ExecutableState.valueOf(jobOutput.getStatus()));
-        result.setVerboseMsg(jobOutput.getContent());
-        result.setLastModified(jobOutput.getLastModified());
-        return result;
-    }
-
-    public Map<String, Output> getAllOutputs() {
-        try {
-            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
-            HashMap<String, Output> result = Maps.newHashMap();
-            for (ExecutableOutputPO jobOutput : jobOutputs) {
-                result.put(jobOutput.getId(), parseOutput(jobOutput));
-            }
-            return result;
-        } catch (PersistentException e) {
-            logger.error("fail to get all job output:", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Map<String, Output> getAllOutputs(long timeStartInMillis, long timeEndInMillis) {
-        try {
-            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(timeStartInMillis, timeEndInMillis);
-            HashMap<String, Output> result = Maps.newHashMap();
-            for (ExecutableOutputPO jobOutput : jobOutputs) {
-                result.put(jobOutput.getId(), parseOutput(jobOutput));
-            }
-            return result;
-        } catch (PersistentException e) {
-            logger.error("fail to get all job output:", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public List<AbstractExecutable> getAllExecutables() {
-        try {
-            List<AbstractExecutable> ret = Lists.newArrayList();
-            for (ExecutablePO po : executableDao.getJobs()) {
-                try {
-                    AbstractExecutable ae = parseTo(po);
-                    ret.add(ae);
-                } catch (IllegalArgumentException e) {
-                    logger.error("error parsing one executabePO: ", e);
-                }
-            }
-            return ret;
-        } catch (PersistentException e) {
-            logger.error("error get All Jobs", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public List<AbstractExecutable> getAllExecutables(long timeStartInMillis, long timeEndInMillis) {
-        try {
-            List<AbstractExecutable> ret = Lists.newArrayList();
-            for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) {
-                try {
-                    AbstractExecutable ae = parseTo(po);
-                    ret.add(ae);
-                } catch (IllegalArgumentException e) {
-                    logger.error("error parsing one executabePO: ", e);
-                }
-            }
-            return ret;
-        } catch (PersistentException e) {
-            logger.error("error get All Jobs", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public List<String> getAllJobIds() {
-        try {
-            return executableDao.getJobIds();
-        } catch (PersistentException e) {
-            logger.error("error get All Job Ids", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void updateAllRunningJobsToError() {
-        try {
-            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
-            for (ExecutableOutputPO executableOutputPO : jobOutputs) {
-                if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
-                    executableOutputPO.setStatus(ExecutableState.ERROR.toString());
-                    executableDao.updateJobOutput(executableOutputPO);
-                }
-            }
-        } catch (PersistentException e) {
-            logger.error("error reset job status from RUNNING to ERROR", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void resumeAllRunningJobs() {
-        try {
-            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
-            for (ExecutableOutputPO executableOutputPO : jobOutputs) {
-                if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
-                    executableOutputPO.setStatus(ExecutableState.READY.toString());
-                    executableDao.updateJobOutput(executableOutputPO);
-                }
-            }
-        } catch (PersistentException e) {
-            logger.error("error reset job status from RUNNING to READY", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void resumeJob(String jobId) {
-        AbstractExecutable job = getJob(jobId);
-        if (job == null) {
-            return;
-        }
-        if (job instanceof DefaultChainedExecutable) {
-            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
-            for (AbstractExecutable task : tasks) {
-                if (task.getStatus() == ExecutableState.ERROR) {
-                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
-                    break;
-                }
-            }
-        }
-        updateJobOutput(jobId, ExecutableState.READY, null, null);
-    }
-
-    public void discardJob(String jobId) {
-        AbstractExecutable job = getJob(jobId);
-        if (job instanceof DefaultChainedExecutable) {
-            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
-            for (AbstractExecutable task : tasks) {
-                if (!task.getStatus().isFinalState()) {
-                    updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null);
-                }
-            }
-        }
-        updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
-    }
-
-    public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) {
-        try {
-            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
-            Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId);
-            ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
-            if (newStatus != null && oldStatus != newStatus) {
-                if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
-                    throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId);
-                }
-                jobOutput.setStatus(newStatus.toString());
-            }
-            if (info != null) {
-                jobOutput.setInfo(info);
-            }
-            if (output != null) {
-                jobOutput.setContent(output);
-            }
-            executableDao.updateJobOutput(jobOutput);
-            logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
-        } catch (PersistentException e) {
-            logger.error("error change job:" + jobId + " to " + newStatus.toString());
-            throw new RuntimeException(e);
-        }
-    }
-
-    //for migration only
-    //TODO delete when migration finished
-    public void resetJobOutput(String jobId, ExecutableState state, String output) {
-        try {
-            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
-            jobOutput.setStatus(state.toString());
-            if (output != null) {
-                jobOutput.setContent(output);
-            }
-            executableDao.updateJobOutput(jobOutput);
-        } catch (PersistentException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void addJobInfo(String id, Map<String, String> info) {
-        if (info == null) {
-            return;
-        }
-        try {
-            ExecutableOutputPO output = executableDao.getJobOutput(id);
-            Preconditions.checkArgument(output != null, "there is no related output for job id:" + id);
-            output.getInfo().putAll(info);
-            executableDao.updateJobOutput(output);
-        } catch (PersistentException e) {
-            logger.error("error update job info, id:" + id + "  info:" + info.toString());
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void addJobInfo(String id, String key, String value) {
-        Map<String, String> info = Maps.newHashMap();
-        info.put(key, value);
-        addJobInfo(id, info);
-    }
-
-    private static ExecutablePO parse(AbstractExecutable executable) {
-        ExecutablePO result = new ExecutablePO();
-        result.setName(executable.getName());
-        result.setUuid(executable.getId());
-        result.setType(executable.getClass().getName());
-        result.setParams(executable.getParams());
-        if (executable instanceof ChainedExecutable) {
-            List<ExecutablePO> tasks = Lists.newArrayList();
-            for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
-                tasks.add(parse(task));
-            }
-            result.setTasks(tasks);
-        }
-        return result;
-    }
-
-    private static AbstractExecutable parseTo(ExecutablePO executablePO) {
-        if (executablePO == null) {
-            logger.warn("executablePO is null");
-            return null;
-        }
-        String type = executablePO.getType();
-        try {
-            Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class);
-            Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor();
-            AbstractExecutable result = constructor.newInstance();
-            result.setId(executablePO.getUuid());
-            result.setName(executablePO.getName());
-            result.setParams(executablePO.getParams());
-            List<ExecutablePO> tasks = executablePO.getTasks();
-            if (tasks != null && !tasks.isEmpty()) {
-                Preconditions.checkArgument(result instanceof ChainedExecutable);
-                for (ExecutablePO subTask : tasks) {
-                    ((ChainedExecutable) result).addTask(parseTo(subTask));
-                }
-            }
-            return result;
-        } catch (ReflectiveOperationException e) {
-            throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
index 1eed361..2868f08 100644
--- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
@@ -31,8 +31,8 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ChainedExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 97c9f8d..fdf5252 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -26,9 +26,9 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.lock.MockJobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.junit.After;
 import org.junit.Before;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 1a0113d..bce0433 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -277,7 +277,7 @@ public class CubingJob extends DefaultChainedExecutable {
         }
 
         for (AbstractExecutable child : tasks) {
-            Output output = executableManager.getOutput(child.getId());
+            Output output = getManager().getOutput(child.getId());
             String value = output.getExtra().get(key);
             if (value != null)
                 return value;

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 7ccd524..a26d4ff 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -38,6 +38,7 @@ import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
@@ -63,11 +64,11 @@ public class MapReduceExecutable extends AbstractExecutable {
 
     @Override
     protected void onExecuteStart(ExecutableContext executableContext) {
-        final Output output = executableManager.getOutput(getId());
+        final Output output = getOutput();
         if (output.getExtra().containsKey(START_TIME)) {
             final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
             if (mrJobId == null) {
-                executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
                 return;
             }
             try {
@@ -77,7 +78,7 @@ public class MapReduceExecutable extends AbstractExecutable {
                     //remove previous mr job info
                     super.onExecuteStart(executableContext);
                 } else {
-                    executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                    getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
                 }
             } catch (IOException e) {
                 logger.warn("error get hadoop status");
@@ -99,7 +100,8 @@ public class MapReduceExecutable extends AbstractExecutable {
         Preconditions.checkNotNull(params);
         try {
             Job job;
-            final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
+            ExecutableManager mgr = getManager();
+            final Map<String, String> extra = mgr.getOutput(getId()).getExtra();
             if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
                 Configuration conf = HadoopUtil.getCurrentConfiguration();
                 job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
@@ -149,19 +151,19 @@ public class MapReduceExecutable extends AbstractExecutable {
 
                 JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output);
                 if (status == JobStepStatusEnum.KILLED) {
-                    executableManager.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
+                    mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
                     return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
                 }
                 if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
                     final long waitTime = System.currentTimeMillis() - getStartTime();
                     setMapReduceWaitTime(waitTime);
                 }
-                executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
+                mgr.addJobInfo(getId(), hadoopCmdOutput.getInfo());
                 status = newStatus;
                 if (status.isComplete()) {
                     final Map<String, String> info = hadoopCmdOutput.getInfo();
                     readCounters(hadoopCmdOutput, info);
-                    executableManager.addJobInfo(getId(), info);
+                    mgr.addJobInfo(getId(), info);
 
                     if (status == JobStepStatusEnum.FINISHED) {
                         return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 6b0c86e..9edc82e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -43,7 +43,7 @@ import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 013f2c9..a5ea1e9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -40,7 +40,7 @@ import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 8777af7..23e81bc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -122,7 +122,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
         }
         logger.info("The cube algorithm for " + seg + " is " + alg);
 
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+        CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
         cubingJob.setAlgorithm(alg);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 4e1be57..f7af42e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -59,7 +59,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+        CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
         long sourceCount = cubingJob.findSourceRecordCount();
         long sourceSizeBytes = cubingJob.findSourceSizeBytes();
         long cubeSizeBytes = cubingJob.findCubeSizeBytes();

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 6e8e5ed..d2fa73e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -54,7 +54,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
         }
 
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+        CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
         long cubeSizeBytes = cubingJob.findCubeSizeBytes();
 
         // collect source statistics

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 156b1f6..c3902a2 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -52,9 +52,9 @@ import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.source.SourcePartition;

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 9804292..c2f53e1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -49,9 +49,9 @@ import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.job.streaming.Kafka10DataLoader;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.streaming.StreamingConfig;

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 170c395..9f14deb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -31,9 +31,9 @@ import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.badquery.BadQueryHistoryManager;
 import org.apache.kylin.metadata.project.ProjectInstance;

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index e66eaec..08471a3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -52,8 +52,8 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
index 1023a8b..068dbda 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
@@ -36,9 +36,9 @@ import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationType;

http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 4252e74..3c0ce1b 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -53,8 +53,8 @@ import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.source.hive.HiveClientFactory;
 import org.apache.kylin.source.hive.HiveCmdBuilder;