You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 09:05:15 UTC
kylin git commit: KYLIN-2169 Refactor AbstractExecutable to respect
KylinConfig
Repository: kylin
Updated Branches:
refs/heads/master 637581fb7 -> 9615b4ea6
KYLIN-2169 Refactor AbstractExecutable to respect KylinConfig
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9615b4ea
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9615b4ea
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9615b4ea
Branch: refs/heads/master
Commit: 9615b4ea6f817606c93df73dcafdcb151f4e8632
Parents: 637581f
Author: Li Yang <li...@apache.org>
Authored: Tue Nov 8 15:47:33 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Tue Nov 8 17:02:38 2016 +0800
----------------------------------------------------------------------
.../kylin/job/common/ShellExecutable.java | 2 +-
.../kylin/job/execution/AbstractExecutable.java | 37 +-
.../job/execution/DefaultChainedExecutable.java | 1 -
.../kylin/job/execution/ExecutableManager.java | 372 ++++++++++++++++++
.../job/impl/threadpool/DefaultScheduler.java | 2 +-
.../kylin/job/manager/ExecutableManager.java | 377 -------------------
.../apache/kylin/job/ExecutableManagerTest.java | 2 +-
.../job/impl/threadpool/BaseSchedulerTest.java | 2 +-
.../org/apache/kylin/engine/mr/CubingJob.java | 2 +-
.../engine/mr/common/MapReduceExecutable.java | 16 +-
.../apache/kylin/engine/mr/steps/CuboidJob.java | 2 +-
.../kylin/engine/mr/steps/InMemCuboidJob.java | 2 +-
.../engine/mr/steps/SaveStatisticsStep.java | 2 +-
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 2 +-
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 2 +-
.../kylin/provision/BuildCubeWithEngine.java | 2 +-
.../kylin/provision/BuildCubeWithStream.java | 2 +-
.../apache/kylin/rest/service/BasicService.java | 2 +-
.../storage/hbase/util/StorageCleanupJob.java | 2 +-
.../apache/kylin/tool/JobInstanceExtractor.java | 2 +-
.../apache/kylin/tool/StorageCleanupJob.java | 2 +-
21 files changed, 419 insertions(+), 416 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
index 111c1ba..a68f242 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -52,7 +52,7 @@ public class ShellExecutable extends AbstractExecutable {
logger.info("executing:" + getCmd());
final ShellExecutableLogger logger = new ShellExecutableLogger();
final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
- executableManager.addJobInfo(getId(), logger.getInfo());
+ getManager().addJobInfo(getId(), logger.getInfo());
return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond());
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 90e4d3c..f7b8a7c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -32,7 +32,6 @@ import org.apache.kylin.common.util.MailService;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.job.impl.threadpool.DefaultContext;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,43 +52,51 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
protected int retry = 0;
+ private KylinConfig config;
private String name;
private String id;
private Map<String, String> params = Maps.newHashMap();
- protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
public AbstractExecutable() {
setId(UUID.randomUUID().toString());
}
+
+ void initConfig(KylinConfig config) {
+ Preconditions.checkState(this.config == null || this.config == config);
+ this.config = config;
+ }
+
+ protected ExecutableManager getManager() {
+ return ExecutableManager.getInstance(config);
+ }
protected void onExecuteStart(ExecutableContext executableContext) {
Map<String, String> info = Maps.newHashMap();
info.put(START_TIME, Long.toString(System.currentTimeMillis()));
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+ getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
}
protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
setEndTime(System.currentTimeMillis());
if (!isDiscarded()) {
if (result.succeed()) {
- executableManager.updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
+ getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
} else {
- executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
+ getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
}
}
}
protected void onExecuteError(Throwable exception, ExecutableContext executableContext) {
if (!isDiscarded()) {
- executableManager.addJobInfo(getId(), END_TIME, Long.toString(System.currentTimeMillis()));
+ getManager().addJobInfo(getId(), END_TIME, Long.toString(System.currentTimeMillis()));
String output = null;
if (exception != null) {
final StringWriter out = new StringWriter();
exception.printStackTrace(new PrintWriter(out));
output = out.toString();
}
- executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, output);
+ getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, output);
}
}
@@ -190,7 +197,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
@Override
public final ExecutableState getStatus() {
- return executableManager.getOutput(this.getId()).getState();
+ return getManager().getOutput(this.getId()).getState();
}
@Override
@@ -211,7 +218,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
}
public final long getLastModified() {
- return executableManager.getOutput(getId()).getLastModified();
+ return getOutput().getLastModified();
}
public final void setSubmitter(String submitter) {
@@ -298,11 +305,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
@Override
public final Output getOutput() {
- return executableManager.getOutput(getId());
+ return getManager().getOutput(getId());
}
protected long getExtraInfoAsLong(String key, long defaultValue) {
- return getExtraInfoAsLong(executableManager.getOutput(getId()), key, defaultValue);
+ return getExtraInfoAsLong(getOutput(), key, defaultValue);
}
public static long getStartTime(Output output) {
@@ -334,11 +341,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
}
protected final void addExtraInfo(String key, String value) {
- executableManager.addJobInfo(getId(), key, value);
+ getManager().addJobInfo(getId(), key, value);
}
protected final Map<String, String> getExtraInfo() {
- return executableManager.getOutput(getId()).getExtra();
+ return getOutput().getExtra();
}
public final void setStartTime(long time) {
@@ -366,7 +373,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
*
* */
protected final boolean isDiscarded() {
- final ExecutableState status = executableManager.getOutput(getId()).getState();
+ final ExecutableState status = getOutput().getState();
return status == ExecutableState.DISCARDED;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 39a5f4f..edc8189 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.manager.ExecutableManager;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
new file mode 100644
index 0000000..0901443
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.execution;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.exception.IllegalStateTranferException;
+import org.apache.kylin.job.exception.PersistentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ */
+public class ExecutableManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
+ private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
+
+ private final KylinConfig config;
+ private final ExecutableDao executableDao;
+
+ public static ExecutableManager getInstance(KylinConfig config) {
+ ExecutableManager r = CACHE.get(config);
+ if (r == null) {
+ synchronized (ExecutableManager.class) {
+ r = CACHE.get(config);
+ if (r == null) {
+ r = new ExecutableManager(config);
+ CACHE.put(config, r);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one singleton exist");
+ }
+ }
+ }
+ }
+ return r;
+ }
+
+ private ExecutableManager(KylinConfig config) {
+ logger.info("Using metadata url: " + config);
+ this.config = config;
+ this.executableDao = ExecutableDao.getInstance(config);
+ }
+
+ public void addJob(AbstractExecutable executable) {
+ try {
+ executable.initConfig(config);
+ executableDao.addJob(parse(executable));
+ addJobOutput(executable);
+ } catch (PersistentException e) {
+ logger.error("fail to submit job:" + executable.getId(), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void addJobOutput(AbstractExecutable executable) throws PersistentException {
+ ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
+ executableOutputPO.setUuid(executable.getId());
+ executableDao.addJobOutput(executableOutputPO);
+ if (executable instanceof DefaultChainedExecutable) {
+ for (AbstractExecutable subTask : ((DefaultChainedExecutable) executable).getTasks()) {
+ addJobOutput(subTask);
+ }
+ }
+ }
+
+ //for ut
+ public void deleteJob(String jobId) {
+ try {
+ executableDao.deleteJob(jobId);
+ } catch (PersistentException e) {
+ logger.error("fail to delete job:" + jobId, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public AbstractExecutable getJob(String uuid) {
+ try {
+ return parseTo(executableDao.getJob(uuid));
+ } catch (PersistentException e) {
+ logger.error("fail to get job:" + uuid, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Output getOutput(String uuid) {
+ try {
+ final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid);
+ Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid);
+ return parseOutput(jobOutput);
+ } catch (PersistentException e) {
+ logger.error("fail to get job output:" + uuid, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
+ final DefaultOutput result = new DefaultOutput();
+ result.setExtra(jobOutput.getInfo());
+ result.setState(ExecutableState.valueOf(jobOutput.getStatus()));
+ result.setVerboseMsg(jobOutput.getContent());
+ result.setLastModified(jobOutput.getLastModified());
+ return result;
+ }
+
+ public Map<String, Output> getAllOutputs() {
+ try {
+ final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
+ HashMap<String, Output> result = Maps.newHashMap();
+ for (ExecutableOutputPO jobOutput : jobOutputs) {
+ result.put(jobOutput.getId(), parseOutput(jobOutput));
+ }
+ return result;
+ } catch (PersistentException e) {
+ logger.error("fail to get all job output:", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Map<String, Output> getAllOutputs(long timeStartInMillis, long timeEndInMillis) {
+ try {
+ final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(timeStartInMillis, timeEndInMillis);
+ HashMap<String, Output> result = Maps.newHashMap();
+ for (ExecutableOutputPO jobOutput : jobOutputs) {
+ result.put(jobOutput.getId(), parseOutput(jobOutput));
+ }
+ return result;
+ } catch (PersistentException e) {
+ logger.error("fail to get all job output:", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public List<AbstractExecutable> getAllExecutables() {
+ try {
+ List<AbstractExecutable> ret = Lists.newArrayList();
+ for (ExecutablePO po : executableDao.getJobs()) {
+ try {
+ AbstractExecutable ae = parseTo(po);
+ ret.add(ae);
+ } catch (IllegalArgumentException e) {
+ logger.error("error parsing one executabePO: ", e);
+ }
+ }
+ return ret;
+ } catch (PersistentException e) {
+ logger.error("error get All Jobs", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public List<AbstractExecutable> getAllExecutables(long timeStartInMillis, long timeEndInMillis) {
+ try {
+ List<AbstractExecutable> ret = Lists.newArrayList();
+ for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) {
+ try {
+ AbstractExecutable ae = parseTo(po);
+ ret.add(ae);
+ } catch (IllegalArgumentException e) {
+ logger.error("error parsing one executabePO: ", e);
+ }
+ }
+ return ret;
+ } catch (PersistentException e) {
+ logger.error("error get All Jobs", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public List<String> getAllJobIds() {
+ try {
+ return executableDao.getJobIds();
+ } catch (PersistentException e) {
+ logger.error("error get All Job Ids", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void updateAllRunningJobsToError() {
+ try {
+ final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
+ for (ExecutableOutputPO executableOutputPO : jobOutputs) {
+ if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
+ executableOutputPO.setStatus(ExecutableState.ERROR.toString());
+ executableDao.updateJobOutput(executableOutputPO);
+ }
+ }
+ } catch (PersistentException e) {
+ logger.error("error reset job status from RUNNING to ERROR", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void resumeAllRunningJobs() {
+ try {
+ final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
+ for (ExecutableOutputPO executableOutputPO : jobOutputs) {
+ if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
+ executableOutputPO.setStatus(ExecutableState.READY.toString());
+ executableDao.updateJobOutput(executableOutputPO);
+ }
+ }
+ } catch (PersistentException e) {
+ logger.error("error reset job status from RUNNING to READY", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void resumeJob(String jobId) {
+ AbstractExecutable job = getJob(jobId);
+ if (job == null) {
+ return;
+ }
+ if (job instanceof DefaultChainedExecutable) {
+ List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+ for (AbstractExecutable task : tasks) {
+ if (task.getStatus() == ExecutableState.ERROR) {
+ updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+ break;
+ }
+ }
+ }
+ updateJobOutput(jobId, ExecutableState.READY, null, null);
+ }
+
+ public void discardJob(String jobId) {
+ AbstractExecutable job = getJob(jobId);
+ if (job instanceof DefaultChainedExecutable) {
+ List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+ for (AbstractExecutable task : tasks) {
+ if (!task.getStatus().isFinalState()) {
+ updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null);
+ }
+ }
+ }
+ updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
+ }
+
+ public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) {
+ try {
+ final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
+ Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId);
+ ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
+ if (newStatus != null && oldStatus != newStatus) {
+ if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
+ throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId);
+ }
+ jobOutput.setStatus(newStatus.toString());
+ }
+ if (info != null) {
+ jobOutput.setInfo(info);
+ }
+ if (output != null) {
+ jobOutput.setContent(output);
+ }
+ executableDao.updateJobOutput(jobOutput);
+ logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
+ } catch (PersistentException e) {
+ logger.error("error change job:" + jobId + " to " + newStatus.toString());
+ throw new RuntimeException(e);
+ }
+ }
+
+ //for migration only
+ //TODO delete when migration finished
+ public void resetJobOutput(String jobId, ExecutableState state, String output) {
+ try {
+ final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
+ jobOutput.setStatus(state.toString());
+ if (output != null) {
+ jobOutput.setContent(output);
+ }
+ executableDao.updateJobOutput(jobOutput);
+ } catch (PersistentException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void addJobInfo(String id, Map<String, String> info) {
+ if (info == null) {
+ return;
+ }
+ try {
+ ExecutableOutputPO output = executableDao.getJobOutput(id);
+ Preconditions.checkArgument(output != null, "there is no related output for job id:" + id);
+ output.getInfo().putAll(info);
+ executableDao.updateJobOutput(output);
+ } catch (PersistentException e) {
+ logger.error("error update job info, id:" + id + " info:" + info.toString());
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void addJobInfo(String id, String key, String value) {
+ Map<String, String> info = Maps.newHashMap();
+ info.put(key, value);
+ addJobInfo(id, info);
+ }
+
+ private static ExecutablePO parse(AbstractExecutable executable) {
+ ExecutablePO result = new ExecutablePO();
+ result.setName(executable.getName());
+ result.setUuid(executable.getId());
+ result.setType(executable.getClass().getName());
+ result.setParams(executable.getParams());
+ if (executable instanceof ChainedExecutable) {
+ List<ExecutablePO> tasks = Lists.newArrayList();
+ for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
+ tasks.add(parse(task));
+ }
+ result.setTasks(tasks);
+ }
+ return result;
+ }
+
+ private AbstractExecutable parseTo(ExecutablePO executablePO) {
+ if (executablePO == null) {
+ logger.warn("executablePO is null");
+ return null;
+ }
+ String type = executablePO.getType();
+ try {
+ Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class);
+ Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor();
+ AbstractExecutable result = constructor.newInstance();
+ result.initConfig(config);
+ result.setId(executablePO.getUuid());
+ result.setName(executablePO.getName());
+ result.setParams(executablePO.getParams());
+ List<ExecutablePO> tasks = executablePO.getTasks();
+ if (tasks != null && !tasks.isEmpty()) {
+ Preconditions.checkArgument(result instanceof ChainedExecutable);
+ for (ExecutablePO subTask : tasks) {
+ ((ChainedExecutable) result).addTask(parseTo(subTask));
+ }
+ }
+ return result;
+ } catch (ReflectiveOperationException e) {
+ throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 1ea3be0..9d5f7ba 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -36,10 +36,10 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
deleted file mode 100644
index d42b924..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.manager;
-
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutableOutputPO;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.exception.IllegalStateTranferException;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ChainedExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.DefaultOutput;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.Output;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class ExecutableManager {
-
- private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
- private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
- @SuppressWarnings("unused")
- private final KylinConfig config;
-
- private ExecutableDao executableDao;
-
- public static ExecutableManager getInstance(KylinConfig config) {
- ExecutableManager r = CACHE.get(config);
- if (r == null) {
- synchronized (ExecutableManager.class) {
- r = CACHE.get(config);
- if (r == null) {
- r = new ExecutableManager(config);
- CACHE.put(config, r);
- if (CACHE.size() > 1) {
- logger.warn("More than one singleton exist");
- }
- }
- }
- }
- return r;
- }
-
- private ExecutableManager(KylinConfig config) {
- logger.info("Using metadata url: " + config);
- this.config = config;
- this.executableDao = ExecutableDao.getInstance(config);
- }
-
- public void addJob(AbstractExecutable executable) {
- try {
- executableDao.addJob(parse(executable));
- addJobOutput(executable);
- } catch (PersistentException e) {
- logger.error("fail to submit job:" + executable.getId(), e);
- throw new RuntimeException(e);
- }
- }
-
- private void addJobOutput(AbstractExecutable executable) throws PersistentException {
- ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
- executableOutputPO.setUuid(executable.getId());
- executableDao.addJobOutput(executableOutputPO);
- if (executable instanceof DefaultChainedExecutable) {
- for (AbstractExecutable subTask : ((DefaultChainedExecutable) executable).getTasks()) {
- addJobOutput(subTask);
- }
- }
- }
-
- //for ut
- public void deleteJob(String jobId) {
- try {
- executableDao.deleteJob(jobId);
- } catch (PersistentException e) {
- logger.error("fail to delete job:" + jobId, e);
- throw new RuntimeException(e);
- }
- }
-
- public AbstractExecutable getJob(String uuid) {
- try {
- return parseTo(executableDao.getJob(uuid));
- } catch (PersistentException e) {
- logger.error("fail to get job:" + uuid, e);
- throw new RuntimeException(e);
- }
- }
-
- public Output getOutput(String uuid) {
- try {
- final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid);
- Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid);
- return parseOutput(jobOutput);
- } catch (PersistentException e) {
- logger.error("fail to get job output:" + uuid, e);
- throw new RuntimeException(e);
- }
- }
-
- private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
- final DefaultOutput result = new DefaultOutput();
- result.setExtra(jobOutput.getInfo());
- result.setState(ExecutableState.valueOf(jobOutput.getStatus()));
- result.setVerboseMsg(jobOutput.getContent());
- result.setLastModified(jobOutput.getLastModified());
- return result;
- }
-
- public Map<String, Output> getAllOutputs() {
- try {
- final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
- HashMap<String, Output> result = Maps.newHashMap();
- for (ExecutableOutputPO jobOutput : jobOutputs) {
- result.put(jobOutput.getId(), parseOutput(jobOutput));
- }
- return result;
- } catch (PersistentException e) {
- logger.error("fail to get all job output:", e);
- throw new RuntimeException(e);
- }
- }
-
- public Map<String, Output> getAllOutputs(long timeStartInMillis, long timeEndInMillis) {
- try {
- final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(timeStartInMillis, timeEndInMillis);
- HashMap<String, Output> result = Maps.newHashMap();
- for (ExecutableOutputPO jobOutput : jobOutputs) {
- result.put(jobOutput.getId(), parseOutput(jobOutput));
- }
- return result;
- } catch (PersistentException e) {
- logger.error("fail to get all job output:", e);
- throw new RuntimeException(e);
- }
- }
-
- public List<AbstractExecutable> getAllExecutables() {
- try {
- List<AbstractExecutable> ret = Lists.newArrayList();
- for (ExecutablePO po : executableDao.getJobs()) {
- try {
- AbstractExecutable ae = parseTo(po);
- ret.add(ae);
- } catch (IllegalArgumentException e) {
- logger.error("error parsing one executabePO: ", e);
- }
- }
- return ret;
- } catch (PersistentException e) {
- logger.error("error get All Jobs", e);
- throw new RuntimeException(e);
- }
- }
-
- public List<AbstractExecutable> getAllExecutables(long timeStartInMillis, long timeEndInMillis) {
- try {
- List<AbstractExecutable> ret = Lists.newArrayList();
- for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) {
- try {
- AbstractExecutable ae = parseTo(po);
- ret.add(ae);
- } catch (IllegalArgumentException e) {
- logger.error("error parsing one executabePO: ", e);
- }
- }
- return ret;
- } catch (PersistentException e) {
- logger.error("error get All Jobs", e);
- throw new RuntimeException(e);
- }
- }
-
- public List<String> getAllJobIds() {
- try {
- return executableDao.getJobIds();
- } catch (PersistentException e) {
- logger.error("error get All Job Ids", e);
- throw new RuntimeException(e);
- }
- }
-
- public void updateAllRunningJobsToError() {
- try {
- final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
- for (ExecutableOutputPO executableOutputPO : jobOutputs) {
- if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
- executableOutputPO.setStatus(ExecutableState.ERROR.toString());
- executableDao.updateJobOutput(executableOutputPO);
- }
- }
- } catch (PersistentException e) {
- logger.error("error reset job status from RUNNING to ERROR", e);
- throw new RuntimeException(e);
- }
- }
-
- public void resumeAllRunningJobs() {
- try {
- final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
- for (ExecutableOutputPO executableOutputPO : jobOutputs) {
- if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
- executableOutputPO.setStatus(ExecutableState.READY.toString());
- executableDao.updateJobOutput(executableOutputPO);
- }
- }
- } catch (PersistentException e) {
- logger.error("error reset job status from RUNNING to READY", e);
- throw new RuntimeException(e);
- }
- }
-
- public void resumeJob(String jobId) {
- AbstractExecutable job = getJob(jobId);
- if (job == null) {
- return;
- }
- if (job instanceof DefaultChainedExecutable) {
- List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
- for (AbstractExecutable task : tasks) {
- if (task.getStatus() == ExecutableState.ERROR) {
- updateJobOutput(task.getId(), ExecutableState.READY, null, null);
- break;
- }
- }
- }
- updateJobOutput(jobId, ExecutableState.READY, null, null);
- }
-
- public void discardJob(String jobId) {
- AbstractExecutable job = getJob(jobId);
- if (job instanceof DefaultChainedExecutable) {
- List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
- for (AbstractExecutable task : tasks) {
- if (!task.getStatus().isFinalState()) {
- updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null);
- }
- }
- }
- updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
- }
-
- public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) {
- try {
- final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
- Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId);
- ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
- if (newStatus != null && oldStatus != newStatus) {
- if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
- throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId);
- }
- jobOutput.setStatus(newStatus.toString());
- }
- if (info != null) {
- jobOutput.setInfo(info);
- }
- if (output != null) {
- jobOutput.setContent(output);
- }
- executableDao.updateJobOutput(jobOutput);
- logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
- } catch (PersistentException e) {
- logger.error("error change job:" + jobId + " to " + newStatus.toString());
- throw new RuntimeException(e);
- }
- }
-
- //for migration only
- //TODO delete when migration finished
- public void resetJobOutput(String jobId, ExecutableState state, String output) {
- try {
- final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
- jobOutput.setStatus(state.toString());
- if (output != null) {
- jobOutput.setContent(output);
- }
- executableDao.updateJobOutput(jobOutput);
- } catch (PersistentException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void addJobInfo(String id, Map<String, String> info) {
- if (info == null) {
- return;
- }
- try {
- ExecutableOutputPO output = executableDao.getJobOutput(id);
- Preconditions.checkArgument(output != null, "there is no related output for job id:" + id);
- output.getInfo().putAll(info);
- executableDao.updateJobOutput(output);
- } catch (PersistentException e) {
- logger.error("error update job info, id:" + id + " info:" + info.toString());
- throw new RuntimeException(e);
- }
- }
-
- public void addJobInfo(String id, String key, String value) {
- Map<String, String> info = Maps.newHashMap();
- info.put(key, value);
- addJobInfo(id, info);
- }
-
- private static ExecutablePO parse(AbstractExecutable executable) {
- ExecutablePO result = new ExecutablePO();
- result.setName(executable.getName());
- result.setUuid(executable.getId());
- result.setType(executable.getClass().getName());
- result.setParams(executable.getParams());
- if (executable instanceof ChainedExecutable) {
- List<ExecutablePO> tasks = Lists.newArrayList();
- for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
- tasks.add(parse(task));
- }
- result.setTasks(tasks);
- }
- return result;
- }
-
- private static AbstractExecutable parseTo(ExecutablePO executablePO) {
- if (executablePO == null) {
- logger.warn("executablePO is null");
- return null;
- }
- String type = executablePO.getType();
- try {
- Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class);
- Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor();
- AbstractExecutable result = constructor.newInstance();
- result.setId(executablePO.getUuid());
- result.setName(executablePO.getName());
- result.setParams(executablePO.getParams());
- List<ExecutablePO> tasks = executablePO.getTasks();
- if (tasks != null && !tasks.isEmpty()) {
- Preconditions.checkArgument(result instanceof ChainedExecutable);
- for (ExecutablePO subTask : tasks) {
- ((ChainedExecutable) result).addTask(parseTo(subTask));
- }
- }
- return result;
- } catch (ReflectiveOperationException e) {
- throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
index 1eed361..2868f08 100644
--- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
@@ -31,8 +31,8 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ChainedExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 97c9f8d..fdf5252 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -26,9 +26,9 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.lock.MockJobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.junit.After;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 1a0113d..bce0433 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -277,7 +277,7 @@ public class CubingJob extends DefaultChainedExecutable {
}
for (AbstractExecutable child : tasks) {
- Output output = executableManager.getOutput(child.getId());
+ Output output = getManager().getOutput(child.getId());
String value = output.getExtra().get(key);
if (value != null)
return value;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 7ccd524..a26d4ff 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -38,6 +38,7 @@ import org.apache.kylin.job.constant.JobStepStatusEnum;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
@@ -63,11 +64,11 @@ public class MapReduceExecutable extends AbstractExecutable {
@Override
protected void onExecuteStart(ExecutableContext executableContext) {
- final Output output = executableManager.getOutput(getId());
+ final Output output = getOutput();
if (output.getExtra().containsKey(START_TIME)) {
final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
if (mrJobId == null) {
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
return;
}
try {
@@ -77,7 +78,7 @@ public class MapReduceExecutable extends AbstractExecutable {
//remove previous mr job info
super.onExecuteStart(executableContext);
} else {
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
}
} catch (IOException e) {
logger.warn("error get hadoop status");
@@ -99,7 +100,8 @@ public class MapReduceExecutable extends AbstractExecutable {
Preconditions.checkNotNull(params);
try {
Job job;
- final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
+ ExecutableManager mgr = getManager();
+ final Map<String, String> extra = mgr.getOutput(getId()).getExtra();
if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
Configuration conf = HadoopUtil.getCurrentConfiguration();
job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
@@ -149,19 +151,19 @@ public class MapReduceExecutable extends AbstractExecutable {
JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output);
if (status == JobStepStatusEnum.KILLED) {
- executableManager.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
+ mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
}
if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
final long waitTime = System.currentTimeMillis() - getStartTime();
setMapReduceWaitTime(waitTime);
}
- executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
+ mgr.addJobInfo(getId(), hadoopCmdOutput.getInfo());
status = newStatus;
if (status.isComplete()) {
final Map<String, String> info = hadoopCmdOutput.getInfo();
readCounters(hadoopCmdOutput, info);
- executableManager.addJobInfo(getId(), info);
+ mgr.addJobInfo(getId(), info);
if (status == JobStepStatusEnum.FINISHED) {
return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 6b0c86e..9edc82e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -43,7 +43,7 @@ import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 013f2c9..a5ea1e9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -40,7 +40,7 @@ import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 8777af7..23e81bc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -122,7 +122,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
}
logger.info("The cube algorithm for " + seg + " is " + alg);
- CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+ CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
cubingJob.setAlgorithm(alg);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 4e1be57..f7af42e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -59,7 +59,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
- CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+ CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
long sourceCount = cubingJob.findSourceRecordCount();
long sourceSizeBytes = cubingJob.findSourceSizeBytes();
long cubeSizeBytes = cubingJob.findCubeSizeBytes();
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 6e8e5ed..d2fa73e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -54,7 +54,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()));
}
- CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+ CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
long cubeSizeBytes = cubingJob.findCubeSizeBytes();
// collect source statistics
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 156b1f6..c3902a2 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -52,9 +52,9 @@ import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.SourceFactory;
import org.apache.kylin.source.SourcePartition;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 9804292..c2f53e1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -49,9 +49,9 @@ import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.job.streaming.Kafka10DataLoader;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.streaming.StreamingConfig;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 170c395..9f14deb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -31,9 +31,9 @@ import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.badquery.BadQueryHistoryManager;
import org.apache.kylin.metadata.project.ProjectInstance;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index e66eaec..08471a3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -52,8 +52,8 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
index 1023a8b..068dbda 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java
@@ -36,9 +36,9 @@ import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobStepStatusEnum;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.realization.RealizationType;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 4252e74..3c0ce1b 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -53,8 +53,8 @@ import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.source.hive.HiveClientFactory;
import org.apache.kylin.source.hive.HiveCmdBuilder;