You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/06/22 22:57:33 UTC

[14/50] [abbrv] helix git commit: Support cancel tasks with synchronized check task status

Support cancel tasks with synchronized check task status

Currently, in Helix, cancelling or stopping a job does not check the status of its subtasks. In this rb:
1. Add a new API to support synchronously stopping a workflow/queue.
2. On the controller side, check that subtasks are stopped before marking the job status.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/470b514f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/470b514f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/470b514f

Branch: refs/heads/master
Commit: 470b514f3cc1f6d879938112bc862ab2ba22378d
Parents: 42273bb
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Feb 9 14:06:57 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Feb 9 14:06:57 2017 -0800

----------------------------------------------------------------------
 .../org/apache/helix/task/JobRebalancer.java    | 49 ++++++++++++++++----
 .../java/org/apache/helix/task/TaskDriver.java  | 36 +++++++++++++-
 .../org/apache/helix/task/TaskRebalancer.java   |  5 +-
 .../java/org/apache/helix/task/TaskState.java   |  5 +-
 .../apache/helix/integration/task/MockTask.java |  7 ++-
 .../task/TestTaskRebalancerStopResume.java      | 42 +++++++++++++++++
 6 files changed, 128 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index bd7e819..bed81cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -19,15 +19,6 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import org.apache.helix.*;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.*;
-import org.apache.log4j.Logger;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -40,6 +31,23 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
 /**
  * Custom rebalancer implementation for the {@code Job} in task model.
  */
@@ -191,10 +199,16 @@ public class JobRebalancer extends TaskRebalancer {
     TargetState jobTgtState = workflowConfig.getTargetState();
     // Update running status in workflow context
     if (jobTgtState == TargetState.STOP) {
-      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+      if (checkJobStopped(jobCtx)) {
+        workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+      } else {
+        workflowCtx.setJobState(jobResource, TaskState.STOPPING);
+      }
       // Workflow has been stopped if all in progress jobs are stopped
       if (isWorkflowStopped(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
+      } else {
+        workflowCtx.setWorkflowState(TaskState.STOPPING);
       }
     } else {
       workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
@@ -657,6 +671,21 @@ public class JobRebalancer extends TaskRebalancer {
   }
 
   /**
+   * Check whether the job has stopped, i.e. none of its tasks is still RUNNING
+   * @param jobContext The job context
+   * @return           False if any task is still RUNNING. Otherwise return true
+   */
+  private boolean checkJobStopped(JobContext jobContext) {
+    for (int partition : jobContext.getPartitionSet()) {
+      TaskPartitionState taskState = jobContext.getPartitionState(partition);
+      if (taskState != null && taskState.equals(TaskPartitionState.RUNNING)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * Computes the partition name given the resource name and partition id.
    */
   private String pName(String resource, int pId) {

http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 5e39e17..c922b18 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -692,12 +692,44 @@ public class TaskDriver {
   }
 
   /**
-   * Public method to stop a workflow/queue.
+   * Public async method to stop a workflow/queue.
+   *
+   * This call only sends the STOP command to Helix; it does not check
+   * whether the workflow (all jobs) has been stopped yet.
    *
    * @param workflow
    */
-  public void stop(String workflow) {
+  public void stop(String workflow) throws InterruptedException {
+    setWorkflowTargetState(workflow, TargetState.STOP);
+  }
+
+  /**
+   * Public sync method to stop a workflow/queue with timeout
+   *
+   * The workflow and all of its jobs have been stopped if this method returns successfully.
+   *
+   * @param workflow  The workflow name
+   * @param timeout   The timeout for stopping the workflow/queue, in milliseconds
+   */
+  public void waitToStop(String workflow, long timeout) throws InterruptedException {
     setWorkflowTargetState(workflow, TargetState.STOP);
+    long endTime = System.currentTimeMillis() + timeout;
+
+    while (System.currentTimeMillis() <= endTime) {
+      WorkflowContext workflowContext = getWorkflowContext(workflow);
+
+      if (workflowContext == null || !workflowContext.getWorkflowState()
+          .equals(TaskState.STOPPED)) {
+        Thread.sleep(1000);
+      } else {
+        // Successfully stopped
+        return;
+      }
+    }
+
+    // Failed to stop with timeout
+    throw new HelixException(String
+        .format("Fail to stop the workflow/queue %s with in %d milliseconds.", workflow, timeout));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index d4ac1b8..27741ca 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -114,9 +114,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
     for (String job : cfg.getJobDag().getAllNodes()) {
       TaskState jobState = ctx.getJobState(job);
-      if (jobState != null && jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
-          && jobState != TaskState.STOPPED)
+      if (jobState != null && (jobState.equals(TaskState.IN_PROGRESS) || jobState
+          .equals(TaskState.STOPPING))) {
         return false;
+      }
     }
     return true;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 1000a9b..4e12f2d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -27,7 +27,6 @@ public enum TaskState {
    * The task has yet to start
    */
   NOT_STARTED,
-
   /**
    * The task is in progress.
    */
@@ -37,6 +36,10 @@ public enum TaskState {
    */
   STOPPED,
   /**
+   * The task is in the process of stopping. Stopping finishes once its subtasks are stopped or completed
+   */
+  STOPPING,
+  /**
    * The task has failed. It cannot be resumed.
    */
   FAILED,

http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index 948e8f3..0502f8e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -36,7 +36,9 @@ public class MockTask extends UserContentStore implements Task {
   public static final String ERROR_MESSAGE = "ErrorMessage";
   public static final String FAILURE_COUNT_BEFORE_SUCCESS = "FailureCountBeforeSuccess";
   public static final String SUCCESS_COUNT_BEFORE_FAIL = "SuccessCountBeforeFail";
+  public static final String NOT_ALLOW_TO_CANCEL = "NotAllowToCancel";
   private final long _delay;
+  private volatile boolean _notAllowToCancel;
   private volatile boolean _canceled;
   private TaskResult.Status _taskResultStatus;
   private boolean _throwException;
@@ -57,6 +59,9 @@ public class MockTask extends UserContentStore implements Task {
     }
 
     _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 100L;
+    _notAllowToCancel = cfg.containsKey(NOT_ALLOW_TO_CANCEL)
+        ? Boolean.parseBoolean(cfg.get(NOT_ALLOW_TO_CANCEL))
+        : false;
     _taskResultStatus = cfg.containsKey(TASK_RESULT_STATUS) ?
         TaskResult.Status.valueOf(cfg.get(TASK_RESULT_STATUS)) :
         TaskResult.Status.COMPLETED;
@@ -77,7 +82,7 @@ public class MockTask extends UserContentStore implements Task {
     long expiry = System.currentTimeMillis() + _delay;
     long timeLeft;
     while (System.currentTimeMillis() < expiry) {
-      if (_canceled) {
+      if (_canceled && !_notAllowToCancel) {
         timeLeft = expiry - System.currentTimeMillis();
         return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
             : timeLeft));

http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index bc02148..2b0d38c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -21,6 +21,7 @@ package org.apache.helix.integration.task;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,8 +37,10 @@ import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.ScheduleConfig;
+import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
@@ -46,6 +49,7 @@ import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -479,6 +483,44 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     System.out.println("END " + queueName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  @Test
+  public void testStopWorkflowInStoppingState() throws InterruptedException {
+    final String workflowName = TestHelper.getTestMethodName();
+
+    // Create a workflow
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+
+    // Add 2 jobs
+    Map<String, String> jobCommandConfigMap = new HashMap<String, String>();
+    jobCommandConfigMap.put(MockTask.TIMEOUT_CONFIG, "1000000");
+    jobCommandConfigMap.put(MockTask.NOT_ALLOW_TO_CANCEL, String.valueOf(true));
+    List<TaskConfig> taskConfigs = ImmutableList
+        .of(new TaskConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTaskId("testTask")
+            .build());
+    JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+   .addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(jobCommandConfigMap);
+    String job1Name = "Job1";
+
+    JobConfig.Builder job2 =
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).addTaskConfigs(taskConfigs);
+    String job2Name = "Job2";
+
+    builder.addJob(job1Name, job1);
+    builder.addJob(job2Name, job2);
+
+    _driver.start(builder.build());
+    Thread.sleep(2000);
+    _driver.stop(workflowName);
+    _driver.pollForWorkflowState(workflowName, TaskState.STOPPING);
+
+    // Expect the job and the workflow to be stuck in the STOPPING state.
+    WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
+    Assert.assertEquals(
+        workflowContext.getJobState(TaskUtil.getNamespacedJobName(workflowName, job1Name)),
+        TaskState.STOPPING);
+  }
+
   private void verifyJobDeleted(String queueName, String jobName) throws Exception {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();