You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/04/24 05:26:54 UTC
[kylin] branch master updated: KYLIN-3936 MR/Spark task will still run after the job is stopped
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new cf2de69 KYLIN-3936 MR/Spark task will still run after the job is stopped
cf2de69 is described below
commit cf2de6968b0e46365a65c8c121f0552b71e22cdf
Author: Guangxu Cheng <gx...@apache.org>
AuthorDate: Wed Apr 24 11:07:04 2019 +0800
KYLIN-3936 MR/Spark task will still run after the job is stopped
---
.../apache/kylin/job/execution/ExecutableManager.java | 19 ++++++++++++++++++-
.../apache/kylin/rest/controller/JobController.java | 3 ++-
.../org/apache/kylin/rest/service/JobService.java | 10 +++++++---
3 files changed, 27 insertions(+), 5 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 5837bd5..e8f2512 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -375,7 +375,8 @@ public class ExecutableManager {
} else {
logger.warn("The job " + jobId + " has been discarded.");
}
- return;
+ throw new IllegalStateException(
+ "The job " + job.getId() + " has already been finished and cannot be discarded.");
}
if (job instanceof DefaultChainedExecutable) {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
@@ -415,6 +416,22 @@ public class ExecutableManager {
return;
}
+ if (!(job.getStatus() == ExecutableState.READY
+ || job.getStatus() == ExecutableState.RUNNING)) {
+ logger.warn("The status of job " + jobId + " is " + job.getStatus().toString()
+ + ". It's final state and cannot be transfer to be stopped!!!");
+ throw new IllegalStateException(
+ "The job " + job.getId() + " has already been finished and cannot be stopped.");
+ }
+ if (job instanceof DefaultChainedExecutable) {
+ List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+ for (AbstractExecutable task : tasks) {
+ if (!task.getStatus().isFinalState()) {
+ updateJobOutput(task.getId(), ExecutableState.STOPPED, null, null);
+ break;
+ }
+ }
+ }
updateJobOutput(jobId, ExecutableState.STOPPED, null, null);
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index f79281b..d6d612d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -180,7 +180,8 @@ public class JobController extends BasicController {
try {
final JobInstance jobInstance = jobService.getJobInstance(jobId);
- return jobService.pauseJob(jobInstance);
+ jobService.pauseJob(jobInstance);
+ return jobService.getJobInstance(jobId);
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
throw new InternalErrorException(e);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 309735f..46f0143 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -658,11 +658,15 @@ public class JobService extends BasicService implements InitializingBean {
}
}
- public JobInstance pauseJob(JobInstance job) {
+ public void pauseJob(JobInstance job) {
aclEvaluate.checkProjectOperationPermission(job);
+ logger.info("Pause job [" + job.getId() + "] trigger by "
+ + SecurityContextHolder.getContext().getAuthentication().getName());
+ if (job.getStatus().isComplete()) {
+ throw new IllegalStateException(
+ "The job " + job.getId() + " has already been finished and cannot be stopped.");
+ }
getExecutableManager().pauseJob(job.getId());
- job.setStatus(JobStatusEnum.STOPPED);
- return job;
}
public void dropJob(JobInstance job) {