Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/10/06 02:39:29 UTC

[GitHub] [helix] NealSun96 commented on a change in pull request #1422: Fix currentState not being removed when pending message existed

NealSun96 commented on a change in pull request #1422:
URL: https://github.com/apache/helix/pull/1422#discussion_r499974159



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -122,30 +122,37 @@ public void updatePreviousAssignedTasksStatus(
       Set<Integer> donePartitions = new TreeSet<>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
-        TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
+        TaskPartitionState currState = getTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx, jobTgtState);
 
-        if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
-          LOG.warn(
-              "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
-              instance, pId);
-          continue;
-        }
-
         // Check for pending state transitions on this (partition, instance). If there is a pending
         // state transition, we prioritize this pending state transition and set the assignment from
         // this pending state transition, essentially "waiting" until this pending message clears
+        // If there is a pending message, we should not continue to update the context because, from
+        // the controller's perspective, the state transition has not been completed yet while the
+        // pending message still exists.
+        // If the context gets updated here, the controller might remove the job from RunTimeJobDAG,
+        // which can prevent the task's CurrentState from being removed while there is a pending
+        // message for that task.
         Message pendingMessage =
             currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
-        if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
-          // If there is a pending message whose destination state is different from the current
-          // state, just make the same assignment as the pending message. This is essentially
-          // "waiting" until this state transition is complete
+        if (pendingMessage != null) {
           processTaskWithPendingMessage(pId, pName, instance, pendingMessage, jobState, currState,
               paMap, assignedPartitions);
           continue;
         }
 

Review comment:
      General comment: how did the legacy pipeline help mask this problem before? Or was it unrelated to the legacy pipeline and instead related to how we delete CurrentStates? The legacy pipeline logic processes the in-flight jobs again, but that shouldn't make a difference, should it?
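To make the reordering in the diff concrete, here is a minimal sketch that models only the control flow, assuming (as the new code comment says) that the job context must not be written while a pending message exists. `PendingMessageFirst`, `handlePartition`, and `Action` are hypothetical names, and the map merely stands in for the real `JobContext`; this is not Helix code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of the ordering discussed in PR #1422.
 * None of these types are Helix classes.
 */
public class PendingMessageFirst {

  /** Outcome of handling one (partition, instance) pair in this sketch. */
  enum Action { WAIT_FOR_PENDING_MESSAGE, CONTEXT_UPDATED }

  /**
   * @param pId            partition id
   * @param pendingToState target state of a pending message, or null if there is none
   * @param currentState   the task's CurrentState as reported by the participant
   * @param jobContext     stand-in for the job context: partition id -> recorded state
   */
  static Action handlePartition(int pId, String pendingToState, String currentState,
      Map<Integer, String> jobContext) {
    // Check the pending message first: while it exists, the transition is still
    // in flight from the controller's point of view, so the context is left alone.
    if (pendingToState != null) {
      return Action.WAIT_FOR_PENDING_MESSAGE;
    }
    // Only when no message is pending is the observed CurrentState recorded.
    jobContext.put(pId, currentState);
    return Action.CONTEXT_UPDATED;
  }

  public static void main(String[] args) {
    Map<Integer, String> ctx = new HashMap<>();
    ctx.put(0, "RUNNING");

    // A pending RUNNING -> COMPLETED message exists: the context stays RUNNING.
    System.out.println(handlePartition(0, "COMPLETED", "RUNNING", ctx) + " " + ctx.get(0));
    // The message has cleared and the participant reports COMPLETED: the context catches up.
    System.out.println(handlePartition(0, null, "COMPLETED", ctx) + " " + ctx.get(0));
  }
}
```

Running `main` prints `WAIT_FOR_PENDING_MESSAGE RUNNING` and then `CONTEXT_UPDATED COMPLETED`: in this model the context only advances once the message has cleared.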

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -122,30 +122,37 @@ public void updatePreviousAssignedTasksStatus(
       Set<Integer> donePartitions = new TreeSet<>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
-        TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
+        TaskPartitionState currState = getTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx, jobTgtState);
 
-        if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
-          LOG.warn(
-              "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
-              instance, pId);
-          continue;
-        }
-
         // Check for pending state transitions on this (partition, instance). If there is a pending
         // state transition, we prioritize this pending state transition and set the assignment from
         // this pending state transition, essentially "waiting" until this pending message clears
+        // If there is a pending message, we should not continue to update the context because, from
+        // the controller's perspective, the state transition has not been completed yet while the
+        // pending message still exists.
+        // If the context gets updated here, the controller might remove the job from RunTimeJobDAG,
+        // which can prevent the task's CurrentState from being removed while there is a pending
+        // message for that task.
         Message pendingMessage =
             currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
-        if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {

Review comment:
      If I understood you correctly, this is just about the order of updating the jobContext vs. processing pending messages. If that's the case, is there a reason we removed the second clause of the if condition here, as @jiajunwang mentioned?
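The two conditions only disagree when the pending message's target state already equals the reported CurrentState. A small sketch enumerating the cases, using hypothetical names and plain strings in place of `Message` and `TaskPartitionState` (not Helix code):

```java
/** Hypothetical comparison of the pending-message condition before and after the PR. */
public class PendingConditionComparison {

  // Condition before the PR: only wait when the pending message targets a different state.
  static boolean oldWaits(String pendingToState, String currState) {
    return pendingToState != null && !pendingToState.equals(currState);
  }

  // Condition after the PR: wait whenever any pending message exists.
  static boolean newWaits(String pendingToState, String currState) {
    return pendingToState != null;
  }

  public static void main(String[] args) {
    // Each case is {pendingToState (null = no message), currState}.
    String[][] cases = { {null, "RUNNING"}, {"COMPLETED", "RUNNING"}, {"COMPLETED", "COMPLETED"} };
    for (String[] c : cases) {
      System.out.printf("pending=%s curr=%s old=%b new=%b%n",
          c[0], c[1], oldWaits(c[0], c[1]), newWaits(c[0], c[1]));
    }
  }
}
```

Only the third case (`pending=COMPLETED curr=COMPLETED`) differs: the old condition falls through to the normal path there, while the new one still waits on the message.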






