You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/06/26 12:29:19 UTC

kylin git commit: KYLIN-1811 Error step may be skipped sometimes when resuming a cube job

Repository: kylin
Updated Branches:
  refs/heads/master 1015daedf -> 5bf6582f6


KYLIN-1811 Error step may be skipped sometimes when resuming a cube job

Signed-off-by: Yang Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5bf6582f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5bf6582f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5bf6582f

Branch: refs/heads/master
Commit: 5bf6582f66bedb5d25baaa0d82ae193892b97e0c
Parents: 1015dae
Author: Ma,Gang <ga...@ebay.com>
Authored: Wed Jun 22 11:07:19 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Jun 26 20:21:35 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/execution/DefaultChainedExecutable.java | 14 ++++++++++++++
 .../apache/kylin/job/manager/ExecutableManager.java   |  2 +-
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5bf6582f/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 7403715..b130f5b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -46,6 +46,13 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         final int size = executables.size();
         for (int i = 0; i < size; ++i) {
             Executable subTask = executables.get(i);
+            ExecutableState state = subTask.getStatus();
+            if (state == ExecutableState.RUNNING){
+                // there is already running subtask, no need to start a new subtask
+                break;
+            } else if (state == ExecutableState.ERROR){
+                throw new IllegalStateException("invalid subtask state, subtask:" + subTask.getName() + ", state:" + subTask.getStatus());
+            }
             if (subTask.isRunnable()) {
                 return subTask.execute(context);
             }
@@ -53,6 +60,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
     }
 
+
     @Override
     protected void onExecuteStart(ExecutableContext executableContext) {
         Map<String, String> info = Maps.newHashMap();
@@ -74,6 +82,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
             List<? extends Executable> jobs = getTasks();
             boolean allSucceed = true;
             boolean hasError = false;
+            boolean hasRunning = false;
             for (Executable task : jobs) {
                 final ExecutableState status = task.getStatus();
                 if (status == ExecutableState.ERROR) {
@@ -82,6 +91,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                 if (status != ExecutableState.SUCCEED) {
                     allSucceed = false;
                 }
+                if (status == ExecutableState.RUNNING) {
+                    hasRunning = true;
+                }
             }
             if (allSucceed) {
                 setEndTime(System.currentTimeMillis());
@@ -91,6 +103,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                 setEndTime(System.currentTimeMillis());
                 jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
                 notifyUserStatusChange(executableContext, ExecutableState.ERROR);
+            } else if (hasRunning){
+                jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
             } else {
                 jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5bf6582f/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
index 7b4e0f0..3a19486 100644
--- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
@@ -246,7 +246,6 @@ public class ExecutableManager {
         if (job == null) {
             return;
         }
-        updateJobOutput(jobId, ExecutableState.READY, null, null);
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
@@ -256,6 +255,7 @@ public class ExecutableManager {
                 }
             }
         }
+        updateJobOutput(jobId, ExecutableState.READY, null, null);
     }
 
     public void discardJob(String jobId) {