You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/04/24 05:26:54 UTC

[kylin] branch master updated: KYLIN-3936 MR/Spark task will still run after the job is stopped

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new cf2de69  KYLIN-3936 MR/Spark task will still run after the job is stopped
cf2de69 is described below

commit cf2de6968b0e46365a65c8c121f0552b71e22cdf
Author: Guangxu Cheng <gx...@apache.org>
AuthorDate: Wed Apr 24 11:07:04 2019 +0800

    KYLIN-3936 MR/Spark task will still run after the job is stopped
---
 .../apache/kylin/job/execution/ExecutableManager.java | 19 ++++++++++++++++++-
 .../apache/kylin/rest/controller/JobController.java   |  3 ++-
 .../org/apache/kylin/rest/service/JobService.java     | 10 +++++++---
 3 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 5837bd5..e8f2512 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -375,7 +375,8 @@ public class ExecutableManager {
             } else {
                 logger.warn("The job " + jobId + " has been discarded.");
             }
-            return;
+            throw new IllegalStateException(
+                "The job " + job.getId() + " has already been finished and cannot be discarded.");
         }
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
@@ -415,6 +416,22 @@ public class ExecutableManager {
             return;
         }
 
+        if (!(job.getStatus() == ExecutableState.READY
+            || job.getStatus() == ExecutableState.RUNNING)) {
+            logger.warn("The status of job " + jobId + " is " + job.getStatus().toString()
+                + ". It's final state and cannot be transfer to be stopped!!!");
+            throw new IllegalStateException(
+                "The job " + job.getId() + " has already been finished and cannot be stopped.");
+        }
+        if (job instanceof DefaultChainedExecutable) {
+            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+            for (AbstractExecutable task : tasks) {
+                if (!task.getStatus().isFinalState()) {
+                    updateJobOutput(task.getId(), ExecutableState.STOPPED, null, null);
+                    break;
+                }
+            }
+        }
         updateJobOutput(jobId, ExecutableState.STOPPED, null, null);
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index f79281b..d6d612d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -180,7 +180,8 @@ public class JobController extends BasicController {
 
         try {
             final JobInstance jobInstance = jobService.getJobInstance(jobId);
-            return jobService.pauseJob(jobInstance);
+            jobService.pauseJob(jobInstance);
+            return jobService.getJobInstance(jobId);
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException(e);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 309735f..46f0143 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -658,11 +658,15 @@ public class JobService extends BasicService implements InitializingBean {
         }
     }
 
-    public JobInstance pauseJob(JobInstance job) {
+    public void pauseJob(JobInstance job) {
         aclEvaluate.checkProjectOperationPermission(job);
+        logger.info("Pause job [" + job.getId() + "] trigger by "
+            + SecurityContextHolder.getContext().getAuthentication().getName());
+        if (job.getStatus().isComplete()) {
+          throw new IllegalStateException(
+              "The job " + job.getId() + " has already been finished and cannot be stopped.");
+        }
         getExecutableManager().pauseJob(job.getId());
-        job.setStatus(JobStatusEnum.STOPPED);
-        return job;
     }
 
     public void dropJob(JobInstance job) {