You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/10/20 16:43:53 UTC

[helix] branch master updated: Follow up change: NPE in IntermediateStateCalc (#2673)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7da0ddbe6 Follow up change: NPE in IntermediateStateCalc (#2673)
7da0ddbe6 is described below

commit 7da0ddbe6148dd1d518c601b529c8c11fef41665
Author: Komal Desai <kd...@linkedin.com>
AuthorDate: Fri Oct 20 09:43:47 2023 -0700

    Follow up change: NPE in IntermediateStateCalc (#2673)
    
    Follow-up change to the NPE fix in the intermediate stage: we should not skip message throttling when a partition's preference list is missing.
---
 .../controller/stages/IntermediateStateCalcStage.java | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index ec17b620c..477e4f99b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -367,11 +367,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
           currentStateOutput.getCurrentStateMap(resourceName, partition).entrySet().stream()
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
-      if (preferenceList == null || preferenceList.size() == 0) {
-        continue;
-      }
       Map<String, Integer> requiredState = getRequiredStates(resourceName, cache, preferenceList);
-      messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
+      if (preferenceList != null && !preferenceList.isEmpty()) {
+        // Sort messages based on the priority (priority is defined in the state model definition).
+        messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
+      }
       for (Message message : messagesToThrottle) {
         RebalanceType rebalanceType =
             getRebalanceTypePerMessage(requiredState, message, derivedCurrentStateMap);
@@ -470,10 +470,6 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       // To clarify that custom mode does not apply recovery/load rebalance since user can define different number of
       // replicas for different partitions. Actually, the custom will stopped from resource level checks if this resource
       // is not FULL_AUTO, we will return best possible state and do nothing.
-      List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
-      if (preferenceList == null) {
-        continue;
-      }
       Map<String, Integer> requiredStates =
           getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));
       // Maps instance to its current state
@@ -482,8 +478,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       // Maps instance to its pending (next) state
       List<Message> pendingMessages = new ArrayList<>(
           currentStateOutput.getPendingMessageMap(resourceName, partition).values());
-      pendingMessages.sort(new MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
-          stateModelDefinition.getStatePriorityMap()));
+      List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+      if (preferenceList != null && !preferenceList.isEmpty()) {
+        pendingMessages.sort(new MessagePriorityComparator(preferenceList,
+            stateModelDefinition.getStatePriorityMap()));
+      }
 
       for (Message message : pendingMessages) {
         StateTransitionThrottleConfig.RebalanceType rebalanceType =