You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/10/20 16:43:53 UTC
[helix] branch master updated: Follow up change: NPE in IntermediateStateCalc (#2673)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 7da0ddbe6 Follow up change: NPE in IntermediateStateCalc (#2673)
7da0ddbe6 is described below
commit 7da0ddbe6148dd1d518c601b529c8c11fef41665
Author: Komal Desai <kd...@linkedin.com>
AuthorDate: Fri Oct 20 09:43:47 2023 -0700
Follow up change: NPE in IntermediateStateCalc (#2673)
Follow up change to NPE in intermediate stage, we should not skip message throttling in case of missing partition's preference list.
---
.../controller/stages/IntermediateStateCalcStage.java | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index ec17b620c..477e4f99b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -367,11 +367,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
currentStateOutput.getCurrentStateMap(resourceName, partition).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
- if (preferenceList == null || preferenceList.size() == 0) {
- continue;
- }
Map<String, Integer> requiredState = getRequiredStates(resourceName, cache, preferenceList);
- messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
+ if (preferenceList != null && !preferenceList.isEmpty()) {
+ // Sort messages based on the priority (priority is defined in the state model definition
+ messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
+ }
for (Message message : messagesToThrottle) {
RebalanceType rebalanceType =
getRebalanceTypePerMessage(requiredState, message, derivedCurrentStateMap);
@@ -470,10 +470,6 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
// To clarify that custom mode does not apply recovery/load rebalance since user can define different number of
// replicas for different partitions. Actually, the custom will stopped from resource level checks if this resource
// is not FULL_AUTO, we will return best possible state and do nothing.
- List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
- if (preferenceList == null) {
- continue;
- }
Map<String, Integer> requiredStates =
getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));
// Maps instance to its current state
@@ -482,8 +478,11 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
// Maps instance to its pending (next) state
List<Message> pendingMessages = new ArrayList<>(
currentStateOutput.getPendingMessageMap(resourceName, partition).values());
- pendingMessages.sort(new MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
- stateModelDefinition.getStatePriorityMap()));
+ List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+ if (preferenceList != null && !preferenceList.isEmpty()) {
+ pendingMessages.sort(new MessagePriorityComparator(preferenceList,
+ stateModelDefinition.getStatePriorityMap()));
+ }
for (Message message : pendingMessages) {
StateTransitionThrottleConfig.RebalanceType rebalanceType =