You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/12/03 01:43:39 UTC

[helix] branch master updated: Mark tasks as DROPPED if assigned participant is not live (#1568)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d2b0de  Mark tasks as DROPPED if assigned participant is not live (#1568)
3d2b0de is described below

commit 3d2b0de955fbf95df062a7958852487535eabdd7
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Dec 2 17:43:31 2020 -0800

    Mark tasks as DROPPED if assigned participant is not live (#1568)
    
    When a workflow or job is being stopped and the participant
    assigned to one of its tasks is no longer live, that task is
    now marked as DROPPED, which allows the workflow and job to
    reach the STOPPED state.
---
 .../apache/helix/task/AbstractTaskDispatcher.java  |   2 +-
 .../java/org/apache/helix/task/JobDispatcher.java  |  18 +-
 .../task/TestStoppingQueueFailToStop.java          | 126 ------------
 .../task/TestStoppingWorkflowAndJob.java           | 213 +++++++++++++++++++++
 4 files changed, 231 insertions(+), 128 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 76359fe..90019b8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -837,7 +837,7 @@ public abstract class AbstractTaskDispatcher {
    * @param state
    * @return
    */
-  private boolean isTaskNotInTerminalState(TaskPartitionState state) {
+  protected static boolean isTaskNotInTerminalState(TaskPartitionState state) {
     return state != TaskPartitionState.COMPLETED && state != TaskPartitionState.TASK_ABORTED
         && state != TaskPartitionState.ERROR;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 218fe61..928382d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -161,6 +161,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted
       // Update running status in workflow context
       if (jobTgtState == TargetState.STOP) {
+        // If a task's assigned instance is no longer live, mark the task as DROPPED in the context
+        markPartitionsWithoutLiveInstance(jobCtx, liveInstances);
+        
         if (jobState != TaskState.NOT_STARTED && TaskUtil.checkJobStopped(jobCtx)) {
           workflowCtx.setJobState(jobName, TaskState.STOPPED);
         } else {
@@ -198,7 +201,6 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       Collection<String> liveInstances, CurrentStateOutput currStateOutput,
       WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
       WorkflowControllerDataProvider cache) {
-
     // Used to keep track of tasks that have already been assigned to instances.
     // InstanceName -> Set of task partitions assigned to that instance in this iteration
     Map<String, Set<Integer>> assignedPartitions = new HashMap<>();
@@ -525,4 +527,18 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       }
     }
   }
+
+  protected void markPartitionsWithoutLiveInstance(JobContext jobCtx,
+      Collection<String> liveInstances) {
+    for (int partitionNumber : jobCtx.getPartitionSet()) {
+      TaskPartitionState state = jobCtx.getPartitionState(partitionNumber);
+      if (isTaskNotInTerminalState(state)) {
+        String assignedParticipant = jobCtx.getAssignedParticipant(partitionNumber);
+        if (assignedParticipant != null && !liveInstances.contains(assignedParticipant)) {
+          // The assigned instance is no longer live, so mark it as DROPPED in the context
+          jobCtx.setPartitionState(partitionNumber, TaskPartitionState.DROPPED);
+        }
+      }
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingQueueFailToStop.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingQueueFailToStop.java
deleted file mode 100644
index 744dbda..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingQueueFailToStop.java
+++ /dev/null
@@ -1,126 +0,0 @@
-package org.apache.helix.integration.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.MasterSlaveSMD;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
-/**
- * Test to check if waitToStop method correctly throws an Exception if Queue stuck in STOPPING
- * state.
- */
-public class TestStoppingQueueFailToStop extends TaskTestBase {
-  private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
-  private CountDownLatch latch = new CountDownLatch(1);
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    _numPartitions = 1;
-    _numNodes = 3;
-    super.beforeClass();
-
-    // Stop participants that have been started in super class
-    for (int i = 0; i < _numNodes; i++) {
-      super.stopParticipant(i);
-      Assert.assertFalse(_participants[i].isConnected());
-    }
-
-    // Start new participants that have new TaskStateModel (NewMockTask) information
-    _participants = new MockParticipantManager[_numNodes];
-    for (int i = 0; i < _numNodes; i++) {
-      Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
-      taskFactoryReg.put(NewMockTask.TASK_COMMAND, NewMockTask::new);
-      String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task",
-          new TaskStateModelFactory(_participants[i], taskFactoryReg));
-      _participants[i].syncStart();
-    }
-  }
-
-  @Test
-  public void testStoppingQueueFailToStop() throws Exception {
-    String jobQueueName = TestHelper.getTestMethodName();
-    JobConfig.Builder jobBuilder0 =
-        new JobConfig.Builder().setWorkflow(jobQueueName).setTargetResource(DATABASE)
-            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
-            .setCommand(MockTask.TASK_COMMAND)
-            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
-
-    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
-    jobQueue.enqueueJob("JOB0", jobBuilder0);
-    _driver.start(jobQueue.build());
-    _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "JOB0"),
-        TaskState.IN_PROGRESS);
-    boolean exceptionHappened = false;
-    try {
-      _driver.waitToStop(jobQueueName, 5000L);
-    } catch (HelixException e) {
-      exceptionHappened = true;
-    }
-    _driver.pollForWorkflowState(jobQueueName, TaskState.STOPPING);
-    Assert.assertTrue(exceptionHappened);
-    latch.countDown();
-  }
-
-  /**
-   * A mock task that extents MockTask class and stuck in running when cancel is called.
-   */
-  private class NewMockTask extends MockTask {
-
-    NewMockTask(TaskCallbackContext context) {
-      super(context);
-    }
-
-    @Override
-    public void cancel() {
-      try {
-        latch.await();
-      } catch (Exception e) {
-        // Pass
-      }
-      super.cancel();
-    }
-  }
-}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingWorkflowAndJob.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingWorkflowAndJob.java
new file mode 100644
index 0000000..ad24dc1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingWorkflowAndJob.java
@@ -0,0 +1,213 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+/**
+ * Test to check if a workflow behaves correctly when its target state is set to STOP and the
+ * tasks have some delay in their cancellation.
+ */
+public class TestStoppingWorkflowAndJob extends TaskTestBase {
+  private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  private CountDownLatch latch = new CountDownLatch(1);
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 3;
+    super.beforeClass();
+
+    // Stop participants that have been started in super class
+    for (int i = 0; i < _numNodes; i++) {
+      super.stopParticipant(i);
+      Assert.assertFalse(_participants[i].isConnected());
+    }
+
+    // Start new participants that have new TaskStateModel (NewMockTask) information
+    _participants = new MockParticipantManager[_numNodes];
+    for (int i = 0; i < _numNodes; i++) {
+      startParticipantAndRegisterNewMockTask(i);
+    }
+  }
+
+  @Test
+  public void testStoppingQueueFailToStop() throws Exception {
+    // Test to check if the waitToStop method correctly throws an Exception if the queue is stuck
+    // in the STOPPING state.
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder0 =
+        new JobConfig.Builder().setWorkflow(jobQueueName).setTargetResource(DATABASE)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("JOB0", jobBuilder0);
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "JOB0"),
+        TaskState.IN_PROGRESS);
+    boolean exceptionHappened = false;
+    try {
+      _driver.waitToStop(jobQueueName, 5000L);
+    } catch (HelixException e) {
+      exceptionHappened = true;
+    }
+    _driver.pollForWorkflowState(jobQueueName, TaskState.STOPPING);
+    Assert.assertTrue(exceptionHappened);
+    latch.countDown();
+  }
+
+  @Test(dependsOnMethods = "testStoppingQueueFailToStop")
+  public void testStopWorkflowInstanceOffline() throws Exception {
+    // Test to check if workflow and jobs go to STOPPED state when the assigned participants of the
+    // tasks are not live anymore.
+    latch = new CountDownLatch(1);
+    int numberOfTasks = 10;
+
+    // Stop all participants except participant0
+    for (int i = 1; i < _numNodes; i++) {
+      super.stopParticipant(i);
+      Assert.assertFalse(_participants[i].isConnected());
+    }
+
+    // Start a new workflow
+    String workflowName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setWorkflow(workflowName).setNumberOfTasks(numberOfTasks)
+            .setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+    Workflow.Builder workflowBuilder =
+        new Workflow.Builder(workflowName).addJob(jobName, jobBuilder);
+
+    // Start the workflow and make sure it goes to IN_PROGRESS state.
+    _driver.start(workflowBuilder.build());
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.IN_PROGRESS);
+
+    // Make sure all of the tasks within the job are assigned and have RUNNING state
+    String participant0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+    for (int i = 0; i < numberOfTasks; i++) {
+      final int taskNumber = i;
+      Assert.assertTrue(TestHelper.verify(() -> (TaskPartitionState.RUNNING
+          .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName))
+              .getPartitionState(taskNumber))
+          && participant0
+              .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName))
+                  .getAssignedParticipant(taskNumber))),
+          TestHelper.WAIT_DURATION));
+    }
+
+    // STOP the workflow and make sure it goes to STOPPING state
+    _driver.stop(workflowName);
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.STOPPING);
+
+    // Stop the remaining participant and the controller
+    _controller.syncStop();
+    super.stopParticipant(0);
+
+    // Start the controller and make sure the workflow and job go to STOPPED state
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, CONTROLLER_PREFIX);
+    _controller.syncStart();
+
+    // Start the other participants so that the controller can process the workflows. Otherwise,
+    // due to having no global capacity, the workflows will not be processed
+    for (int i = 1; i < _numNodes; i++) {
+      startParticipantAndRegisterNewMockTask(i);
+    }
+
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+        TaskState.STOPPED);
+    _driver.pollForWorkflowState(workflowName, TaskState.STOPPED);
+
+    // Make sure all of the tasks within the job are assigned and have DROPPED state
+    for (int i = 0; i < numberOfTasks; i++) {
+      final int taskNumber = i;
+      Assert.assertTrue(TestHelper.verify(() -> (TaskPartitionState.DROPPED
+          .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName))
+              .getPartitionState(taskNumber))
+          && participant0
+              .equals(_driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName))
+                  .getAssignedParticipant(taskNumber))),
+          TestHelper.WAIT_DURATION));
+    }
+    latch.countDown();
+  }
+
+  private void startParticipantAndRegisterNewMockTask(int participantIndex) {
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+    taskFactoryReg.put(NewMockTask.TASK_COMMAND, NewMockTask::new);
+    String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + participantIndex);
+    _participants[participantIndex] =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+    // Register a Task state model factory.
+    StateMachineEngine stateMachine = _participants[participantIndex].getStateMachineEngine();
+    stateMachine.registerStateModelFactory("Task",
+        new TaskStateModelFactory(_participants[participantIndex], taskFactoryReg));
+    _participants[participantIndex].syncStart();
+  }
+
+  /**
+   * A mock task that extends the MockTask class and gets stuck in running when cancel is called.
+   */
+  private class NewMockTask extends MockTask {
+
+    NewMockTask(TaskCallbackContext context) {
+      super(context);
+    }
+
+    @Override
+    public void cancel() {
+      try {
+        latch.await();
+      } catch (Exception e) {
+        // Pass
+      }
+      super.cancel();
+    }
+  }
+}