You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/01 18:44:44 UTC

[helix] 03/07: Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage #1719

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c819bf7fad34efa7cd57c1363009bda48780560a
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Thu May 6 13:22:48 2021 -0700

    Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage #1719
    
    This commit contains change of:
    
    Adapt Recovery/Load Rebalance and pending message charging to per-replica throttling.
    Change the partition-level comparator to order by 1) missing top states, 2) missing active replicas, and 3) whether the ideal state is matched.
---
 .../stages/IntermediateStateCalcStage.java         | 299 +++++++--------------
 1 file changed, 101 insertions(+), 198 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c46a6ce..6a5d2f1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -365,9 +365,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
               resourceName, partitionsWithErrorStateReplica));
     }
 
-    chargePendingTransition(resource, currentStateOutput, throttleController,
-        partitionsNeedRecovery, partitionsNeedLoadBalance, cache,
-        bestPossiblePartitionStateMap, intermediatePartitionStateMap);
+    chargePendingTransition(resource, currentStateOutput, throttleController, cache, preferenceLists, stateModelDef, intermediatePartitionStateMap);
 
     // Perform recovery balance
     Set<Partition> recoveryThrottledPartitions =
@@ -423,211 +421,114 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine whether the message is a downward state transition message.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this resource
+   * @return                         true if the message is considered a downward state transition; false otherwise
    */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) {
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap();
-
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+  private boolean isLoadBalanceDownwardStateTransition(Message message, StateModelDefinition stateModelDefinition) {
+    // state model definition is not found
+    if (stateModelDefinition == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = stateModelDefinition.getStatePriorityMap();
+    // Compare priority values and return if an upward transition is found
+    // Note that lower integer value implies higher priority
+    // If the state is not found in statePriorityMap, consider it not strictly downward by
+    // default because we can't determine whether it is downward
+    if (statePriorityMap.containsKey(message.getFromState()) && statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) > statePriorityMap.get(message.getToState())) {
+      return true;
+    }
+    return false;
   }
 
   /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController, ResourceControllerDataProvider cache,
+      Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition,
       PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
-
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Note that custom mode does not apply recovery/load rebalance, since users can define a different number of
+      // replicas for different partitions. Custom-mode resources are stopped by the resource-level checks: if the
+      // resource is not FULL_AUTO, we return the best possible state and do nothing.
+      Map<String, Integer> requiredStates =
+          getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));
       // Maps instance to its current state
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
       // Maps instance to its pending (next) state
-      Map<String, String> pendingMap =
-          currentStateOutput.getPendingStateMap(resourceName, partition);
-
-      StateTransitionThrottleConfig.RebalanceType rebalanceType = RebalanceType.NONE;
-      if (partitionsNeedRecovery.contains(partition)) {
-        rebalanceType = StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
-      } else if (partitionsNeedLoadbalance.contains(partition)) {
-        rebalanceType = StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
-      }
-
-      if (pendingMap.size() > 0) {
-        boolean shouldChargePartition = false;
-        for (String instance : pendingMap.keySet()) {
-          String currentState = currentStateMap.get(instance);
-          String pendingState = pendingMap.get(instance);
-          if (pendingState != null && !pendingState.equals(currentState)
-              && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-              .contains(instance)) {
-            // Only charge this instance if the partition is not disabled
-            throttleController.chargeInstance(rebalanceType, instance);
-            shouldChargePartition = true;
-            // If there is a pending state transition for the partition, that means that an assignment
-            // has already been made and the state transition message has already been sent out for the partition
-            // in a previous pipeline run. We must honor this and reflect it by charging for the pending state transition message.
-
-            // Since the assignment has already been made for the pending message, we do a special treatment
-            // for it by setting the best possible state directly in intermediatePartitionStateMap so that the pending
-            // message won't be double-assigned or double-charged in recovery or load balance.
-            handlePendingStateTransitionsForThrottling(partition, partitionsNeedRecovery,
-                partitionsNeedLoadbalance, rebalanceType, bestPossiblePartitionStateMap,
-                intermediatePartitionStateMap);
-          }
-        }
-        if (shouldChargePartition) {
-          throttleController.chargeCluster(rebalanceType);
+      List<Message> pendingMessages =
+          new ArrayList<>(currentStateOutput.getPendingMessageMap(resourceName, partition).values());
+      Collections.sort(pendingMessages, new MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
+          stateModelDefinition.getStatePriorityMap()));
+
+      for (Message message : pendingMessages) {
+        StateTransitionThrottleConfig.RebalanceType rebalanceType =
+            getRebalanceTypePerMessage(requiredStates, message, currentStateMap);
+        String currentState = currentStateMap.get(message.getTgtName());
+        if (!message.getToState().equals(currentState) && message.getFromState().equals(currentState)
+            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+            .contains(message.getTgtName())) {
+          throttleController.chargeInstance(rebalanceType, message.getTgtName());
           throttleController.chargeResource(rebalanceType, resourceName);
+          throttleController.chargeCluster(rebalanceType);
         }
+        intermediatePartitionStateMap.setState(partition, message.getTgtName(), message.getToState());
       }
     }
   }
 
   /**
-   * Sort partitions according to partition priority {@link PartitionPriorityComparator}, and for
-   * each partition, throttle state transitions if needed. Also populate
-   * intermediatePartitionStateMap either with BestPossibleState (if no throttling is necessary) or
-   * CurrentState (if throttled).
-   * @param resource
-   * @param bestPossiblePartitionStateMap
-   * @param throttleController
-   * @param intermediatePartitionStateMap
-   * @param partitionsNeedRecovery
-   * @param currentStateOutput
-   * @param topState
-   * @param cache
-   * @return a set of partitions that need recovery but did not get recovered due to throttling
+   * Thin wrapper for per message throttling with recovery rebalance type. Also populate
+   * intermediatePartitionStateMap with generated messages from {@link MessageGenerationPhase}.
+   * @param resource                      the resource to throttle
+   * @param throttleController            throttle controller object
+   * @param messageToThrottle             the message to be throttled
+   * @param intermediatePartitionStateMap output of this stage: the intermediate state map
+   * @param cache                         cache object for computational metadata from external storage
+   * @param messagesThrottled             messages that have already been throttled
+   * @param resourceMessageMap            the map for all messages from MessageSelectStage. Remove the message
+   *                                      if it has been throttled
    */
-  private Set<Partition> recoveryRebalance(Resource resource,
-      PartitionStateMap bestPossiblePartitionStateMap,
-      StateTransitionThrottleController throttleController,
-      PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedRecovery,
-      CurrentStateOutput currentStateOutput, String topState,
-      ResourceControllerDataProvider cache) {
-    String resourceName = resource.getResourceName();
-    Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>();
-
-    // Maps Partition -> Instance -> State
-    Map<Partition, Map<String, String>> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName);
-    List<Partition> partitionsNeedRecoveryPrioritized = new ArrayList<>(partitionsNeedRecovery);
-
-    // We want the result of the intermediate state calculation to be deterministic. We sort here by
-    // partition name to ensure that the order is consistent for inputs fed into
-    // PartitionPriorityComparator sort
-    partitionsNeedRecoveryPrioritized.sort(Comparator.comparing(Partition::getPartitionName));
-    partitionsNeedRecoveryPrioritized.sort(new PartitionPriorityComparator(
-        bestPossiblePartitionStateMap.getStateMap(), currentStateMap, topState, true));
-
-    // For each partition, apply throttling if needed.
-    for (Partition partition : partitionsNeedRecoveryPrioritized) {
-      throttleStateTransitionsForPartition(throttleController, resourceName, partition,
-          currentStateOutput, bestPossiblePartitionStateMap, partitionRecoveryBalanceThrottled,
-          intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE, cache);
-    }
-    LogUtil.logInfo(logger, _eventId, String.format(
-        "For resource %s: Num of partitions needing recovery: %d, Num of partitions needing recovery"
-            + " but throttled (not recovered): %d",
-        resourceName, partitionsNeedRecovery.size(), partitionRecoveryBalanceThrottled.size()));
-    return partitionRecoveryBalanceThrottled;
+  private void recoveryRebalance(Resource resource, Partition partition,
+      StateTransitionThrottleController throttleController, Message messageToThrottle,
+      PartitionStateMap intermediatePartitionStateMap, ResourceControllerDataProvider cache,
+      Set<Message> messagesThrottled, Map<Partition, List<Message>> resourceMessageMap) {
+    throttleStateTransitionsForReplica(throttleController, resource.getResourceName(), partition, messageToThrottle,
+        messagesThrottled, intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE, cache, resourceMessageMap);
   }
 
   /**
-   * Sort partitions according to partition priority {@link PartitionPriorityComparator}, and for
-   * each partition, throttle state transitions if needed. Also populate
-   * intermediatePartitionStateMap either with BestPossibleState (if no throttling is necessary) or
-   * CurrentState (if throttled).
-   * @param resource
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param throttleController
-   * @param intermediatePartitionStateMap
-   * @param partitionsNeedLoadbalance
-   * @param currentStateMap
-   * @param onlyDownwardLoadBalance true when only allowing downward transitions
-   * @param stateModelDef for determining whether a partition's transitions are strictly downward
-   * @param cache
-   * @return
+   * Thin wrapper for per message throttling with load rebalance type. Also populate
+   * intermediatePartitionStateMap with generated messages from {@link MessageGenerationPhase}.
+   * @param resource                      the resource to throttle
+   * @param throttleController            throttle controller object
+   * @param messageToThrottle             the message to be throttled
+   * @param intermediatePartitionStateMap output of this stage: the intermediate state map
+   * @param cache                         cache object for computational metadata from external storage
+   * @param onlyDownwardLoadBalance       whether only downward load balance is allowed
+   * @param stateModelDefinition          state model definition of this resource
+   * @param messagesThrottled             messages that have already been throttled
+   * @param resourceMessageMap            the map for all messages from MessageSelectStage. Remove the message
+   *                                      if it has been throttled
    */
-  private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap,
-      StateTransitionThrottleController throttleController,
-      PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedLoadbalance,
-      Map<Partition, Map<String, String>> currentStateMap, boolean onlyDownwardLoadBalance,
-      StateModelDefinition stateModelDef, ResourceControllerDataProvider cache) {
-    String resourceName = resource.getResourceName();
-    Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
-
-    List<Partition> partitionsNeedLoadRebalancePrioritized =
-        new ArrayList<>(partitionsNeedLoadbalance);
-
-    // We want the result of the intermediate state calculation to be deterministic. We sort here by
-    // partition name to ensure that the order is consistent for inputs fed into
-    // PartitionPriorityComparator sort
-    partitionsNeedLoadRebalancePrioritized.sort(Comparator.comparing(Partition::getPartitionName));
-    partitionsNeedLoadRebalancePrioritized.sort(new PartitionPriorityComparator(
-        bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "", false));
-
-    for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
-      // If this is a downward load balance, check if the partition's transition is strictly
-      // downward
-      if (onlyDownwardLoadBalance) {
-        Map<String, String> currentStateMapForPartition =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
-        Map<String, String> bestPossibleMapForPartition =
-            bestPossiblePartitionStateMap.getPartitionMap(partition);
-        if (!isLoadBalanceDownwardForAllReplicas(currentStateMapForPartition,
-            bestPossibleMapForPartition, stateModelDef)) {
-          // For downward load balance, if a partition's transitions are not strictly downward,
-          // set currentState to intermediateState
-          intermediatePartitionStateMap.setState(partition, currentStateMapForPartition);
-          continue;
-        }
-      }
-      throttleStateTransitionsForPartition(throttleController, resourceName, partition,
-          currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled,
-          intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache);
-    }
-    LogUtil.logInfo(logger, _eventId,
-        String.format(
-            "For resource %s: Num of partitions needing load-balance: %d, Num of partitions needing"
-                + " load-balance but throttled (not load-balanced): %d",
-            resourceName, partitionsNeedLoadbalance.size(), partitionsLoadbalanceThrottled.size()));
-    return partitionsLoadbalanceThrottled;
+  private void loadRebalance(Resource resource, Partition partition,
+      StateTransitionThrottleController throttleController, Message messageToThrottle,
+      PartitionStateMap intermediatePartitionStateMap, ResourceControllerDataProvider cache,
+      boolean onlyDownwardLoadBalance, StateModelDefinition stateModelDefinition, Set<Message> messagesThrottled,
+      Map<Partition, List<Message>> resourceMessageMap) {
+    if (onlyDownwardLoadBalance && isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition)) {
+      // The message is a downward state transition: apply it to the intermediate state map and skip throttling.
+      intermediatePartitionStateMap.setState(partition, messageToThrottle.getTgtName(), messageToThrottle.getToState());
+      return;
+    }
+    throttleStateTransitionsForReplica(throttleController, resource.getResourceName(), partition, messageToThrottle,
+        messagesThrottled, intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache, resourceMessageMap);
   }
 
   /**
@@ -643,11 +544,13 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
    *                                          of IntermediateStateCalcStage
    * @param rebalanceType                     the rebalance type to charge quota
    * @param cache                             cached cluster metadata required by the throttle controller
+   * @param resourceMessageMap                the map for all messages from MessageSelectStage. Remove the message
+   *                                          if it has been throttled.
    */
   private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController,
       String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled,
       PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
-      ResourceControllerDataProvider cache) {
+      ResourceControllerDataProvider cache, Map<Partition, List<Message>> resourceMessageMap) {
     boolean hasReachedThrottlingLimit = false;
     if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
       hasReachedThrottlingLimit = true;
@@ -678,6 +581,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       intermediatePartitionStateMap.setState(partition, messageToThrottle.getTgtName(), messageToThrottle.getToState());
     } else {
       // Intermediate Map is based on current state
+      // Remove the message from MessageSelection result if it has been throttled since the message will be dispatched
+      // by next stage if it is not removed.
+      resourceMessageMap.get(partition).remove(messageToThrottle);
       messagesThrottled.add(messageToThrottle);
     }
   }
@@ -912,29 +818,26 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     private Map<Partition, Map<String, String>> _bestPossibleMap;
     private Map<Partition, Map<String, String>> _currentStateMap;
     private String _topState;
-    private boolean _recoveryRebalance;
 
     PartitionPriorityComparator(Map<Partition, Map<String, String>> bestPossibleMap,
-        Map<Partition, Map<String, String>> currentStateMap, String topState,
-        boolean recoveryRebalance) {
+        Map<Partition, Map<String, String>> currentStateMap, String topState){
       _bestPossibleMap = bestPossibleMap;
       _currentStateMap = currentStateMap;
       _topState = topState;
-      _recoveryRebalance = recoveryRebalance;
     }
 
     @Override
     public int compare(Partition p1, Partition p2) {
-      if (_recoveryRebalance) {
-        int missTopState1 = getMissTopStateIndex(p1);
-        int missTopState2 = getMissTopStateIndex(p2);
-        // Highest priority for the partition without top state
-        if (missTopState1 != missTopState2) {
-          return Integer.compare(missTopState1, missTopState2);
-        }
-        // Higher priority for the partition with fewer active replicas
-        int currentActiveReplicas1 = getCurrentActiveReplicas(p1);
-        int currentActiveReplicas2 = getCurrentActiveReplicas(p2);
+      int missTopState1 = getMissTopStateIndex(p1);
+      int missTopState2 = getMissTopStateIndex(p2);
+      // Highest priority for the partition without top state
+      if (missTopState1 != missTopState2) {
+        return Integer.compare(missTopState1, missTopState2);
+      }
+      // Higher priority for the partition with fewer active replicas
+      int currentActiveReplicas1 = getCurrentActiveReplicas(p1);
+      int currentActiveReplicas2 = getCurrentActiveReplicas(p2);
+      if (currentActiveReplicas1 != currentActiveReplicas2) {
         return Integer.compare(currentActiveReplicas1, currentActiveReplicas2);
       }
       // Higher priority for the partition with fewer replicas with states matching with IdealState