You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2019/11/06 21:34:08 UTC

[helix] branch master updated: Fix partitions double charged for pending message

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 554dcd0  Fix partitions double charged for pending message
554dcd0 is described below

commit 554dcd0e328f96462c4712019be7513d9166b4ee
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Fri Nov 1 17:00:25 2019 -0700

    Fix partitions double charged for pending message
    
    Currently, Helix charges a partition with a pending message twice, because the partition is not removed from the set of partitions needing recovery/load balance when it is charged for the pending message.
    
    This change fixes the problem for the recovery/load/ANY rebalance types that are charged for pending messages.
---
 .../stages/IntermediateStateCalcStage.java         | 45 ++++++++++++-
 .../stages/TestIntermediateStateCalcStage.java     | 77 ++++++++++++----------
 2 files changed, 85 insertions(+), 37 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index da31b37..37df042 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -363,7 +363,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
 
     chargePendingTransition(resource, currentStateOutput, throttleController,
-        partitionsNeedRecovery, partitionsNeedLoadBalance, cache);
+        partitionsNeedRecovery, partitionsNeedLoadBalance, cache,
+        bestPossiblePartitionStateMap, intermediatePartitionStateMap);
 
     // Perform recovery balance
     Set<Partition> recoveryThrottledPartitions =
@@ -461,7 +462,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
       StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache) {
+      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
+      PartitionStateMap bestPossiblePartitionStateMap,
+      PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
 
     // check and charge pending transitions
@@ -487,10 +490,20 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
           String pendingState = pendingMap.get(instance);
           if (pendingState != null && !pendingState.equals(currentState)
               && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                  .contains(instance)) {
+              .contains(instance)) {
             // Only charge this instance if the partition is not disabled
             throttleController.chargeInstance(rebalanceType, instance);
             shouldChargePartition = true;
+            // If there is a pending state transition for the partition, that means that an assignment
+            // has already been made and the state transition message has already been sent out for the partition
+            // in a previous pipeline run. We must honor this and reflect it by charging for the pending state transition message.
+
+            // Since the assignment has already been made for the pending message, we do a special treatment
+            // for it by setting the best possible state directly in intermediatePartitionStateMap so that the pending
+            // message won't be double-assigned or double-charged in recovery or load balance.
+            handlePendingStateTransitionsForThrottling(partition, partitionsNeedRecovery,
+                partitionsNeedLoadbalance, rebalanceType, bestPossiblePartitionStateMap,
+                intermediatePartitionStateMap);
           }
         }
         if (shouldChargePartition) {
@@ -945,4 +958,30 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       return matchedState;
     }
   }
+
+  /**
+   * Handle a partition with a pending message so that it will not be double-charged or double-assigned during recovery and load balance: its state map is copied from {@code bestPossiblePartitionStateMap} into {@code intermediatePartitionStateMap}, and the partition is removed from the set matching {@code rebalanceType}.
+   * @param partition the partition that has a pending state transition message
+   * @param partitionsNeedRecovery set the partition is removed from when rebalanceType is RECOVERY_BALANCE
+   * @param partitionsNeedLoadbalance set the partition is removed from when rebalanceType is LOAD_BALANCE
+   * @param rebalanceType selects which set the partition is removed from; any other value leaves both sets unchanged
+   */
+  private void handlePendingStateTransitionsForThrottling(Partition partition,
+      Set<Partition> partitionsNeedRecovery, Set<Partition> partitionsNeedLoadbalance,
+      RebalanceType rebalanceType, PartitionStateMap bestPossiblePartitionStateMap,
+      PartitionStateMap intermediatePartitionStateMap) {
+    // Pass the best possible state directly into intermediatePartitionStateMap.
+    // This is safe because a pending transition already exists for this partition, implying that the assignment was made in a previous pipeline run.
+    intermediatePartitionStateMap
+        .setState(partition, bestPossiblePartitionStateMap.getPartitionMap(partition));
+    // Remove the partition from the set of partitions that still need to be charged and assigned, to prevent double-processing
+    switch (rebalanceType) {
+    case RECOVERY_BALANCE:
+      partitionsNeedRecovery.remove(partition);
+      break;
+    case LOAD_BALANCE:
+      partitionsNeedLoadbalance.remove(partition);
+      break;
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index bb7ed98..dd727ea 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -25,14 +25,18 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+
 public class TestIntermediateStateCalcStage extends BaseStageTest {
   private ClusterConfig _clusterConfig;
 
@@ -43,17 +47,16 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
     int nPartition = 2;
     int nReplica = 3;
 
-    Set<String> resourceSet = new HashSet<>();
+    String[] resources = new String[nResource];
     for (int i = 0; i < nResource; i++) {
-      resourceSet.add(resourcePrefix + "_" + i);
+      resources[i] = resourcePrefix + "_" + i;
     }
 
-    preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, resourceSet, nReplica,
-        nReplica);
-    event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(
-        resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline"));
-    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap(
-        resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline"));
+    preSetup(resources, nReplica, nReplica);
+    event.addAttribute(AttributeName.RESOURCES.name(),
+        getResourceMap(resources, nPartition, "OnlineOffline"));
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        getResourceMap(resources, nPartition, "OnlineOffline"));
 
     // Initialize bestpossible state and current state
     BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
@@ -64,7 +67,7 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
     _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1);
     setClusterConfig(_clusterConfig);
 
-    for (String resource : resourceSet) {
+    for (String resource : resources) {
       IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
       setSingleIdealState(is);
 
@@ -77,6 +80,11 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
           if (resource.endsWith("0")) {
             // Regular recovery balance
             currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+            // add blocked state transition messages
+            Message pendingMessage = new Message("customType", "001");
+            pendingMessage.setToState("ONLINE");
+            currentStateOutput.setPendingMessage(resource, partition, instanceName, pendingMessage);
+
             bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
             // should be recovered:
             expectedResult.setState(resource, partition, instanceName, "ONLINE");
@@ -109,8 +117,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
               }
             } else {
               currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
-              currentStateOutput.setCurrentState(resource, partition, instanceName + "-1",
-                  "OFFLINE");
+              currentStateOutput
+                  .setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
               // load balance is throttled, so keep all current states
               expectedResult.setState(resource, partition, instanceName, "ONLINE");
               // The following must be removed because now downward state transitions are allowed
@@ -128,8 +136,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
             } else {
               // Other partitions require dropping of replicas
               currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
-              currentStateOutput.setCurrentState(resource, partition, instanceName + "-1",
-                  "OFFLINE");
+              currentStateOutput
+                  .setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
               // BestPossibleState dictates that we only need one ONLINE replica
               bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
               bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "DROPPED");
@@ -152,7 +160,6 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
               // Check that load balance (bringing up a new node) did not take place
               bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE");
               expectedResult.setState(resource, partition, instanceName, "ONLINE");
-
             }
           }
         }
@@ -169,7 +176,7 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
 
     IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
 
-    for (String resource : resourceSet) {
+    for (String resource : resources) {
       // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
       // anything.
       Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
@@ -184,24 +191,24 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
     int nPartition = 2;
     int nReplica = 3;
 
-    Set<String> resourceSet = new HashSet<>();
+    String[] resources = new String[nResource];
     for (int i = 0; i < nResource; i++) {
-      resourceSet.add(resourcePrefix + "_" + i);
+      resources[i] = resourcePrefix + "_" + i;
     }
 
-    preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, resourceSet,
-        nReplica, nReplica);
-    event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(
-        resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline"));
-    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap(
-        resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline"));
+    preSetup(resources, nReplica, nReplica);
+    event.addAttribute(AttributeName.RESOURCES.name(),
+        getResourceMap(resources, nPartition,
+            "OnlineOffline"));
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        getResourceMap(resources, nPartition, "OnlineOffline"));
 
     // Initialize best possible state and current state
     BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
     IntermediateStateOutput expectedResult = new IntermediateStateOutput();
 
-    for (String resource : resourceSet) {
+    for (String resource : resources) {
       IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
       setSingleIdealState(is);
 
@@ -248,26 +255,28 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
 
     IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
 
-    for (String resource : resourceSet) {
+    for (String resource : resources) {
       // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
       // anything.
-      Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
-          .equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
+      Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(), expectedResult.getPartitionStateMap(resource).getStateMap());
     }
   }
 
-  private void preSetup(StateTransitionThrottleConfig.RebalanceType rebalanceType,
-      Set<String> resourceSet, int numOfLiveInstances, int numOfReplicas) {
-    setupIdealState(numOfLiveInstances, resourceSet.toArray(new String[resourceSet.size()]),
-        numOfLiveInstances, numOfReplicas, IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");
+  private void preSetup(String[] resources, int numOfLiveInstances, int numOfReplicas) {
+    setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas,
+        IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");
     setupStateModel();
     setupLiveInstances(numOfLiveInstances);
 
     // Set up cluster configs
     _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
-    StateTransitionThrottleConfig throttleConfig = new StateTransitionThrottleConfig(rebalanceType,
-        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, Integer.MAX_VALUE);
-    _clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig));
+    _clusterConfig.setStateTransitionThrottleConfigs(ImmutableList
+        .of(new StateTransitionThrottleConfig(
+                StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+                StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3),
+            new StateTransitionThrottleConfig(
+                StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+                StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3)));
     setClusterConfig(_clusterConfig);
   }
 }