You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/09/21 22:43:02 UTC

helix git commit: Fix a bug in stopping workflows

Repository: helix
Updated Branches:
  refs/heads/master 1b4e0bbb8 -> 41ff38670


Fix a bug in stopping workflows

An edge case in the stop-workflow logic recently came to our attention. The symptom we observed was that when a workflow is stopped via TaskDriver, its tasks would flip-flop between the STOPPED and RUNNING states. This was reproducible. Analysis showed that in AbstractTaskDispatcher, when a Task's current state is STOPPED, the Participant would sometimes try to process a message for a transition to the COMPLETE/ERROR state. Because the Task state model has no direct transition from STOPPED to COMPLETE/ERROR, the task first went to the intermediate RUNNING state. In short, a STOPPED->RUNNING transition took place, and once the task reached the RUNNING state, the Controller sent it back to STOPPED because the target state for the workflow is STOP.

The fix is simply to check the current Task state and the target state of the parent workflow before passing on the requested state assignment in AbstractTaskDispatcher. Tests were added to ensure that workflows' tasks are stopped properly when stopped via TaskDriver.

Changelist:
    1. In AbstractTaskDispatcher, before passing on the requested state transition assignment, check whether the current Task state is STOPPED and the target state for the workflow is STOP; if so, do not make the assignment and just continue (NOP)
    2. Add tests in TestStopWorkflow
    3. Ensure that quota resource is released at stop time for tasks
    4. Identify a possibly unreachable code snippet (marked with a TODO)


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/41ff3867
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/41ff3867
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/41ff3867

Branch: refs/heads/master
Commit: 41ff38670345e03c3274cb5f56468513026eacb8
Parents: 1b4e0bb
Author: Hunter Lee <hu...@linkedin.com>
Authored: Fri Aug 3 11:18:59 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Sep 21 15:41:56 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AbstractTaskDispatcher.java      |  93 ++++---
 .../integration/task/TestStopWorkflow.java      | 246 ++++++++++++++++++-
 2 files changed, 304 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/41ff3867/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 9a5b899..bf33d77 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -64,17 +64,34 @@ public abstract class AbstractTaskDispatcher {
         TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx);
 
-        // Check for pending state transitions on this (partition, instance).
+        // Check for pending state transitions on this (partition, instance). If there is a pending
+        // state transition, we prioritize this pending state transition and set the assignment from
+        // this pending state transition, essentially "waiting" until this pending message clears
         Message pendingMessage =
             currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
 
         if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
+          // If there is a pending message whose destination state is different from the current
+          // state, just make the same assignment as the pending message. This is essentially
+          // "waiting" until this state transition is complete
           processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance,
               pendingMessage, jobState, currState, paMap, assignedPartitions);
           continue;
         }
 
-        // Process any requested state transitions.
+        // Get AssignableInstance for this instance and TaskConfig for releasing resources
+        String quotaType = jobCfg.getJobType();
+        AssignableInstance assignableInstance = assignableInstanceMap.get(instance);
+        String taskId;
+        if (TaskUtil.isGenericTaskJob(jobCfg)) {
+          taskId = jobCtx.getTaskIdForPartition(pId);
+        } else {
+          taskId = pName;
+        }
+        TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+
+        // Process any requested state transitions. If there is a requested state transition, just
+        // "wait" until this state transition is complete
         String requestedStateStr =
             currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
         if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
@@ -85,6 +102,14 @@ public abstract class AbstractTaskDispatcher {
                 requestedState, instance));
           }
 
+          // For STOPPED tasks, if the targetState is STOP, we should not honor requestedState
+          // transition and make it a NOP
+          if (currState == TaskPartitionState.STOPPED && jobTgtState == TargetState.STOP) {
+            // This task is STOPPED and not going to be re-run, so release this task
+            assignableInstance.release(taskConfig, quotaType);
+            continue;
+          }
+
           paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
           assignedPartitions.add(pId);
           if (LOG.isDebugEnabled()) {
@@ -95,17 +120,6 @@ public abstract class AbstractTaskDispatcher {
           continue;
         }
 
-        // Get AssignableInstance for this instance and TaskConfig for releasing resources
-        String quotaType = jobCfg.getJobType();
-        AssignableInstance assignableInstance = assignableInstanceMap.get(instance);
-        String taskId;
-        if (TaskUtil.isGenericTaskJob(jobCfg)) {
-          taskId = jobCtx.getTaskIdForPartition(pId);
-        } else {
-          taskId = pName;
-        }
-        TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
-
         switch (currState) {
           case RUNNING: {
             TaskPartitionState nextState = TaskPartitionState.RUNNING;
@@ -128,23 +142,29 @@ public abstract class AbstractTaskDispatcher {
             }
           }
           break;
-          case STOPPED: {
-            TaskPartitionState nextState;
-            if (jobTgtState.equals(TargetState.START)) {
-              nextState = TaskPartitionState.RUNNING;
-            } else {
-              nextState = TaskPartitionState.STOPPED;
-              // This task is STOPPED and not going to be re-run, so release this task
-              assignableInstance.release(taskConfig, quotaType);
-            }
+        case STOPPED: {
+          // TODO: This case statement might be unreachable code - Hunter
+          // This code may need to be removed because once a task is STOPPED and its workflow's
+          // targetState is STOP, we do not assign that stopped task. Not assigning means it will
+          // not be included in previousAssignment map in the next rebalance. If it is not in
+          // prevInstanceToTaskAssignments, it will never hit this part of the code
+          // When the parent workflow is to be resumed (target state is START), then it will just be
+          // assigned as if it were being assigned for the first time
+          TaskPartitionState nextState;
+          if (jobTgtState.equals(TargetState.START)) {
+            nextState = TaskPartitionState.RUNNING;
+          } else {
+            nextState = TaskPartitionState.STOPPED;
+            // This task is STOPPED and not going to be re-run, so release this task
+            assignableInstance.release(taskConfig, quotaType);
+          }
+          paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
 
-            paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
-                  nextState, instance));
-            }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, nextState, instance));
           }
+        }
           break;
           case COMPLETED: {
             // The task has completed on this partition. Mark as such in the context object.
@@ -288,11 +308,25 @@ public abstract class AbstractTaskDispatcher {
     return currentState;
   }
 
+  /**
+   * Create an assignment based on an already-existing pending message. This effectively lets the
+   * Controller to "wait" until the pending state transition has been processed.
+   * @param prevAssignment
+   * @param pId
+   * @param pName
+   * @param instance
+   * @param pendingMessage
+   * @param jobState
+   * @param currState
+   * @param paMap
+   * @param assignedPartitions
+   */
   private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId,
       String pName, String instance, Message pendingMessage, TaskState jobState,
       TaskPartitionState currState, Map<Integer, PartitionAssignment> paMap,
       Set<Integer> assignedPartitions) {
 
+    // stateMap is a mapping of Instance -> TaskPartitionState (String)
     Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
     if (stateMap != null) {
       String prevState = stateMap.get(instance);
@@ -783,7 +817,8 @@ public abstract class AbstractTaskDispatcher {
         failedJobs++;
         if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
           ctx.setWorkflowState(TaskState.FAILED);
-          LOG.info("Workflow {} reached the failure threshold, so setting its state to FAILED.", cfg.getWorkflowId());
+          LOG.info("Workflow {} reached the failure threshold, so setting its state to FAILED.",
+              cfg.getWorkflowId());
           for (String jobToFail : cfg.getJobDag().getAllNodes()) {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);

http://git-wip-us.apache.org/repos/asf/helix/blob/41ff3867/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
index 8b23f56..4a25a57 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -1,17 +1,33 @@
 package org.apache.helix.integration.task;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-
 public class TestStopWorkflow extends TaskTestBase {
+  private boolean _taskFinishFlag = false;
+
   @BeforeClass
   public void beforeClass() throws Exception {
     _numPartitions = 1;
@@ -22,8 +38,7 @@ public class TestStopWorkflow extends TaskTestBase {
   public void testStopWorkflow() throws InterruptedException {
     String jobQueueName = TestHelper.getTestMethodName();
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setMaxAttemptsPerTask(1)
-        .setWorkflow(jobQueueName)
+        .setMaxAttemptsPerTask(1).setWorkflow(jobQueueName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
 
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
@@ -35,11 +50,230 @@ public class TestStopWorkflow extends TaskTestBase {
     _driver.pollForJobState(jobQueueName,
         TaskUtil.getNamespacedJobName(jobQueueName, "job2_will_fail"), TaskState.FAILED);
 
-    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS));
+    Assert.assertTrue(
+        _driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS));
 
     // Now stop the workflow, and it should be stopped because all jobs have completed or failed.
     _driver.waitToStop(jobQueueName, 4000);
+    _driver.pollForWorkflowState(jobQueueName, TaskState.STOPPED);
+
+    Assert.assertTrue(
+        _driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED));
+  }
+
+  /**
+   * Tests that stopping a workflow does result in its task ending up in STOPPED state.
+   * @throws InterruptedException
+   */
+  @Test
+  public void testStopTask() throws InterruptedException {
+    stopTestSetup(1);
+
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+    WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName);
+    configBuilder.setAllowOverlapJobAssignment(true);
+    workflowBuilder.setWorkflowConfig(configBuilder.build());
+
+    for (int i = 0; i < 1; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("StopTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+      workflowBuilder.addJob("JOB" + i, jobConfigBulider);
+    }
+
+    _driver.start(workflowBuilder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+    // Stop the workflow
+    _driver.stop(workflowName);
+    _driver.pollForWorkflowState(workflowName, TaskState.STOPPED);
+
+    Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowName).getWorkflowState(),
+        TaskState.STOPPED);
+  }
+
+  /**
+   * Tests that stop() indeed frees up quotas for tasks belonging to the stopped workflow.
+   * @throws InterruptedException
+   */
+  @Test
+  public void testStopTaskForQuota() throws InterruptedException {
+    stopTestSetup(1);
+
+    String workflowNameToStop = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilderToStop = new Workflow.Builder(workflowNameToStop);
+    WorkflowConfig.Builder configBuilderToStop = new WorkflowConfig.Builder(workflowNameToStop);
+    configBuilderToStop.setAllowOverlapJobAssignment(true);
+    workflowBuilderToStop.setWorkflowConfig(configBuilderToStop.build());
+
+    // First create 50 jobs so that all 40 threads will be taken up
+    for (int i = 0; i < 50; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("StopTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+      workflowBuilderToStop.addJob("JOB" + i, jobConfigBulider);
+    }
+
+    _driver.start(workflowBuilderToStop.build());
+    _driver.pollForWorkflowState(workflowNameToStop, TaskState.IN_PROGRESS);
+
+    // Stop the workflow
+    _driver.stop(workflowNameToStop);
+
+    _driver.pollForWorkflowState(workflowNameToStop, TaskState.STOPPED);
+    Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowNameToStop).getWorkflowState(),
+        TaskState.STOPPED); // Check that the workflow has been stopped
+
+    // Generate another workflow to be completed this time around
+    String workflowToComplete = TestHelper.getTestMethodName() + "ToComplete";
+    Workflow.Builder workflowBuilderToComplete = new Workflow.Builder(workflowToComplete);
+    WorkflowConfig.Builder configBuilderToComplete = new WorkflowConfig.Builder(workflowToComplete);
+    configBuilderToComplete.setAllowOverlapJobAssignment(true);
+    workflowBuilderToComplete.setWorkflowConfig(configBuilderToComplete.build());
+
+    // Create 20 jobs that should complete
+    for (int i = 0; i < 20; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("CompleteTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+      workflowBuilderToComplete.addJob("JOB" + i, jobConfigBulider);
+    }
+
+    // Start the workflow to be completed
+    _driver.start(workflowBuilderToComplete.build());
+    _driver.pollForWorkflowState(workflowToComplete, TaskState.COMPLETED);
+    Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowToComplete).getWorkflowState(),
+        TaskState.COMPLETED);
+  }
+
+  /**
+   * Test that there is no thread leak when stopping and resuming.
+   * @throws InterruptedException
+   */
+  @Test
+  public void testResumeTaskForQuota() throws InterruptedException {
+    stopTestSetup(1);
+
+    String workflowName_1 = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder_1 = new Workflow.Builder(workflowName_1);
+    WorkflowConfig.Builder configBuilder_1 = new WorkflowConfig.Builder(workflowName_1);
+    configBuilder_1.setAllowOverlapJobAssignment(true);
+    workflowBuilder_1.setWorkflowConfig(configBuilder_1.build());
+
+    // 30 jobs run first
+    for (int i = 0; i < 30; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("StopTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+      workflowBuilder_1.addJob("JOB" + i, jobConfigBulider);
+    }
+
+    _driver.start(workflowBuilder_1.build());
+
+    Thread.sleep(2000L); // Sleep until each task really is in progress
+    _driver.stop(workflowName_1);
+    _driver.pollForWorkflowState(workflowName_1, TaskState.STOPPED);
+
+    _taskFinishFlag = false;
+    _driver.resume(workflowName_1);
+    Thread.sleep(2000L); // Sleep until each task really is in progress
+
+    // By now there should only be 30 threads occupied
+
+    String workflowName_2 = TestHelper.getTestMethodName() + "_2";
+    Workflow.Builder workflowBuilder_2 = new Workflow.Builder(workflowName_2);
+    WorkflowConfig.Builder configBuilder_2 = new WorkflowConfig.Builder(workflowName_2);
+    configBuilder_2.setAllowOverlapJobAssignment(true);
+    workflowBuilder_2.setWorkflowConfig(configBuilder_2.build());
+
+    // Try to run 10 jobs that complete
+    int numJobs = 10;
+    for (int i = 0; i < numJobs; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      taskConfigs.add(new TaskConfig("CompleteTask", new HashMap<String, String>()));
+      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
+          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<String, String>());
+      workflowBuilder_2.addJob("JOB" + i, jobConfigBulider);
+    }
+
+    // If these jobs complete successfully, then that means there is no thread leak
+    _driver.start(workflowBuilder_2.build());
+    Assert.assertEquals(_driver.pollForWorkflowState(workflowName_2, TaskState.COMPLETED),
+        TaskState.COMPLETED);
+  }
+
+  /**
+   * Sets up an environment to make stop task testing easy. Shuts down all Participants and starts
+   * only one Participant.
+   */
+  private void stopTestSetup(int numNodes) {
+    // Set task callbacks
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    TaskFactory taskFactory = new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new StopTask(context);
+      }
+    };
+    TaskFactory taskFactoryComplete = new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new MockTask(context);
+      }
+    };
+    taskFactoryReg.put("StopTask", taskFactory);
+    taskFactoryReg.put("CompleteTask", taskFactoryComplete);
+
+    stopParticipants();
+
+    for (int i = 0; i < numNodes; i++) {
+      String instanceName = _participants[i].getInstanceName();
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+
+      _participants[i].syncStart();
+    }
+  }
+
+  /**
+   * A mock task class that models a short-lived task to be stopped.
+   */
+  private class StopTask extends MockTask {
+    StopTask(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public TaskResult run() {
+      while (!_taskFinishFlag) {
+        try {
+          Thread.sleep(1000L);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+
+      // This wait is to prevent the task from completing before being stopped
+      try {
+        Thread.sleep(500L);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+
+      return new TaskResult(TaskResult.Status.COMPLETED, "");
+    }
 
-    Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED));
+    @Override
+    public void cancel() {
+      _taskFinishFlag = true;
+    }
   }
-}
\ No newline at end of file
+}