You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/06/26 12:29:19 UTC
kylin git commit: KYLIN-1811 Error step may be skipped sometimes when
resuming a cube job
Repository: kylin
Updated Branches:
refs/heads/master 1015daedf -> 5bf6582f6
KYLIN-1811 Error step may be skipped sometimes when resuming a cube job
Signed-off-by: Yang Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5bf6582f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5bf6582f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5bf6582f
Branch: refs/heads/master
Commit: 5bf6582f66bedb5d25baaa0d82ae193892b97e0c
Parents: 1015dae
Author: Ma,Gang <ga...@ebay.com>
Authored: Wed Jun 22 11:07:19 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Jun 26 20:21:35 2016 +0800
----------------------------------------------------------------------
.../kylin/job/execution/DefaultChainedExecutable.java | 14 ++++++++++++++
.../apache/kylin/job/manager/ExecutableManager.java | 2 +-
2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5bf6582f/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 7403715..b130f5b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -46,6 +46,13 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
final int size = executables.size();
for (int i = 0; i < size; ++i) {
Executable subTask = executables.get(i);
+ ExecutableState state = subTask.getStatus();
+ if (state == ExecutableState.RUNNING){
+ // there is already running subtask, no need to start a new subtask
+ break;
+ } else if (state == ExecutableState.ERROR){
+ throw new IllegalStateException("invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus());
+ }
if (subTask.isRunnable()) {
return subTask.execute(context);
}
@@ -53,6 +60,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
}
+
@Override
protected void onExecuteStart(ExecutableContext executableContext) {
Map<String, String> info = Maps.newHashMap();
@@ -74,6 +82,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
List<? extends Executable> jobs = getTasks();
boolean allSucceed = true;
boolean hasError = false;
+ boolean hasRunning = false;
for (Executable task : jobs) {
final ExecutableState status = task.getStatus();
if (status == ExecutableState.ERROR) {
@@ -82,6 +91,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
if (status != ExecutableState.SUCCEED) {
allSucceed = false;
}
+ if (status == ExecutableState.RUNNING) {
+ hasRunning = true;
+ }
}
if (allSucceed) {
setEndTime(System.currentTimeMillis());
@@ -91,6 +103,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
setEndTime(System.currentTimeMillis());
jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+ } else if (hasRunning){
+ jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
} else {
jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5bf6582f/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index 7b4e0f0..3a19486 100644
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -246,7 +246,6 @@ public class ExecutableManager {
if (job == null) {
return;
}
- updateJobOutput(jobId, ExecutableState.READY, null, null);
if (job instanceof DefaultChainedExecutable) {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
for (AbstractExecutable task : tasks) {
@@ -256,6 +255,7 @@ public class ExecutableManager {
}
}
}
+ updateJobOutput(jobId, ExecutableState.READY, null, null);
}
public void discardJob(String jobId) {