You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/01 18:44:43 UTC

[helix] 02/07: Change throttling logic to per message (#1714)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 35dd2e77338abc4e544ec103bd3c4a6be180d335
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Wed Apr 28 13:20:09 2021 -0700

    Change throttling logic to per message (#1714)
    
    Apply the logic for throttling with per message quota charge.
---
 .../stages/IntermediateStateCalcStage.java         | 115 ++++++---------------
 1 file changed, 33 insertions(+), 82 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index a840d5e..c46a6ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -631,104 +631,55 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to be throttled
+   * @param messagesThrottled                 the cumulative set of messages that have been throttled already. These
+   *                                          messages represent the replicas of this partition that have been throttled.
+   * @param intermediatePartitionStateMap     the cumulative partition-state mapping as a result of the throttling step
+   *                                          of IntermediateStateCalcStage
+   * @param rebalanceType                     the rebalance type to charge quota
+   * @param cache                             cached cluster metadata required by the throttle controller
    */
-  private void throttleStateTransitionsForPartition(
-      StateTransitionThrottleController throttleController, String resourceName,
-      Partition partition, CurrentStateOutput currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
+  private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController,
+      String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled,
       PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
       ResourceControllerDataProvider cache) {
-
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName, partition);
-    Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition);
-    Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, String> intermediateMap = new HashMap<>();
-
     boolean hasReachedThrottlingLimit = false;
     if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
       hasReachedThrottlingLimit = true;
       if (logger.isDebugEnabled()) {
-        LogUtil.logDebug(logger, _eventId,
-            String.format("Throttled on partition: %s in resource: %s",
-                partition.getPartitionName(), resourceName));
+        LogUtil.logDebug(logger, _eventId, String.format(
+            "Throttled because of cluster/resource quota is full for message {%s} on partition {%s} in resource {%s}",
+            messageToThrottle.getId(), partition.getPartitionName(), resourceName));
       }
     } else {
-      // throttle if any of the instances are not able to accept state transitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
-            hasReachedThrottlingLimit = true;
-            if (logger.isDebugEnabled()) {
-              LogUtil.logDebug(logger, _eventId,
-                  String.format(
-                      "Throttled because of instance: %s for partition: %s in resource: %s",
-                      instance, partition.getPartitionName(), resourceName));
-            }
-            break;
+      // Since message already generated, we can assume the current state is not null and target state is not null
+      if (!cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+          .contains(messageToThrottle.getTgtName())) {
+        if (throttleController.shouldThrottleForInstance(rebalanceType, messageToThrottle.getTgtName())) {
+          hasReachedThrottlingLimit = true;
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String.format(
+                "Throttled because of instance level quota is full on instance {%s} for message {%s} of partition {%s} in resource {%s}",
+                messageToThrottle.getId(), messageToThrottle.getTgtName(), partition.getPartitionName(), resourceName));
           }
         }
       }
     }
+    // If there is still room for this replica, proceed to charge at the cluster and resource level and set the
+    // intermediate partition-state mapping so that the state transition message can move forward.
     if (!hasReachedThrottlingLimit) {
-      // This implies that there is room for more state transitions.
-      // Find instances with a replica whose current state is different from BestPossibleState and
-      // "charge" for it, and bestPossibleStates will become intermediate states
-      intermediateMap.putAll(bestPossibleMap);
-      boolean shouldChargeForPartition = false;
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          throttleController.chargeInstance(rebalanceType, instance);
-          shouldChargeForPartition = true;
-        }
-      }
-      if (shouldChargeForPartition) {
-        throttleController.chargeCluster(rebalanceType);
-        throttleController.chargeResource(rebalanceType, resourceName);
-      }
+      throttleController.chargeCluster(rebalanceType);
+      throttleController.chargeResource(rebalanceType, resourceName);
+      intermediatePartitionStateMap.setState(partition, messageToThrottle.getTgtName(), messageToThrottle.getToState());
     } else {
-      // No more room for more state transitions; current states will just become intermediate
-      // states unless the partition is disabled
-      // Add this partition to a set of throttled partitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          // Because this partition is disabled, we allow assignment
-          intermediateMap.put(instance, bestPossibleState);
-        } else {
-          // This partition is not disabled, so it must be throttled by just passing on the current
-          // state
-          if (currentState != null) {
-            intermediateMap.put(instance, currentState);
-          }
-          partitionsThrottled.add(partition);
-        }
-      }
+      // Intermediate Map is based on current state
+      messagesThrottled.add(messageToThrottle);
     }
-    intermediatePartitionStateMap.setState(partition, intermediateMap);
   }
 
   /**