You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/24 10:14:32 UTC

kylin git commit: KYLIN-277 add API for pause and rollback job

Repository: kylin
Updated Branches:
  refs/heads/master 827205f17 -> e1d5b2938


KYLIN-277 add API for pause and rollback job


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e1d5b293
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e1d5b293
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e1d5b293

Branch: refs/heads/master
Commit: e1d5b29385a5c044fe7f60d27d59baf82499b3de
Parents: 827205f
Author: shaofengshi <sh...@apache.org>
Authored: Thu Nov 24 18:14:27 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 24 18:14:27 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/execution/AbstractExecutable.java |  2 +-
 .../kylin/job/execution/ExecutableManager.java  | 18 +++++++++++++++++
 .../kylin/job/execution/ExecutableState.java    |  2 ++
 .../engine/mr/steps/SaveStatisticsStep.java     |  1 -
 .../kylin/rest/controller/JobController.java    | 21 +++++++++++++++++++-
 .../apache/kylin/rest/service/JobService.java   |  7 +++++++
 6 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 551241b..cd9b033 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -82,7 +82,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
         setEndTime(System.currentTimeMillis());
-        if (!isDiscarded()) {
+        if (!isDiscarded() && !isRunnable()) {
             if (result.succeed()) {
                 getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
             } else {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 52d4d1c..4351e31 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -292,6 +292,24 @@ public class ExecutableManager {
         updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
     }
 
+
+    public void rollbackJob(String jobId, String stepId) {
+        AbstractExecutable job = getJob(jobId);
+        if (job == null) {
+            return;
+        }
+
+        if (job instanceof DefaultChainedExecutable) {
+            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+            for (AbstractExecutable task : tasks) {
+                if (task.getId().compareTo(stepId) >= 0) {
+                    logger.debug("rollback task : " + task);
+                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+                }
+            }
+        }
+    }
+
     public void pauseJob(String jobId) {
         AbstractExecutable job = getJob(jobId);
         if (job == null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
index 0684eff..910bd7e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
@@ -69,6 +69,8 @@ public enum ExecutableState {
         VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.STOPPED);
 
 
+        //rollback
+        VALID_STATE_TRANSFER.put(ExecutableState.SUCCEED, ExecutableState.READY);
 
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 7718bfb..79346a5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -73,7 +73,6 @@ public class SaveStatisticsStep extends AbstractExecutable {
                 rs.putResource(statisticsFileName, is, System.currentTimeMillis());
             } finally {
                 IOUtils.closeStream(is);
-                fs.delete(statisticsFilePath, true);
             }
 
             decideCubingAlgorithm(newSegment, kylinConf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 1af4394..12f9e2e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -134,7 +134,7 @@ public class JobController extends BasicController {
     }
 
     /**
-     * Cancel a job
+     * Cancel/discard a job
      * 
      * @return
      * @throws IOException
@@ -175,6 +175,25 @@ public class JobController extends BasicController {
 
     }
 
+    /**
+     * Rollback a job to the given step
+     *
+     * @return
+     * @throws IOException
+     */
+    @RequestMapping(value = "/{jobId}/steps/{stepId}/rollback", method = { RequestMethod.PUT })
+    @ResponseBody
+    public JobInstance rollback(@PathVariable String jobId, @PathVariable String stepId) {
+        try {
+            final JobInstance jobInstance = jobService.getJobInstance(jobId);
+            jobService.rollbackJob(jobInstance, stepId);
+            return jobService.getJobInstance(jobId);
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e);
+        }
+    }
+
     public void setJobService(JobService jobService) {
         this.jobService = jobService;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d5b293/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 19902f0..d6caddb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -460,6 +460,13 @@ public class JobService extends BasicService implements InitializingBean {
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
+    public void rollbackJob(JobInstance job, String stepId) throws IOException, JobException {
+        lockSegment(job.getRelatedSegment());
+
+        getExecutableManager().rollbackJob(job.getId(), stepId);
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
     public JobInstance cancelJob(JobInstance job) throws IOException, JobException {
         if (null == job.getRelatedCube() || null == getCubeManager().getCube(job.getRelatedCube())) {
             getExecutableManager().discardJob(job.getId());