You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/10/06 20:23:20 UTC

[helix] branch master updated: Fix currentState not being removed when pending message exists (#1422)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 563a6c1  Fix currentState not being removed when pending message exists (#1422)
563a6c1 is described below

commit 563a6c1356ea62905f3318fbd145fc5d19419499
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue Oct 6 13:23:10 2020 -0700

    Fix currentState not being removed when pending message exists (#1422)
    
    This commit makes sure that the controller drops the current state of
    the jobs that have just finished and have pending messages.
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 36 +++++++------
 .../task/TestDropCurrentStateRunningTask.java      | 62 ++++++++++++++++++++++
 2 files changed, 82 insertions(+), 16 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 6e86f52..35c4c39 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -122,30 +122,37 @@ public abstract class AbstractTaskDispatcher {
       Set<Integer> donePartitions = new TreeSet<>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
-        TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
+        TaskPartitionState currState = getTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx, jobTgtState);
 
-        if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
-          LOG.warn(
-              "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
-              instance, pId);
-          continue;
-        }
-
         // Check for pending state transitions on this (partition, instance). If there is a pending
         // state transition, we prioritize this pending state transition and set the assignment from
         // this pending state transition, essentially "waiting" until this pending message clears
+        // If there is a pending message, we should not continue to update the context because,
+        // from the controller's perspective, the state transition has not completed yet while the
+        // pending message still exists.
+        // If the context is updated here, the controller might remove the job from RunTimeJobDAG,
+        // which can cause the task's CurrentState not to be removed when there is a pending
+        // message for that task.
         Message pendingMessage =
             currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
-        if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
-          // If there is a pending message whose destination state is different from the current
-          // state, just make the same assignment as the pending message. This is essentially
-          // "waiting" until this state transition is complete
+        if (pendingMessage != null) {
           processTaskWithPendingMessage(pId, pName, instance, pendingMessage, jobState, currState,
               paMap, assignedPartitions);
           continue;
         }
 
+        // Update job context based on current state
+        updatePartitionInformationInJobContext(currStateOutput, jobResource, currState, jobCtx,
+            pId, pName, instance);
+
+        if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
+          LOG.warn(
+              "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
+              instance, pId);
+          continue;
+        }
+
         // Get AssignableInstance for this instance and TaskConfig for releasing resources
         String quotaType = jobCfg.getJobType();
         String taskId;
@@ -365,7 +372,7 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
-  private TaskPartitionState updateJobContextAndGetTaskCurrentState(
+  private TaskPartitionState getTaskCurrentState(
       CurrentStateOutput currentStateOutput, String jobResource, Integer pId, String pName,
       String instance, JobContext jobCtx, TargetState jobTgtState) {
     String currentStateString =
@@ -389,9 +396,6 @@ public abstract class AbstractTaskDispatcher {
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
-    // Update job context based on current state
-    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
-        pId, pName, instance);
     return currentState;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
index c29cc45..f3e3dcf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
@@ -28,6 +28,7 @@ import org.apache.helix.ZkTestHelper;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
@@ -47,6 +48,7 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase {
   @BeforeClass
   public void beforeClass() throws Exception {
     _numNodes = 3;
+    _numPartitions = 1;
     super.beforeClass();
 
     // Stop participants that have been started in super class
@@ -146,5 +148,65 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase {
                         && _manager.getHelixDataAccessor().getBaseDataAccessor()
                             .get(currentStatePathP1, new Stat(), AccessOption.PERSISTENT) == null),
                     TestHelper.WAIT_DURATION));
+
+    _driver.waitToStop(workflowName, TestHelper.WAIT_DURATION);
+  }
+
+  @Test(dependsOnMethods = "testDropCurrentStateRunningTask")
+  public void testJobCurrentStateDroppedAfterCompletion() throws Exception {
+    // Stop the participants that were started in the previous test, then start only one of them
+    for (int i = 0; i < _numNodes; i++) {
+      super.stopParticipant(i);
+      Assert.assertFalse(_participants[i].isConnected());
+    }
+    _participants = new MockParticipantManager[_numNodes];
+    startParticipant(0);
+
+    String jobQueueName = TestHelper.getTestMethodName();
+
+    JobConfig.Builder jobBuilderCompleted =
+        JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG).setMaxAttemptsPerTask(1)
+            .setWorkflow(jobQueueName)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10"));
+
+    // Task times out in 10 seconds because the default value is 10 seconds in
+    // DEFAULT_JOB_CONFIG
+    JobConfig.Builder jobBuilderTimedOut =
+        JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG).setMaxAttemptsPerTask(1)
+            .setWorkflow(jobQueueName)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName, 0, 100);
+
+    for (int i = 0; i < 20; i++) {
+      jobQueue.enqueueJob("job" + i, jobBuilderCompleted);
+    }
+
+    jobQueue.enqueueJob("job" + 20, jobBuilderTimedOut);
+
+    _driver.start(jobQueue.build());
+
+    for (int i = 0; i < 20; i++) {
+      _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job" + i),
+          TaskState.COMPLETED);
+    }
+    _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job" + 20),
+        TaskState.FAILED);
+
+    String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+    ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
+    String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
+
+    for (int i = 0; i < 21; i++) {
+      String currentStatePathP0 =
+          "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 + "/CURRENTSTATES/" + sessionIdP0 + "/"
+              + TaskUtil.getNamespacedJobName(jobQueueName, "job" + i);
+      boolean isCurrentStateRemoved = TestHelper.verify(() -> {
+        ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()
+            .get(currentStatePathP0, new Stat(), AccessOption.PERSISTENT);
+        return record == null;
+      }, TestHelper.WAIT_DURATION);
+      Assert.assertTrue(isCurrentStateRemoved);
+    }
   }
 }