Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/05/04 16:34:38 UTC

[GitHub] [helix] alirezazamani opened a new pull request #994: Remove the scheduling decision based on PreviousAssignment

alirezazamani opened a new pull request #994:
URL: https://github.com/apache/helix/pull/994


   ### Issues
   - [x] My PR addresses the following Helix issues and references them in the PR title:
   Fixes #984 
   
   ### Description
   - [x] Here are some details about my PR, including screenshots of any UI changes:
   In this commit, several parts of the code have been removed in order to avoid
   making scheduling decisions based on PreviousAssignment. Instead, we rely on
   the CurrentState.
   To facilitate this, tasks that reach a terminal state
   (Error, Time_out, Aborted, and Completed) will be dropped and rescheduled in a later
   pipeline if needed. This helps us reduce the
   number of existing current states for each task.
   
   ### Tests
   - [x] The following is the result of the "mvn test" command on the appropriate module:
   [INFO] Results:
   [INFO] 
   [ERROR] Failures: 
   [ERROR]   TestWorkflowTermination.testWorkflowJobFail:251->verifyWorkflowCleanup:257 expected:<true> but was:<false>
   [ERROR]   TestWorkflowTimeout.testWorkflowTimeoutWhenWorkflowCompleted:116 expected:<true> but was:<false>
   [INFO] 
   [ERROR] Tests run: 1144, Failures: 2, Errors: 0, Skipped: 0
   [INFO] 
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD FAILURE
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  01:21 h
   [INFO] Finished at: 2020-05-01T15:25:20-07:00
   [INFO] ------------------------------------------------------------------------
   
   The failed tests passed when I ran them individually.
   
   ### Commits
   
   - [x] My commits all reference appropriate Apache Helix GitHub issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - [x] My diff has been formatted using helix-style.xml
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] dasahcc commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
dasahcc commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r428323885



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.

Review comment:
       Also, I don't think you will get the DROPPED state as a current state, right? If the to-state is DROPPED, the current state entry is deleted once the state transition completes.

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.

Review comment:
       The concern with doing this is: will this change be reflected in JobContext? If the final state is always DROPPED, it is hard for us to determine whether the task completed or requires another retry, because a task that has a problem in the middle of running will be DROPPED as well.






[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r428930744



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -121,17 +120,6 @@ public void updatePreviousAssignedTasksStatus(
               instance, pId);
           continue;
         }

Review comment:
       This shouldn't be an issue, because this check was added earlier specifically to bypass the scenario where we have two current states, in which neither markPartitionCompleted nor markPartitionError will be hit because the state is either Running or Dropped. Good question, though.






[GitHub] [helix] alirezazamani commented on pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on pull request #994:
URL: https://github.com/apache/helix/pull/994#issuecomment-630352986


   > I have a general suggestion. Let's do the renaming in another PR. Then we can have clear understand what is the logical change.
   
   There are not that many name changes. It is basically prevAssignment to currentState; the rest are similar. If you still think it is necessary, I can change the names back to the originals. Please let me know.




[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r428931451



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {
+      jobCtx.setPartitionState(pId, currentState);
+      String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
+      if (taskMsg != null) {
+        jobCtx.setPartitionInfo(pId, taskMsg);
+      }

Review comment:
       The participant reads the message, handles the state change, and then deletes the message. So messages are deleted from ZK after the state transition (ST) is completed, and we should be safe here.






[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r426807892



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context
+    // information because it will prevent controller to stuck in race condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information

Review comment:
       Here we check whether the current state has changed and is no longer equal to the context information. This check is necessary because we do not need to update the context in every pipeline. More importantly, the check protects fields such as finish time. Let's say we have a delayed retry time for the task. This functionality depends on the finish time in the context. If we updated the finish time for as long as the current state exists (note that we set it to the current time), we would never schedule the task after the retry delay, because the finish time in the context would keep moving forward and the delay would never elapse. That is why we update the context only the first time the current state changes.
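
To make the finish-time argument concrete, here is a minimal sketch of the two update strategies. Every name in it (`PipelineState`, `PartitionContextSketch`, `FinishTimeDemo`, `retryDue`) is hypothetical and only stands in for the job context logic discussed above; it is not the Helix implementation, and it assumes the finish time is stamped with the current time whenever a terminal state is recorded.

```java
// Hypothetical, simplified stand-ins for illustration; these are not Helix classes.
enum PipelineState { INIT, RUNNING, TASK_ERROR, COMPLETED }

final class PartitionContextSketch {
  PipelineState state = PipelineState.INIT;
  long finishTime = -1L;

  // Unguarded variant: overwrite the context on every pipeline while the current state exists.
  void updateAlways(PipelineState currentState, long now) {
    state = currentState;
    if (currentState == PipelineState.TASK_ERROR || currentState == PipelineState.COMPLETED) {
      finishTime = now;
    }
  }

  // Guarded variant: touch the context only when the observed state differs from the recorded one.
  void updateOnChange(PipelineState currentState, long now) {
    if (currentState != state) {
      updateAlways(currentState, now);
    }
  }

  // A delayed retry becomes due once retryDelayMs has elapsed since the recorded finish time.
  boolean retryDue(long now, long retryDelayMs) {
    return finishTime >= 0 && now - finishTime >= retryDelayMs;
  }
}

final class FinishTimeDemo {
  // Runs one pipeline per timestamp, each observing TASK_ERROR, and reports
  // whether the retry is due at the last timestamp.
  static boolean retryDueAfter(boolean guarded, long[] pipelineTimes, long retryDelayMs) {
    PartitionContextSketch ctx = new PartitionContextSketch();
    ctx.state = PipelineState.RUNNING;
    long last = 0L;
    for (long now : pipelineTimes) {
      if (guarded) {
        ctx.updateOnChange(PipelineState.TASK_ERROR, now);
      } else {
        ctx.updateAlways(PipelineState.TASK_ERROR, now);
      }
      last = now;
    }
    return ctx.retryDue(last, retryDelayMs);
  }

  public static void main(String[] args) {
    long[] times = {0L, 400L, 800L, 1200L};
    System.out.println("guarded:   " + retryDueAfter(true, times, 1000L));
    System.out.println("unguarded: " + retryDueAfter(false, times, 1000L));
  }
}
```

With a 1000 ms delay and pipelines at 0, 400, 800, and 1200 ms, the guarded variant keeps the finish time at 0 and reports the retry as due, while the unguarded variant resets the finish time on every pipeline and never reaches the delay.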






[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r428327174



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.

Review comment:
       Exactly. Since we do not get the DROPPED state in the current state, we will not mark the task as DROPPED in the job context. If the task is completed and we send a COMPLETE to DROPPED message, the participant will drop the current state (meaning it will be removed from currentState), and this function will not see the task on the instance again because we are now based on CurrentState. Hence, we will not mark the task DROPPED in jobContext in this function. Basically, since the context follows the currentState in this function, a DROPPED context is not possible (because a DROPPED currentState is also not possible). Right?
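
A small sketch of this argument, under the assumption that "dropping" simply deletes the participant's current state entry. The class and method names (`DropFlowSketch`, `participantHandlesDrop`, `controllerPass`) are hypothetical and not Helix APIs; the point is only that a context which copies the current state can never observe DROPPED.

```java
// Hypothetical, simplified model; none of these names are Helix APIs.
final class DropFlowSketch {
  // Participant-side current state for one task partition; null means no entry exists.
  String currentState = "COMPLETED";
  // Controller-side state recorded in the job context for the same partition.
  String contextState = "RUNNING";

  // The participant handles a "-> DROPPED" transition by deleting its current state entry.
  void participantHandlesDrop() {
    currentState = null;
  }

  // One controller pass: the context follows the current state only while an entry exists.
  void controllerPass() {
    if (currentState != null) {
      contextState = currentState;
    }
  }

  public static void main(String[] args) {
    DropFlowSketch sketch = new DropFlowSketch();
    sketch.controllerPass();          // context picks up COMPLETED
    sketch.participantHandlesDrop();  // entry is removed, it is never set to "DROPPED"
    sketch.controllerPass();          // nothing to read, context stays unchanged
    System.out.println(sketch.contextState); // prints "COMPLETED"
  }
}
```

In this model the context keeps the last terminal state it saw, which is what lets the controller distinguish a completed task from one that needs a retry even though both end up dropped on the participant.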






[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r430580638



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       @narendly Thank you for mentioning these corner cases.
   
   I went through the logic, and I am sure the code handles this corner case as well. Let me explain why (please note that with this PR, if a task reaches a terminal state, we drop the task first and schedule it again in the next pipeline):
   
   Here are the steps:
   1- The participant sets the requested state to TASK_ERROR.
   2- The controller sends RUNNING -> TASK_ERROR to the participant.
   3- The participant processes it and sets the current state to TASK_ERROR.
   4- The controller sees this current state (i.e. TASK_ERROR) and sends a TASK_ERROR -> DROPPED message.
   5- The participant gets the message and drops/removes the current state.
   6- The controller does not see a current state (because it has been removed) and sets the context of the task to INIT. It then schedules the task again on the participant by sending an INIT -> RUNNING message (this happens in the handleAdditionalTaskAssignment method).
   * The controller by itself does not send RUNNING -> COMPLETE, because the COMPLETE state must be requested by the participant. The participant will request either the TASK_ERROR or the COMPLETE state in its current state.
   7- If the task goes to the error state again, we see this delta again (because the context was set to INIT before the task was scheduled) and mark it ERROR with a new finish time.
   
   Two points here help the controller handle this corner case:
   1- The controller will not send RUNNING -> COMPLETE unless the participant's requested state is COMPLETE. So if the task goes to TASK_ERROR on the participant, the requested state will be TASK_ERROR, and the controller always respects the requested state and sends RUNNING -> TASK_ERROR.
   2- Dropping the task in a terminal state makes the controller reschedule the task with the jobContext state as INIT. This behavior helps the controller see these deltas in future pipelines and update the information in jobContext correctly.
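
The steps above can be walked through with a toy model. This is only a sketch under simplifying assumptions: one task, one participant, messages applied immediately, and no retry delay or attempt limit. All names (`RetryFlowSketch`, `pipeline`, `participantRequests`) are hypothetical and do not correspond to Helix classes or methods.

```java
// Hypothetical toy model of the controller/participant exchange; not Helix code.
final class RetryFlowSketch {
  enum S { INIT, RUNNING, TASK_ERROR, COMPLETED }

  // Participant side: currentState == null means the entry has been dropped.
  S currentState = null;
  S requestedState = null;
  // Controller side: state and finish time recorded in the job context.
  S contextState = S.INIT;
  long contextFinishTime = -1L;
  // Messages the controller has sent, in order, separated by ';'.
  final StringBuilder log = new StringBuilder();

  // The participant reports the outcome of a running task by requesting a state (step 1).
  void participantRequests(S outcome) {
    requestedState = outcome;
  }

  private void send(String transition) {
    log.append(transition).append(';');
  }

  // One controller pipeline; the participant applies the resulting message immediately.
  void pipeline(long now) {
    if (currentState == null) {
      if (contextState == S.COMPLETED) {
        return; // the task is done, nothing to reschedule
      }
      // Step 6: no current state, so reset the context to INIT and schedule again.
      contextState = S.INIT;
      send("INIT->RUNNING");
      currentState = S.RUNNING;
      return;
    }
    // The context follows the current state, but only when the state has changed.
    if (currentState != contextState) {
      contextState = currentState;
      if (currentState == S.TASK_ERROR || currentState == S.COMPLETED) {
        contextFinishTime = now;
      }
    }
    if (currentState == S.RUNNING && requestedState != null) {
      // Steps 2 and 3: the controller respects the requested state.
      send("RUNNING->" + requestedState);
      currentState = requestedState;
      requestedState = null;
    } else if (currentState == S.TASK_ERROR || currentState == S.COMPLETED) {
      // Steps 4 and 5: a terminal state is dropped, which removes the current state.
      send(currentState + "->DROPPED");
      currentState = null;
    }
  }

  public static void main(String[] args) {
    RetryFlowSketch flow = new RetryFlowSketch();
    flow.pipeline(0L);
    flow.participantRequests(S.TASK_ERROR);
    flow.pipeline(100L);
    flow.pipeline(200L);
    flow.pipeline(300L);
    flow.participantRequests(S.TASK_ERROR);
    flow.pipeline(400L);
    flow.pipeline(500L);
    System.out.println(flow.log + " finishTime=" + flow.contextFinishTime);
  }
}
```

In this model the first failure is recorded with finish time 200 and the second with finish time 500, so each attempt gets its own finish time even though the context is only updated when the observed state changes.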
   

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context
+    // information because it will prevent controller to stuck in race condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information

Review comment:
       @narendly 
   
   Delayed scheduling used to work because we were relying on previousAssignment. Let's say we rely on previousAssignment and decide to send the task to TASK_ERROR. The task is recorded in previousAssignment, and in the next pipeline we iterate through the tasks that exist in the previous assignment and record the finish time and context information. After that, the task no longer exists in the previousAssignment of future pipelines, so we do not update the finish time again. (Actually, some of the tests guided me toward this change.)
   
   However, if we want to be independent of previousAssignment and rely on currentState (which also means we are relying on the participant's reactions), we need to consider that currentState might not change for several pipelines. In this case, it is necessary to monitor these deltas and not update the context information (especially time-sensitive information) multiple times.

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       @narendly Thank you for mentioning these corner cases. 
   
   I went through the logic and I am sure the code handles this corner case as well. Let me explain why (please note that if task goes to terminal states, with this PR we are dropping the task first and schedule them in next pipeline):
   
   Here are the steps:
   1- Requested State is set to TASK_ERROR by the participant. 
   2- Controller sends RUNNING -> TASK_ERROR to participant.
   3- Participant process it and makes current state as TASK_ERROR.
   4- Controller see this (i.e. TASK_ERROR) current state and send TASK_ERROR -> DROPPED message.
   5- Participant gets the message and drops/removes current states.
   6- Controller does not see current state (because it is removed) and sets the context of the task to be INIT.  Schedule the task again on the participant by sending INIT-> RUNNING message. (this happens in handleAdditionalTaskAssignment method).
   *Now controller by itself does not send RUNNING -> COMPLETE because COMPLETE state should be requested by the participant. The participant will either request TASK_ERROR or COMPLETE state in current state.
   7- If the task goes to error state we see this delta again (because context is set to be INIT before scheduling the task) and mark it ERROR with new finish time. 
   
   There are two points here that helps controller to handle this cornet case:
   1- controller will not send running -> complete unless participant requested state is COMPLETE. So if the task goes to TASK_ERROR in participant, the requested state will be TASK_ERROR and controller always respects the requested state and send RUNNING -> TASK_ERROR. 
   2- Dropping the task in terminal state makes the controller to reschedule the task with jobContext as INIT. This behavior helps the controller to see these deltas in future pipelines and update the information correctly in jobContext.
   

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context
+    // information because it will prevent controller to stuck in race condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information

Review comment:
       @narendly 
   
   Delayed scheduling worked before because we relied on previousAssignment. Suppose we rely on previousAssignment and decide to send the task to TASK_ERROR. The task is recorded in previousAssignment, and in the next pipeline we iterate through the tasks that exist in the previous assignment and record the finish time and context information. After that, the task no longer exists in previousAssignment in later pipelines, so we do not update the finish time again. (Some of the tests actually guided me toward this change.)
   
   However, if we want to be independent of previousAssignment and rely on currentState (which also means relying on the participant's reactions), we need to consider that currentState might not change for several pipelines. In that case, we have to track these deltas and avoid updating the context information (especially time-sensitive information) multiple times.
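
A minimal sketch of the "update only on a delta" idea described above. It is not the Helix implementation: `Ctx`, `updateIfChanged`, and the plain string state names are hypothetical stand-ins, and recording the finish time inside this helper is an assumption made only to show why repeated updates would be harmful.

```java
import java.util.HashMap;
import java.util.Map;

public class ContextUpdateSketch {
  // Hypothetical stand-in for the per-partition part of a job context.
  static final class Ctx {
    final Map<Integer, String> state = new HashMap<>();
    final Map<Integer, Long> finishTime = new HashMap<>();
  }

  /** Returns true only when the observed current state differs from the context. */
  static boolean updateIfChanged(Ctx ctx, int pId, String currentState, long now) {
    if (currentState.equals(ctx.state.get(pId))) {
      // Same state as in an earlier pipeline: leave time-sensitive fields alone.
      return false;
    }
    ctx.state.put(pId, currentState);
    // Assumption for illustration: terminal states record a finish time here.
    if ("TASK_ERROR".equals(currentState) || "COMPLETED".equals(currentState)) {
      ctx.finishTime.put(pId, now);
    }
    return true;
  }

  public static void main(String[] args) {
    Ctx ctx = new Ctx();
    updateIfChanged(ctx, 0, "TASK_ERROR", 100L); // first pipeline sees the new state
    updateIfChanged(ctx, 0, "TASK_ERROR", 200L); // later pipeline, state unchanged
    System.out.println(ctx.finishTime.get(0));   // finish time from the first pipeline
  }
}
```

Without the equality check, every pipeline that still sees the same current state would overwrite the finish time.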

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       I double-checked, and it seems the only way the controller sends a COMPLETED message to a participant is when the participant initiates it by setting its requested state to COMPLETED.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r428327174



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.

Review comment:
       Exactly. Since DROPPED never appears in the current state, we will not mark the task as DROPPED in the job context. If the task is completed and we send a COMPLETED-to-DROPPED message, the participant drops the current state (it is removed from CurrentState), and this function will not see the task on that instance again because it is now driven by CurrentState. Hence, this function never marks the task DROPPED in the JobContext. In short, because the context follows the current state in this function, a DROPPED context is not possible. Right?






[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r428928371



##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -238,21 +238,20 @@ private ResourceAssignment computeResourceMapping(String jobResource,
     // These dropping transitions will be prioritized above all task state transition assignments
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
-    Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
-        getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment,
-            allPartitions, currStateOutput, jobResource, tasksToDrop);
+    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
+        getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, jobResource, tasksToDrop);
 
-    updateInstanceToTaskAssignmentsFromContext(jobCtx, prevInstanceToTaskAssignments);
+    updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);
 
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
-          + prevInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
+          + currentInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
     }
 
     // Release resource for tasks in terminal state
-    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances, jobResource,

Review comment:
       I think it is fine to keep it as is, because we are updating the state of tasks that were assigned in previous pipelines based on their current state.






[GitHub] [helix] NealSun96 commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r427611990



##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -238,21 +238,20 @@ private ResourceAssignment computeResourceMapping(String jobResource,
     // These dropping transitions will be prioritized above all task state transition assignments
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
-    Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
-        getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment,
-            allPartitions, currStateOutput, jobResource, tasksToDrop);
+    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
+        getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, jobResource, tasksToDrop);
 
-    updateInstanceToTaskAssignmentsFromContext(jobCtx, prevInstanceToTaskAssignments);
+    updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);
 
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
-          + prevInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
+          + currentInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
     }
 
     // Release resource for tasks in terminal state
-    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances, jobResource,

Review comment:
       Should this function be renamed? 

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -121,17 +120,6 @@ public void updatePreviousAssignedTasksStatus(
               instance, pId);
           continue;
         }

Review comment:
       Would it be a problem that this skipping check is now done after markPartitionCompleted/markPartitionError? 

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {
+      jobCtx.setPartitionState(pId, currentState);
+      String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
+      if (taskMsg != null) {
+        jobCtx.setPartitionInfo(pId, taskMsg);
+      }

Review comment:
       Is there a possibility that a message exists without state changes? 






[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r426807350



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -99,16 +99,15 @@ public void updatePreviousAssignedTasksStatus(
       }
 
       // If not an excluded instance, we must instantiate its entry in assignedPartitions
-      Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
+      Set<Integer> pSet = currentInstanceToTaskAssignments.get(instance);
 
       // We need to remove all task pId's to be dropped because we already made an assignment in
       // paMap above for them to be dropped. The following does this.
       if (tasksToDrop.containsKey(instance)) {
         pSet.removeAll(tasksToDrop.get(instance));
       }
 
-      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
+      // Used to keep track of partitions that are in one the INIT or DROPPED states.

Review comment:
       Yes. Done.

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.

Review comment:
       The reason is that this PR changes the behavior. We have an assignment for the task, and that assignment is DROPPED, so the task goes into assignedPartitions. Once it is dropped from the participant, we do not consider it for a new assignment at all.
   

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));

Review comment:
       You are right, I had this concern too. But note that we do not change the JobContext to DROPPED. Say the controller sends a COMPLETED-to-DROPPED message to the participant. The current state stays COMPLETED until the participant processes the message. Once it is processed, the current state is null, and the task is not considered again. So, to answer your question, it depends entirely on the participant's behavior. If the participant set the current state to DROPPED, which is not the case here, your concern would be valid. Since the participant removes the task's current state, we do not need to worry about that case.
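
A minimal sketch of the lifecycle described above, under stated assumptions. It is a toy model, not Helix code: `DropLifecycleSketch`, its two maps, and the string state names are hypothetical, and the participant is assumed to delete the current state entry when it handles the drop.

```java
import java.util.HashMap;
import java.util.Map;

public class DropLifecycleSketch {
  // Hypothetical model: participant-side current state and controller-side context for one job.
  final Map<Integer, String> currentState = new HashMap<>();
  final Map<Integer, String> context = new HashMap<>();

  /** Controller pass: the context only follows current states that still exist. */
  String controllerPipeline(int pId) {
    String cs = currentState.get(pId);
    if (cs == null) {
      return null; // nothing on the instance: the task is not visited, context untouched
    }
    context.put(pId, cs);
    boolean terminal = cs.equals("COMPLETED") || cs.equals("TASK_ERROR");
    return terminal ? "DROPPED" : cs; // target state the controller would request
  }

  /** Participant side: handling the drop removes the current state entry entirely. */
  void participantProcessesDrop(int pId) {
    currentState.remove(pId);
  }

  public static void main(String[] args) {
    DropLifecycleSketch s = new DropLifecycleSketch();
    s.currentState.put(7, "COMPLETED");
    System.out.println(s.controllerPipeline(7)); // controller asks for a drop
    s.participantProcessesDrop(7);
    System.out.println(s.controllerPipeline(7)); // task is no longer visited
    System.out.println(s.context.get(7));        // context never became DROPPED
  }
}
```

In this model the context can only ever hold a state that was observed as a current state, which is why DROPPED cannot end up in it.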

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -263,7 +251,11 @@ public void updatePreviousAssignedTasksStatus(
         case TASK_ABORTED:
 
         case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          // First make this task which is in terminal state to be dropped.
+          // Later on, in next pipeline in handleAdditionalAssignments, the task will be retried if possible.
+          // (meaning it is not ABORTED and max number of attempts has not been reached yet)
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));

Review comment:
       The answer to this question is similar to the COMPLETED case :). Please see above.

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context

Review comment:
       Fixed. Thanks.

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context
+    // information because it will prevent controller to stuck in race condition while there is two

Review comment:
       The scenario I am talking about is the one where a task has two current states on different participants. Since we have only one slot for each task in paMap, we want to make sure the task is DROPPED from the wrong instance and assigned to the correct one. I am basically referring to #922 and #461. Let me rephrase this comment to make it clear.
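
A rough sketch of the two-current-states scenario, assuming the intended outcome is "keep the task on the assigned participant, drop it everywhere else". The class and method names are hypothetical, and the per-instance result map is only an illustration; it does not reproduce how paMap or tasksToDrop are populated in the dispatcher.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DuplicateCurrentStateSketch {
  /**
   * For one task, maps each instance that reports a current state to the state the
   * controller would target there: unchanged on the assigned participant, DROPPED elsewhere.
   */
  static Map<String, String> resolve(String assignedParticipant,
      Map<String, String> currentStates) {
    Map<String, String> targets = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : currentStates.entrySet()) {
      boolean owner = e.getKey().equals(assignedParticipant);
      targets.put(e.getKey(), owner ? e.getValue() : "DROPPED");
    }
    return targets;
  }

  public static void main(String[] args) {
    Map<String, String> currentStates = new LinkedHashMap<>();
    currentStates.put("instanceA", "RUNNING"); // stale copy on the wrong instance
    currentStates.put("instanceB", "RUNNING"); // copy on the assigned participant
    System.out.println(resolve("instanceB", currentStates));
  }
}
```

The sketch depends on the assigned participant being known and up to date, which is the reason given for always setting it in the job context.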

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context
+    // information because it will prevent controller to stuck in race condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.

Review comment:
       Exactly. We added this check in PR #923 to help resolve this scenario. "Race condition" may not be the right term here. I rephrased the comments.

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context
+    // information because it will prevent controller to stuck in race condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       Exactly. I added a comment about it.

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -380,72 +379,38 @@ private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied over expired sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
    * @return instance -> partitionIds from previous assignment, if the instance is still live
    */
-  protected static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, String jobName,
+  protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task currentStates copied over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be dropped (after a
-    // copy-over, the Controller will send a message to drop the state currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds here and make decision about the existing tasks with current state
+    // in updatePreviousAssignedTasksStatus method

Review comment:
       Fixed. Thanks.
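
For readers following the hunk above, here is a simplified sketch of building an instance -> pIds view from current states alone. It is not the actual `getCurrentInstanceToTaskAssignments`: the input is a plain nested map keyed by partition id instead of `CurrentStateOutput`, requested states and `tasksToDrop` are left out, and skipping instances that are not live is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class CurrentAssignmentSketch {
  /**
   * Inverts (pId -> instance -> currentState) into (instance -> sorted pIds),
   * with an empty entry for every live instance.
   */
  static Map<String, SortedSet<Integer>> currentInstanceToTasks(
      Iterable<String> liveInstances, Map<Integer, Map<String, String>> currentStates) {
    Map<String, SortedSet<Integer>> result = new HashMap<>();
    for (String instance : liveInstances) {
      result.put(instance, new TreeSet<>());
    }
    for (Map.Entry<Integer, Map<String, String>> entry : currentStates.entrySet()) {
      for (String instance : entry.getValue().keySet()) {
        SortedSet<Integer> pIds = result.get(instance);
        if (pIds != null) { // assumption: ignore instances that are not live
          pIds.add(entry.getKey());
        }
      }
    }
    return result;
  }
}
```

Because the only input is the current state, a task whose current state has been removed simply does not appear in the result.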

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -380,72 +379,38 @@ private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied over expired sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
    * @return instance -> partitionIds from previous assignment, if the instance is still live
    */
-  protected static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, String jobName,
+  protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task currentStates copied over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be dropped (after a
-    // copy-over, the Controller will send a message to drop the state currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds here and make decision about the existing tasks with current state
+    // in updatePreviousAssignedTasksStatus method
     Map<Partition, Map<String, String>> partitions = currStateOutput.getCurrentStateMap(jobName);
     for (Map.Entry<Partition, Map<String, String>> entry : partitions.entrySet()) {
       // Get all (instance -> currentState) mappings
       for (Map.Entry<String, String> instanceToCurrState : entry.getValue().entrySet()) {
         String instance = instanceToCurrState.getKey();
         String requestedState =
             currStateOutput.getRequestedState(jobName, entry.getKey(), instance);
-        TaskPartitionState currState = TaskPartitionState.valueOf(instanceToCurrState.getValue());
         int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
 
         if (result.containsKey(instance)) {
-          // We must add all active task pIds back here because dropping transition could overwrite
-          // an active transition in paMap
-          // Add all task partitions in the following states:
-          // currState = INIT, requestedState = RUNNING (bootstrap)
-          // currState = RUNNING, requestedState = ANY (active)
-          // ** for tasks that are just in INIT state, we do not add them here because old
-          // Participants, upon connection reset, set tasks' currentStates to INIT. We cannot
-          // consider those tasks active **
-          if (currState == TaskPartitionState.INIT && requestedState != null
-              && requestedState.equals(TaskPartitionState.RUNNING.name())
-              || currState == TaskPartitionState.RUNNING) {
-            result.get(instance).add(pId);
-          }
-
+          // We must add all pIds back here

Review comment:
       I agree. Removed the comment. Basically, we have explained everything in the comments already.

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -380,72 +379,38 @@ private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied over expired sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
    * @return instance -> partitionIds from previous assignment, if the instance is still live
    */
-  protected static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, String jobName,
+  protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task currentStates copied over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be dropped (after a
-    // copy-over, the Controller will send a message to drop the state currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds here and make decision about the existing tasks with current state
+    // in updatePreviousAssignedTasksStatus method
     Map<Partition, Map<String, String>> partitions = currStateOutput.getCurrentStateMap(jobName);
     for (Map.Entry<Partition, Map<String, String>> entry : partitions.entrySet()) {
       // Get all (instance -> currentState) mappings
       for (Map.Entry<String, String> instanceToCurrState : entry.getValue().entrySet()) {
         String instance = instanceToCurrState.getKey();
         String requestedState =
             currStateOutput.getRequestedState(jobName, entry.getKey(), instance);
-        TaskPartitionState currState = TaskPartitionState.valueOf(instanceToCurrState.getValue());
         int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
 
         if (result.containsKey(instance)) {
-          // We must add all active task pIds back here because dropping transition could overwrite
-          // an active transition in paMap
-          // Add all task partitions in the following states:
-          // currState = INIT, requestedState = RUNNING (bootstrap)
-          // currState = RUNNING, requestedState = ANY (active)
-          // ** for tasks that are just in INIT state, we do not add them here because old
-          // Participants, upon connection reset, set tasks' currentStates to INIT. We cannot
-          // consider those tasks active **
-          if (currState == TaskPartitionState.INIT && requestedState != null
-              && requestedState.equals(TaskPartitionState.RUNNING.name())
-              || currState == TaskPartitionState.RUNNING) {
-            result.get(instance).add(pId);
-          }
-
+          // We must add all pIds back here
+          result.get(instance).add(pId);
           // Check if this task needs to be dropped. If so, we need to add to tasksToDrop no matter
-          // what its current state is so that it will be dropped
+          // what its current state is so that it will be dropped.

Review comment:
       Done.
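
For readers following the hunk above, here is a minimal sketch of the idea behind the renamed method: build the instance -> partitionIds map from current states only, keeping every pId that a live instance reports. It is an illustration under stated assumptions, not the Helix implementation. The class `CurrentAssignmentSketch`, the plain `Map<String, Map<String, String>>` used in place of `CurrentStateOutput`, and the `partitionId` helper (which assumes the id is the suffix after the last underscore) are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch; none of these names are part of the Helix API.
final class CurrentAssignmentSketch {

  // Assumption: the partition id is the numeric suffix after the last underscore.
  static int partitionId(String partitionName) {
    return Integer.parseInt(partitionName.substring(partitionName.lastIndexOf('_') + 1));
  }

  // currentStates: partitionName -> (instance -> currentState)
  static Map<String, SortedSet<Integer>> build(Iterable<String> liveInstances,
      Map<String, Map<String, String>> currentStates) {
    Map<String, SortedSet<Integer>> result = new HashMap<>();
    for (String instance : liveInstances) {
      result.put(instance, new TreeSet<>());
    }
    for (Map.Entry<String, Map<String, String>> entry : currentStates.entrySet()) {
      int pId = partitionId(entry.getKey());
      for (String instance : entry.getValue().keySet()) {
        SortedSet<Integer> pIds = result.get(instance);
        // Only live instances are tracked; every pId they report is kept,
        // regardless of its state.
        if (pIds != null) {
          pIds.add(pId);
        }
      }
    }
    return result;
  }
}
```

Unlike the removed code, the sketch does not filter on INIT or RUNNING; deciding what to do with each reported task is left to a later step, as the new comments in the diff describe.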

##########
File path: helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
##########
@@ -462,10 +427,10 @@ private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
    * If partition is missing from prevInstanceToTaskAssignments (e.g. previous assignment is
    * deleted) it is added from context. Otherwise, the context won't be updated.
    * @param jobCtx Job Context
-   * @param prevInstanceToTaskAssignments instance -> partitionIds from previous assignment
+   * @param currentInstanceToTaskAssignments instance -> partitionIds from current assignment

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r428327174



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.

Review comment:
       Exactly. Since the DROPPED state never appears in the current state, we will not mark the task as DROPPED in the job context. If the task is completed and we send the COMPLETED to DROPPED message, the participant will drop the current state (meaning it will be removed from CurrentState), and this function will not see the task on the instance again because we now rely on CurrentState. Hence, we will not mark the task DROPPED in the job context. Basically, since the context follows the CurrentState in this function, a DROPPED context is not possible. Right?
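
The reasoning above can be sketched as a toy model, assuming a single instance and a context that simply copies whatever the current state reports. Everything here is hypothetical and simplified: `DropTerminalSketch`, its `State` enum, and `runPipeline` are illustration-only names, not Helix classes or methods.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; a simplified model, not the Helix task framework.
final class DropTerminalSketch {
  enum State { INIT, RUNNING, COMPLETED, TASK_ERROR, TIMED_OUT, TASK_ABORTED, DROPPED }

  static boolean isTerminal(State s) {
    return s == State.COMPLETED || s == State.TASK_ERROR
        || s == State.TIMED_OUT || s == State.TASK_ABORTED;
  }

  // One pipeline pass: the context copies what the current state reports,
  // and tasks in a terminal state are queued to be dropped.
  static Set<Integer> runPipeline(Map<Integer, State> currentState,
      Map<Integer, State> context) {
    Set<Integer> toDrop = new TreeSet<>();
    for (Map.Entry<Integer, State> e : currentState.entrySet()) {
      context.put(e.getKey(), e.getValue());
      if (isTerminal(e.getValue())) {
        toDrop.add(e.getKey());
      }
    }
    return toDrop;
  }
}
```

In this model, once the participant removes the dropped entry from the current state, the next pass never visits that partition, so its context entry stays at the last observed terminal state and never becomes DROPPED.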






[GitHub] [helix] alirezazamani commented on pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on pull request #994:
URL: https://github.com/apache/helix/pull/994#issuecomment-634334592








[GitHub] [helix] narendly commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r430049218



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context
+    // information because it will prevent controller to stuck in race condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information

Review comment:
       @alirezazamani -
   
   Okay, I think your point about not updating when there's no delta is valid, but I'm not sure that what you said about delayed scheduling is entirely true. Delayed scheduling of tasks is a feature that was previously working, right? If what you said were the case, how has our delayed scheduling feature been working all this time?

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       @alirezazamani Another concern here:
   
   Is it possible for a task to be retried so quickly that it ends up in the same state? For example:
   
   task_error -> (controller reschedules it) -> (controller sends messages: error -> init, init -> running, running -> complete) -> the participant processes the messages so quickly that the task goes into task_error again, and by the time the controller gets to this task, the state is the same as in the previous run. Basically, the controller sees task_error -> task_error.
   
   Is that going to be an issue? I guess the assigned participant for the partition is set before this check, so that will be updated accordingly, but what about the finish time, or the time at which the task was marked as error?

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       @alirezazamani Another concern here:
   
   Is it possible for a task to be retried so quickly (on the participant side) that it ends up in the same state? For example:
   
   task_error -> (controller reschedules it) -> (controller sends messages: error -> init, init -> running, running -> complete) -> the participant processes the messages so quickly that the task goes into task_error again, and by the time the controller gets to this task, the state is the same as in the previous run. Basically, the controller sees task_error -> task_error.
   
   Is that going to be an issue? I guess the assigned participant for the partition is set before this check, so that will be updated accordingly, but what about the finish time, or the time at which the task was marked as error?
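
To make the concern concrete, here is a minimal sketch of a delta-only update, assuming the context records a finish time only when the observed state differs from the stored one. The class `ContextDeltaSketch`, its `State` enum, and the `update` method are hypothetical and stand in for the real job context; they are not Helix API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the job context; not the Helix JobContext API.
final class ContextDeltaSketch {
  enum State { INIT, RUNNING, TASK_ERROR, COMPLETED }

  private final Map<Integer, State> partitionState = new HashMap<>();
  private final Map<Integer, Long> finishTime = new HashMap<>();

  // Same shape as the check in the hunk: update only when the observed state
  // differs from what the context already holds. Returns true if it updated.
  boolean update(int pId, State observed, long now) {
    if (observed.equals(partitionState.get(pId))) {
      return false; // no delta seen, so the stored finish time is left as-is
    }
    partitionState.put(pId, observed);
    if (observed == State.TASK_ERROR || observed == State.COMPLETED) {
      finishTime.put(pId, now);
    }
    return true;
  }

  Long finishTime(int pId) {
    return finishTime.get(pId);
  }
}
```

In this toy model, two consecutive observations of TASK_ERROR for the same partition leave the finish time at the first value, which is the scenario the question asks about. Whether the real controller can actually observe such a sequence depends on how messages are generated, which the sketch does not model.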

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       > 1- controller will not send running -> complete unless participant requested state is COMPLETE
   
   Are you sure this is true? Can you verify that init -> running and running -> complete are not sent out at the same time? If I recall correctly, the target state during the message generation phase should be complete.

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,57 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assigned participant needs to be updated regardless of the current state and context
+    // information because it will prevent controller to stuck in race condition while there is two
+    // current states. In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information

Review comment:
       Thanks for the explanation. The reason we need to know whether CurrentState changed is that we no longer have prevAssignment.

##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -389,13 +381,59 @@ private TaskPartitionState updateJobContextAndGetTaskCurrentState(
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {

Review comment:
       Sounds good. Overall, I think this analysis is sound. Let's make sure we do a careful round of E2E testing as well as load testing.






[GitHub] [helix] narendly merged pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
narendly merged pull request #994:
URL: https://github.com/apache/helix/pull/994


   




[GitHub] [helix] alirezazamani edited a comment on pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani edited a comment on pull request #994:
URL: https://github.com/apache/helix/pull/994#issuecomment-634336485








[GitHub] [helix] alirezazamani commented on a change in pull request #994: Remove the scheduling decision based on PreviousAssignment

Posted by GitBox <gi...@apache.org>.
alirezazamani commented on a change in pull request #994:
URL: https://github.com/apache/helix/pull/994#discussion_r428327174



##########
File path: helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
##########
@@ -242,16 +230,16 @@ public void updatePreviousAssignedTasksStatus(
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.

Review comment:
       Exactly. Since the DROPPED state never appears in the current state, we will not mark the task as DROPPED in the job context. If the task is completed and we send the COMPLETED to DROPPED message, the participant will drop the current state, and this function will not see the task on the instance again because we now rely on CurrentState. Hence, we will not mark the task DROPPED in the job context. Basically, since the context follows the CurrentState in this function, a DROPPED context is not possible. Right?



