You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/24 10:14:32 UTC
kylin git commit: KYLIN-277 add API for pause and rollback job
Repository: kylin
Updated Branches:
refs/heads/master 827205f17 -> e1d5b2938
KYLIN-277 add API for pause and rollback job
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e1d5b293
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e1d5b293
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e1d5b293
Branch: refs/heads/master
Commit: e1d5b29385a5c044fe7f60d27d59baf82499b3de
Parents: 827205f
Author: shaofengshi <sh...@apache.org>
Authored: Thu Nov 24 18:14:27 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 24 18:14:27 2016 +0800
----------------------------------------------------------------------
.../kylin/job/execution/AbstractExecutable.java | 2 +-
.../kylin/job/execution/ExecutableManager.java | 18 +++++++++++++++++
.../kylin/job/execution/ExecutableState.java | 2 ++
.../engine/mr/steps/SaveStatisticsStep.java | 1 -
.../kylin/rest/controller/JobController.java | 21 +++++++++++++++++++-
.../apache/kylin/rest/service/JobService.java | 7 +++++++
6 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 551241b..cd9b033 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -82,7 +82,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
setEndTime(System.currentTimeMillis());
- if (!isDiscarded()) {
+ if (!isDiscarded() && !isRunnable()) {
if (result.succeed()) {
getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 52d4d1c..4351e31 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -292,6 +292,24 @@ public class ExecutableManager {
updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
}
+
+ public void rollbackJob(String jobId, String stepId) {
+ AbstractExecutable job = getJob(jobId);
+ if (job == null) {
+ return;
+ }
+
+ if (job instanceof DefaultChainedExecutable) {
+ List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+ for (AbstractExecutable task : tasks) {
+ if (task.getId().compareTo(stepId) >= 0) {
+ logger.debug("rollback task : " + task);
+ updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+ }
+ }
+ }
+ }
+
public void pauseJob(String jobId) {
AbstractExecutable job = getJob(jobId);
if (job == null) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
index 0684eff..910bd7e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
@@ -69,6 +69,8 @@ public enum ExecutableState {
VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.STOPPED);
+ //rollback
+ VALID_STATE_TRANSFER.put(ExecutableState.SUCCEED, ExecutableState.READY);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 7718bfb..79346a5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -73,7 +73,6 @@ public class SaveStatisticsStep extends AbstractExecutable {
rs.putResource(statisticsFileName, is, System.currentTimeMillis());
} finally {
IOUtils.closeStream(is);
- fs.delete(statisticsFilePath, true);
}
decideCubingAlgorithm(newSegment, kylinConf);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 1af4394..12f9e2e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -134,7 +134,7 @@ public class JobController extends BasicController {
}
/**
- * Cancel a job
+ * Cancel/discard a job
*
* @return
* @throws IOException
@@ -175,6 +175,25 @@ public class JobController extends BasicController {
}
+ /**
+ * Rollback a job to the given step
+ *
+ * @return
+ * @throws IOException
+ */
+ @RequestMapping(value = "/{jobId}/steps/{stepId}/rollback", method = { RequestMethod.PUT })
+ @ResponseBody
+ public JobInstance rollback(@PathVariable String jobId, @PathVariable String stepId) {
+ try {
+ final JobInstance jobInstance = jobService.getJobInstance(jobId);
+ jobService.rollbackJob(jobInstance, stepId);
+ return jobService.getJobInstance(jobId);
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw new InternalErrorException(e);
+ }
+ }
+
public void setJobService(JobService jobService) {
this.jobService = jobService;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 19902f0..d6caddb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -460,6 +460,13 @@ public class JobService extends BasicService implements InitializingBean {
}
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
+ public void rollbackJob(JobInstance job, String stepId) throws IOException, JobException {
+ lockSegment(job.getRelatedSegment());
+
+ getExecutableManager().rollbackJob(job.getId(), stepId);
+ }
+
+ @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
public JobInstance cancelJob(JobInstance job) throws IOException, JobException {
if (null == job.getRelatedCube() || null == getCubeManager().getCube(job.getRelatedCube())) {
getExecutableManager().discardJob(job.getId());