You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/04 01:49:45 UTC

[4/7] helix git commit: Be able to stop workflow when no job is running.

Be able to stop workflow when no job is running.

Currently, to stop a workflow, the target state of the workflow is set to STOP; then, when each job (as a resource in the ideal state) is processed in the job rebalancer, the rebalancer checks whether all the jobs in the workflow are done (not IN_PROGRESS or STOPPING) and, if so, sets the workflow state to STOPPED.
However, if all the jobs are already done, there is no job in the ideal state to process, so the workflow state never gets a chance to be set to STOPPED.

This commit adds a check in the workflow rebalancer that sets the workflow state to STOPPED when all jobs are already done.

A test is added that covers this specific case.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/408082a3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/408082a3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/408082a3

Branch: refs/heads/master
Commit: 408082a33d91f84556c3da31232fb6d4097b4371
Parents: 94f3961
Author: Weihan Kong <wk...@linkedin.com>
Authored: Mon Feb 13 13:52:16 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Oct 3 15:08:33 2017 -0700

----------------------------------------------------------------------
 .../apache/helix/task/WorkflowRebalancer.java   |  4 ++
 .../integration/task/TestStopWorkflow.java      | 45 ++++++++++++++++++++
 2 files changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 830f93a..8e72f7a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -76,6 +76,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
 
     if (targetState == TargetState.STOP) {
       LOG.info("Workflow " + workflow + "is marked as stopped.");
+      if (isWorkflowStopped(workflowCtx, workflowCfg)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+        TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
+      }
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/408082a3/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
new file mode 100644
index 0000000..b641698
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -0,0 +1,45 @@
+package org.apache.helix.integration.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestStopWorkflow extends TaskTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numParitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testStopWorkflow() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1_will_succeed", jobBuilder);
+    jobQueue.enqueueJob("job2_will_fail", jobBuilder);
+    _driver.start(jobQueue.build());
+
+    // job1 should succeed and job2 should fail, wait until that happens
+    _driver.pollForJobState(jobQueueName,
+        TaskUtil.getNamespacedJobName(jobQueueName, "job2_will_fail"), TaskState.FAILED);
+
+    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS));
+
+    // Now stop the workflow, and it should be stopped because all jobs have completed or failed.
+    _driver.waitToStop(jobQueueName, 4000);
+
+    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED));
+  }
+}