You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/10/18 18:21:49 UTC

[helix] branch master updated: Fix NPE in intermediate state calculation stage (#2668)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f19dad4f Fix NPE in intermediate state calculation stage (#2668)
4f19dad4f is described below

commit 4f19dad4f5e2a016f8e3355d240605644e745259
Author: Komal Desai <kd...@linkedin.com>
AuthorDate: Wed Oct 18 11:21:43 2023 -0700

    Fix NPE in intermediate state calculation stage (#2668)
    
    Fix the NPE in the intermediate state calculation stage when a partition is deleted through an update to the NUM_PARTITION field
---
 .../stages/IntermediateStateCalcStage.java         |  20 ++-
 .../stages/TestIntermediateStateCalcStage.java     | 162 +++++++++++++++++++++
 2 files changed, 177 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 00c49ad91..ec17b620c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -367,6 +367,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
           currentStateOutput.getCurrentStateMap(resourceName, partition).entrySet().stream()
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+      if (preferenceList == null || preferenceList.size() == 0) {
+        continue;
+      }
       Map<String, Integer> requiredState = getRequiredStates(resourceName, cache, preferenceList);
       messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
       for (Message message : messagesToThrottle) {
@@ -467,6 +470,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       // To clarify that custom mode does not apply recovery/load rebalance since user can define different number of
       // replicas for different partitions. Actually, the custom will stopped from resource level checks if this resource
       // is not FULL_AUTO, we will return best possible state and do nothing.
+      List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+      if (preferenceList == null) {
+        continue;
+      }
       Map<String, Integer> requiredStates =
           getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));
       // Maps instance to its current state
@@ -643,15 +650,18 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     StateModelDefinition stateModelDefinition =
         resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef());
     int requiredNumReplica =
-        idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
+        idealState.getMinActiveReplicas() == -1 ?
+            idealState.getReplicaCount(preferenceList == null ? 0 : preferenceList.size())
             : idealState.getMinActiveReplicas();
 
     // Generate a state mapping, state -> required numbers based on the live and enabled instances for this partition
     // preference list
-    return stateModelDefinition.getStateCountMap(
-        (int) preferenceList.stream()
-            .filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
-            .count(), requiredNumReplica); // StateModelDefinition's counts
+    if (preferenceList != null) {
+      return stateModelDefinition.getStateCountMap((int) preferenceList.stream().filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
+          .count(), requiredNumReplica); // StateModelDefinition's counts
+    }
+    return stateModelDefinition.getStateCountMap(resourceControllerDataProvider.getEnabledLiveInstances().size(),
+        requiredNumReplica); // StateModelDefinition's counts
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index b651e0d28..60281e65b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -406,6 +406,168 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
     }
   }
 
+  @Test
+  public void testPartitionMissing() {
+    String resourcePrefix = "resource";
+    int nResource = 4;
+    int nPartition = 2;
+    int nReplica = 3;
+
+    String[] resources = new String[nResource];
+    for (int i = 0; i < nResource; i++) {
+      resources[i] = resourcePrefix + "_" + i;
+    }
+
+    preSetup(resources, nReplica, nReplica);
+    event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(resources, nPartition, "OnlineOffline"));
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        getResourceMap(resources, nPartition, "OnlineOffline"));
+
+    // Initialize bestpossible state and current state
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    MessageOutput messageSelectOutput = new MessageOutput();
+    IntermediateStateOutput expectedResult = new IntermediateStateOutput();
+
+    _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1);
+    setClusterConfig(_clusterConfig);
+
+    for (String resource : resources) {
+      IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
+      setSingleIdealState(is);
+
+      Map<String, List<String>> partitionMap = new HashMap<>();
+      for (int p = 0; p < nPartition; p++) {
+        Partition partition = new Partition(resource + "_" + p);
+        for (int r = 0; r < nReplica; r++) {
+          String instanceName = HOSTNAME_PREFIX + r;
+
+          // PartitionMap is used as a preferenceList.
+          // For the last partition, let us add null as preferenceList.
+          if (p != nPartition - 1) {
+             partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName));
+          } else {
+             partitionMap.put(partition.getPartitionName(), null);
+          }
+
+          // TODO: The following code is the same as in testNoStateMissing
+          if (resource.endsWith("0")) {
+            // Regular recovery balance
+            currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+            // add blocked state transition messages
+            Message pendingMessage = generateMessage("OFFLINE", "ONLINE", instanceName);
+            currentStateOutput.setPendingMessage(resource, partition, instanceName, pendingMessage);
+
+            bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+
+            // should be recovered:
+            expectedResult.setState(resource, partition, instanceName, "ONLINE");
+          } else if (resource.endsWith("1")) {
+            // Regular load balance
+            currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
+            currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
+            bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+            messageSelectOutput.addMessage(resource, partition,
+                generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
+            // should be recovered:
+            expectedResult.setState(resource, partition, instanceName, "ONLINE");
+          } else if (resource.endsWith("2")) {
+            // Recovery balance with transient states, should keep the current states in the output.
+            currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+            bestPossibleStateOutput.setState(resource, partition, instanceName, "OFFLINE");
+            // should be kept unchanged:
+            expectedResult.setState(resource, partition, instanceName, "OFFLINE");
+          } else if (resource.endsWith("3")) {
+            // One unresolved error should not prevent recovery balance
+            bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+            if (p == 0) {
+              if (r == 0) {
+                currentStateOutput.setCurrentState(resource, partition, instanceName, "ERROR");
+                bestPossibleStateOutput.setState(resource, partition, instanceName, "ERROR");
+                // This partition is still ERROR
+                expectedResult.setState(resource, partition, instanceName, "ERROR");
+              } else {
+                currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+                messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
+                // Recovery balance
+                expectedResult.setState(resource, partition, instanceName, "ONLINE");
+              }
+            } else {
+              currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
+              currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
+              // load balance is throttled, so keep all current states
+              messageSelectOutput.addMessage(resource, partition,
+                  generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
+              expectedResult.setState(resource, partition, instanceName, "ONLINE");
+              // The following must be removed because now downward state transitions are allowed
+              // expectedResult.setState(resource, partition, instanceName + "-1", "OFFLINE");
+            }
+          } else if (resource.endsWith("4")) {
+            // Test that partitions with replicas to drop are dropping them when recovery is
+            // happening for other partitions
+            if (p == 0) {
+              // This partition requires recovery
+              currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+              bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+              messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
+              // After recovery, it should be back ONLINE
+              expectedResult.setState(resource, partition, instanceName, "ONLINE");
+            } else {
+              // Other partitions require dropping of replicas
+              currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
+              currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
+              // BestPossibleState dictates that we only need one ONLINE replica
+              bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+              bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "DROPPED");
+              messageSelectOutput.addMessage(resource, partition,
+                  generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
+              // So instanceName-1 will NOT be expected to show up in expectedResult
+              expectedResult.setState(resource, partition, instanceName, "ONLINE");
+              expectedResult.setState(resource, partition, instanceName + "-1", "DROPPED");
+            }
+          } else if (resource.endsWith("5")) {
+            // Test that load balance bringing up a new replica does NOT happen with a recovery
+            // partition
+            if (p == 0) {
+              // Set up a partition requiring recovery
+              currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+              bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+              messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
+              // After recovery, it should be back ONLINE
+              expectedResult.setState(resource, partition, instanceName, "ONLINE");
+            } else {
+              currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
+              bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+              // Check that load balance (bringing up a new node) did not take place
+              bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE");
+              messageSelectOutput.addMessage(resource, partition,
+                  generateMessage("OFFLINE", "ONLINE", instanceName + "-1"));
+              expectedResult.setState(resource, partition, instanceName, "ONLINE");
+            }
+          }
+        }
+      }
+      bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
+    }
+
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, new IntermediateStateCalcStage());
+
+    IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+    for (String resource : resources) {
+      // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
+      // anything.
+      Assert.assertTrue(output.getPartitionStateMap(resource)
+          .getStateMap()
+          .equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
+    }
+  }
+
   private void preSetup(String[] resources, int numOfLiveInstances, int numOfReplicas) {
     setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas,
         IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");