You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/01 18:44:43 UTC
[helix] 02/07: Change throttling logic to per message (#1714)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 35dd2e77338abc4e544ec103bd3c4a6be180d335
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Wed Apr 28 13:20:09 2021 -0700
Change throttling logic to per message (#1714)
Apply the throttling logic with a per-message quota charge.
---
.../stages/IntermediateStateCalcStage.java | 115 ++++++---------------
1 file changed, 33 insertions(+), 82 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index a840d5e..c46a6ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -631,104 +631,55 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
/**
- * Check the status on throttling at every level (cluster, resource, instance) and set
- * intermediatePartitionStateMap accordingly per partition.
- * @param throttleController
- * @param resourceName
- * @param partition
- * @param currentStateOutput
- * @param bestPossiblePartitionStateMap
- * @param partitionsThrottled
- * @param intermediatePartitionStateMap
- * @param rebalanceType
- * @param cache
+ * Check the throttling status of a single message at every level (cluster, resource, instance) and set
+ * intermediatePartitionStateMap accordingly for that replica.
+ * @param throttleController throttle controller object for throttling quota
+ * @param resourceName the resource for throttling check
+ * @param partition the partition for throttling check
+ * @param messageToThrottle the message to be throttled
+ * @param messagesThrottled the cumulative set of messages that have been throttled already. These
+ * messages represent the replicas of this partition that have been throttled.
+ * @param intermediatePartitionStateMap the cumulative partition-state mapping as a result of the throttling step
+ * of IntermediateStateCalcStage
+ * @param rebalanceType the rebalance type to charge quota
+ * @param cache cached cluster metadata required by the throttle controller
*/
- private void throttleStateTransitionsForPartition(
- StateTransitionThrottleController throttleController, String resourceName,
- Partition partition, CurrentStateOutput currentStateOutput,
- PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
+ private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController,
+ String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled,
PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
ResourceControllerDataProvider cache) {
-
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
- Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition);
- Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
- allInstances.addAll(bestPossibleMap.keySet());
- Map<String, String> intermediateMap = new HashMap<>();
-
boolean hasReachedThrottlingLimit = false;
if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
hasReachedThrottlingLimit = true;
if (logger.isDebugEnabled()) {
- LogUtil.logDebug(logger, _eventId,
- String.format("Throttled on partition: %s in resource: %s",
- partition.getPartitionName(), resourceName));
+ LogUtil.logDebug(logger, _eventId, String.format(
+ "Throttled because of cluster/resource quota is full for message {%s} on partition {%s} in resource {%s}",
+ messageToThrottle.getId(), partition.getPartitionName(), resourceName));
}
} else {
- // throttle if any of the instances are not able to accept state transitions
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null && !bestPossibleState.equals(currentState)
- && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
- .contains(instance)) {
- if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
- hasReachedThrottlingLimit = true;
- if (logger.isDebugEnabled()) {
- LogUtil.logDebug(logger, _eventId,
- String.format(
- "Throttled because of instance: %s for partition: %s in resource: %s",
- instance, partition.getPartitionName(), resourceName));
- }
- break;
+ // Since the message has already been generated, we can assume that neither the current state nor the target state is null
+ if (!cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+ .contains(messageToThrottle.getTgtName())) {
+ if (throttleController.shouldThrottleForInstance(rebalanceType, messageToThrottle.getTgtName())) {
+ hasReachedThrottlingLimit = true;
+ if (logger.isDebugEnabled()) {
+ LogUtil.logDebug(logger, _eventId, String.format(
+ "Throttled because of instance level quota is full on instance {%s} for message {%s} of partition {%s} in resource {%s}",
+ messageToThrottle.getId(), messageToThrottle.getTgtName(), partition.getPartitionName(), resourceName));
}
}
}
}
+ // If there is still room for this replica, proceed to charge at the cluster and resource level and set the
+ // intermediate partition-state mapping so that the state transition message can move forward.
if (!hasReachedThrottlingLimit) {
- // This implies that there is room for more state transitions.
- // Find instances with a replica whose current state is different from BestPossibleState and
- // "charge" for it, and bestPossibleStates will become intermediate states
- intermediateMap.putAll(bestPossibleMap);
- boolean shouldChargeForPartition = false;
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null && !bestPossibleState.equals(currentState)
- && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
- .contains(instance)) {
- throttleController.chargeInstance(rebalanceType, instance);
- shouldChargeForPartition = true;
- }
- }
- if (shouldChargeForPartition) {
- throttleController.chargeCluster(rebalanceType);
- throttleController.chargeResource(rebalanceType, resourceName);
- }
+ throttleController.chargeCluster(rebalanceType);
+ throttleController.chargeResource(rebalanceType, resourceName);
+ intermediatePartitionStateMap.setState(partition, messageToThrottle.getTgtName(), messageToThrottle.getToState());
} else {
- // No more room for more state transitions; current states will just become intermediate
- // states unless the partition is disabled
- // Add this partition to a set of throttled partitions
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null && !bestPossibleState.equals(currentState)
- && cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
- .contains(instance)) {
- // Because this partition is disabled, we allow assignment
- intermediateMap.put(instance, bestPossibleState);
- } else {
- // This partition is not disabled, so it must be throttled by just passing on the current
- // state
- if (currentState != null) {
- intermediateMap.put(instance, currentState);
- }
- partitionsThrottled.add(partition);
- }
- }
+ // Intermediate Map is based on current state
+ messagesThrottled.add(messageToThrottle);
}
- intermediatePartitionStateMap.setState(partition, intermediateMap);
}
/**