You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/02/07 23:07:05 UTC

[helix] branch master updated: Fixes #1802 - messages intended for instances that are no longer in the cluster (#1951)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1927792  Fixes #1802 - messages intended for instances that are no longer in the cluster (#1951)
1927792 is described below

commit 192779276eda1a3336525a07ae823003172fcc9e
Author: Komal Desai <98...@users.noreply.github.com>
AuthorDate: Mon Feb 7 15:06:57 2022 -0800

    Fixes #1802 - messages intended for instances that are no longer in the cluster (#1951)
    
    In MessageGenerationPhase.java, the process() method populates the list of live instances from the cache.
    
    However, although the generateMessage() method has the sessionIdMap information, it still iterates over the partition/resource/instance map without checking whether the instance is still part of the cluster.
    
    The cache may contain a stale entry, but that logic needs to be addressed separately. When generating messages, however, we should check whether the instance is still present.
    
    This is therefore a simple change. We still need to investigate whether the cache is being invalidated properly.
    
    To make sure that the cache is properly handled/refreshed when an instance is replaced or deleted, another bug has been filed: #1956
---
 .../helix/controller/stages/MessageGenerationPhase.java       | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 98313b9..7981302 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -193,7 +193,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
           }
         }
 
-        if (pendingMessage != null && shouldCleanUpPendingMessage(pendingMessage, currentState,
+        if (shouldCleanUpPendingMessage(pendingMessage, sessionIdMap, instanceName, currentState,
             currentStateOutput.getEndTime(resourceName, partition, instanceName))) {
           logAndAddToCleanUp(messagesToCleanUp, pendingMessage, instanceName, resourceName,
               partition, currentState, PENDING_MESSAGE);
@@ -203,7 +203,8 @@ public class MessageGenerationPhase extends AbstractBaseStage {
           // staleMessage can be simple or batch mode
           if ((System.currentTimeMillis() - currentStateOutput
               .getEndTime(resourceName, partition, instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY)
-              && staleMessage.getResourceName().equals(resourceName) && (
+              && staleMessage.getResourceName().equals(resourceName) && sessionIdMap
+              .containsKey(instanceName) && (
               staleMessage.getPartitionName().equals(partition.getPartitionName()) || (
                   staleMessage.getBatchMessageMode() && staleMessage.getPartitionNames()
                       .contains(partition.getPartitionName())))) {
@@ -404,9 +405,9 @@ public class MessageGenerationPhase extends AbstractBaseStage {
     });
   }
 
-  private boolean shouldCleanUpPendingMessage(Message pendingMsg, String currentState,
-      Long currentStateTransitionEndTime) {
-    if (pendingMsg == null) {
+  private boolean shouldCleanUpPendingMessage(Message pendingMsg, Map<String, String> sessionIdMap,
+      String instanceName, String currentState, Long currentStateTransitionEndTime) {
+    if (pendingMsg == null || !sessionIdMap.containsKey(instanceName)) {
       return false;
     }
     if (currentState.equalsIgnoreCase(pendingMsg.getToState())) {