Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2021/04/23 22:30:00 UTC

[GitHub] [helix] dasahcc opened a new pull request #1714: [Replica Level Throttle] Change throttling logic to per message

dasahcc opened a new pull request #1714:
URL: https://github.com/apache/helix/pull/1714


   
   ### Issues
   
   - [X] My PR addresses the following Helix issues and references them in the PR description:
   
   resolves #343
   
   ### Description
   
   - [X] Here are some details about my PR, including screenshots of any UI changes:
   
   Apply the throttling logic with a per-message quota charge. This method will be used for load-rebalance and recovery-rebalance throttling in the main logic.
   
   For the overall context of this change, please refer to #1713.
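The per-message check described above can be sketched as follows. This is a simplified, hypothetical model, not Helix's `StateTransitionThrottleController` API: the class `PerMessageThrottleSketch`, its `Msg` type, the remaining-count quota maps, and `tryCharge()` are all illustrative names. It assumes one remaining-count quota per level and that a message is charged at every level once it passes the checks, following the order in the `throttleStateTransitionsForReplica()` diff quoted in this thread: cluster/resource first, then the target instance unless that instance is disabled for the partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of per-message (per-replica) throttling.
// Not Helix's actual API: names and quota semantics are illustrative assumptions.
class PerMessageThrottleSketch {

  // Minimal stand-in for a state transition message; one message represents one replica.
  static final class Msg {
    final String id;
    final String resource;
    final String targetInstance;

    Msg(String id, String resource, String targetInstance) {
      this.id = id;
      this.resource = resource;
      this.targetInstance = targetInstance;
    }
  }

  // Remaining quota per level. For resources and instances, a missing key means "no limit".
  private int clusterQuota;
  private final Map<String, Integer> resourceQuota = new HashMap<>();
  private final Map<String, Integer> instanceQuota = new HashMap<>();

  PerMessageThrottleSketch(int clusterQuota) {
    this.clusterQuota = clusterQuota;
  }

  void setResourceQuota(String resource, int quota) {
    resourceQuota.put(resource, quota);
  }

  void setInstanceQuota(String instance, int quota) {
    instanceQuota.put(instance, quota);
  }

  private static boolean isFull(Map<String, Integer> quotas, String key) {
    Integer left = quotas.get(key);
    return left != null && left <= 0;
  }

  /** Returns true and charges quota if the message may proceed; false if it is throttled. */
  boolean tryCharge(Msg msg, Set<String> disabledInstancesForPartition) {
    // 1. Cluster/resource level: throttle if either quota is exhausted.
    if (clusterQuota <= 0 || isFull(resourceQuota, msg.resource)) {
      return false;
    }
    // 2. Instance level: only checked when the target instance is not disabled for the partition.
    boolean checkInstance = !disabledInstancesForPartition.contains(msg.targetInstance);
    if (checkInstance && isFull(instanceQuota, msg.targetInstance)) {
      return false;
    }
    // 3. There is room at every level, so charge this single message.
    clusterQuota--;
    resourceQuota.computeIfPresent(msg.resource, (k, v) -> v - 1);
    if (checkInstance) {
      instanceQuota.computeIfPresent(msg.targetInstance, (k, v) -> v - 1);
    }
    return true;
  }
}
```

For example, with an instance quota of 1 on `host1`, the first message targeting `host1` is charged and a second one is throttled, while a replica of the same partition moving to another instance can still proceed. In the old per-partition code quoted in the diff, a single instance at its limit throttled the whole partition.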
   
   ### Tests
   
   This change is split across several PRs. The branch will not compile or pass tests until all of the code changes are in.
   Test files and test results will be included in the last few PRs.
   
   - [ ] The following tests are written for this issue:
   
   - The following is the result of the "mvn test" command on the appropriate module:
   
   
   ### Documentation (Optional)
   
   - In case of new functionality, my PR adds documentation in the following wiki page:
   
   (Link the GitHub wiki you added)
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] dasahcc commented on a change in pull request #1714: [Replica Level Throttle] Change throttling logic to per message

Posted by GitBox <gi...@apache.org>.
dasahcc commented on a change in pull request #1714:
URL: https://github.com/apache/helix/pull/1714#discussion_r621693298



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -631,104 +631,53 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to throttle
+   * @param messagesThrottled                 messages have been throttled
+   * @param intermediatePartitionStateMap     output for this stage of intermediate map
+   * @param rebalanceType                     the rebalance type to charge quota
+   * @param cache                             cache for controller required metadata
    */
-  private void throttleStateTransitionsForPartition(
-      StateTransitionThrottleController throttleController, String resourceName,
-      Partition partition, CurrentStateOutput currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
+  private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController,
+      String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled,
       PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
       ResourceControllerDataProvider cache) {
-
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName, partition);
-    Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition);
-    Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, String> intermediateMap = new HashMap<>();
-
     boolean hasReachedThrottlingLimit = false;
     if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
       hasReachedThrottlingLimit = true;
       if (logger.isDebugEnabled()) {
-        LogUtil.logDebug(logger, _eventId,
-            String.format("Throttled on partition: %s in resource: %s",
-                partition.getPartitionName(), resourceName));
+        LogUtil.logDebug(logger, _eventId, String.format(
+            "Throttled because of cluster/resource quota is full for message {%s} on partition {%s} in resource {%s}",
+            messageToThrottle.getId(), partition.getPartitionName(), resourceName));
       }
     } else {
-      // throttle if any of the instances are not able to accept state transitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
-            hasReachedThrottlingLimit = true;
-            if (logger.isDebugEnabled()) {
-              LogUtil.logDebug(logger, _eventId,
-                  String.format(
-                      "Throttled because of instance: %s for partition: %s in resource: %s",
-                      instance, partition.getPartitionName(), resourceName));
-            }
-            break;
+      // Since message already generated, we can assume the current state is not null and target state is not null
+      if (!cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+          .contains(messageToThrottle.getTgtName())) {
+        if (throttleController.shouldThrottleForInstance(rebalanceType, messageToThrottle.getTgtName())) {
+          hasReachedThrottlingLimit = true;
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String.format(
+                "Throttled because of instance level quota is full on instance {%s} for message {%s} of partition {%s} in resource {%s}",
+                messageToThrottle.getTgtName(), messageToThrottle.getId(), partition.getPartitionName(), resourceName));
           }
         }
       }
     }
+    // If there is a room to hold existing messages, charge this message.

Review comment:
       To answer your questions:
   
   1. Yes. Even though this function is called more times than the per-partition version, it performs no costly operations such as creating or discarding objects or looping over data collections. The only operations are reads and updates of the maps in the throttle controller and of the intermediate state map.
   2. In theory you would see 3x as many calls as with per-partition throttling. In practice, many messages have dependencies and are removed by MessageSelectionStage, and historical data from production logs shows that the number of messages is far lower than that.
   
   Finally, as I said, this operation is unavoidable: the success or failure of each throttling charge can lead to a different result for the next message's rebalance type. The only thing we can do is make sure this simple function call does no costly work, which is also why I want this function to be reviewed on its own instead of combining it into a large block of logic.
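The order dependence described in this reply can be illustrated with a small, hypothetical sketch. It is not Helix code: `SequentialChargeSketch`, `Msg`, and `allowedInOrder()` are illustrative names, and only an instance-level quota is modeled. Each step is a constant-time map read and update, yet whether a message is allowed depends on every charge made before it, so the messages have to be evaluated one at a time, in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: each charge is an O(1) map read/update, but the
// outcome for a message depends on every charge made before it.
class SequentialChargeSketch {

  static final class Msg {
    final String id;
    final String targetInstance;

    Msg(String id, String targetInstance) {
      this.id = id;
      this.targetInstance = targetInstance;
    }
  }

  // Processes messages strictly in the given order and returns the ids that fit the quota.
  // Instances missing from instanceQuota are treated as unlimited (assumption).
  static List<String> allowedInOrder(List<Msg> messages, Map<String, Integer> instanceQuota) {
    Map<String, Integer> remaining = new HashMap<>(instanceQuota); // leave the caller's map untouched
    List<String> allowed = new ArrayList<>();
    for (Msg msg : messages) {
      int left = remaining.getOrDefault(msg.targetInstance, Integer.MAX_VALUE);
      if (left > 0) {
        // Charging here shrinks the quota seen by every later message.
        remaining.put(msg.targetInstance, left - 1);
        allowed.add(msg.id);
      }
    }
    return allowed;
  }
}
```

With a quota of 1 on `host1` and two messages `a` and `b` both targeting `host1`, the order `[a, b]` lets only `a` through and `[b, a]` lets only `b` through, so the outcome of one charge has to be known before the next message is evaluated.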






[GitHub] [helix] dasahcc merged pull request #1714: [Replica Level Throttle] Change throttling logic to per message

Posted by GitBox <gi...@apache.org>.
dasahcc merged pull request #1714:
URL: https://github.com/apache/helix/pull/1714


   




[GitHub] [helix] narendly commented on a change in pull request #1714: [Replica Level Throttle] Change throttling logic to per message

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1714:
URL: https://github.com/apache/helix/pull/1714#discussion_r621629630



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -631,104 +631,53 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to throttle
+   * @param messagesThrottled                 messages have been throttled

Review comment:
       Nit: messages that have been throttled already

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -631,104 +631,53 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to throttle
+   * @param messagesThrottled                 messages have been throttled
+   * @param intermediatePartitionStateMap     output for this stage of intermediate map
+   * @param rebalanceType                     the rebalance type to charge quota
+   * @param cache                             cache for controller required metadata

Review comment:
       Nit: "cached cluster metadata required by the controller"

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -631,104 +631,53 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to throttle
+   * @param messagesThrottled                 messages have been throttled
+   * @param intermediatePartitionStateMap     output for this stage of intermediate map

Review comment:
       Nit: a little unclear - did you mean "the cumulative partition-state mapping as a result of the throttling step of IntermediateStateCalcStage"?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -631,104 +631,53 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to throttle
+   * @param messagesThrottled                 messages have been throttled
+   * @param intermediatePartitionStateMap     output for this stage of intermediate map
+   * @param rebalanceType                     the rebalance type to charge quota
+   * @param cache                             cache for controller required metadata
    */
-  private void throttleStateTransitionsForPartition(
-      StateTransitionThrottleController throttleController, String resourceName,
-      Partition partition, CurrentStateOutput currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
+  private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController,
+      String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled,
       PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
       ResourceControllerDataProvider cache) {
-
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName, partition);
-    Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition);
-    Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, String> intermediateMap = new HashMap<>();
-
     boolean hasReachedThrottlingLimit = false;
     if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
       hasReachedThrottlingLimit = true;
       if (logger.isDebugEnabled()) {
-        LogUtil.logDebug(logger, _eventId,
-            String.format("Throttled on partition: %s in resource: %s",
-                partition.getPartitionName(), resourceName));
+        LogUtil.logDebug(logger, _eventId, String.format(
+            "Throttled because of cluster/resource quota is full for message {%s} on partition {%s} in resource {%s}",
+            messageToThrottle.getId(), partition.getPartitionName(), resourceName));
       }
     } else {
-      // throttle if any of the instances are not able to accept state transitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
-            hasReachedThrottlingLimit = true;
-            if (logger.isDebugEnabled()) {
-              LogUtil.logDebug(logger, _eventId,
-                  String.format(
-                      "Throttled because of instance: %s for partition: %s in resource: %s",
-                      instance, partition.getPartitionName(), resourceName));
-            }
-            break;
+      // Since message already generated, we can assume the current state is not null and target state is not null
+      if (!cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+          .contains(messageToThrottle.getTgtName())) {
+        if (throttleController.shouldThrottleForInstance(rebalanceType, messageToThrottle.getTgtName())) {
+          hasReachedThrottlingLimit = true;
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String.format(
+                "Throttled because of instance level quota is full on instance {%s} for message {%s} of partition {%s} in resource {%s}",
+                messageToThrottle.getTgtName(), messageToThrottle.getId(), partition.getPartitionName(), resourceName));
           }
         }
       }
     }
+    // If there is a room to hold existing messages, charge this message.

Review comment:
       Nit: did you mean: "If there is still room for this replica, proceed to charge at the cluster and resource level and set the intermediate partition-state mapping so that the state transition will be generated."
   
   Question: please confirm whether my understanding is correct: we do not have `chargeReplica()`; instead, we incorporate replica-level throttling by having each message represent a replica.
   
   So the number of calls for `throttleStateTransitionsForPartition()` would be the total number of partitions in the cluster whereas the number of calls for `throttleStateTransitionsForReplica()` would be the total number of replicas in the cluster, correct?
   
   In the average case, we will have 3x as many replicas as partitions (because applications usually use 3 as the replication factor). I am curious: are you able to run a quick local benchmark of `IntermediateStateCalcStage` before and after this change on a production-grade cluster? It might help us see whether we are unwittingly creating a bottleneck here.
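The call-count comparison in this question can be made concrete with a small, hypothetical sketch. It is not Helix code: `CallCountSketch` and its methods are illustrative names. It assumes one throttling call per partition that has transitions to throttle under the old approach, one call per message under the new one, and, for the upper bound, at most one message per replica in a given pipeline run.

```java
import java.util.HashSet;
import java.util.List;

// Hypothetical call-count comparison; each list element is the partition that one message belongs to.
class CallCountSketch {

  // Old style: one throttling call per distinct partition with transitions to throttle.
  static int perPartitionCalls(List<String> partitionOfEachMessage) {
    return new HashSet<>(partitionOfEachMessage).size();
  }

  // New style: one throttling call per message (each message stands for one replica).
  static int perReplicaCalls(List<String> partitionOfEachMessage) {
    return partitionOfEachMessage.size();
  }

  // Upper bound on per-replica calls, assuming at most one message per replica per run.
  static long perReplicaUpperBound(long partitions, int replicationFactor) {
    return partitions * replicationFactor;
  }
}
```

With 1,000 partitions and a replication factor of 3, the per-replica method would be called at most 3,000 times per run versus at most 1,000 before; if MessageSelectionStage drops dependent messages, as the PR author notes in the reply earlier in this thread, the actual count is lower.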

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -631,104 +631,53 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to throttle
+   * @param messagesThrottled                 messages have been throttled
+   * @param intermediatePartitionStateMap     output for this stage of intermediate map
+   * @param rebalanceType                     the rebalance type to charge quota
+   * @param cache                             cache for controller required metadata
    */
-  private void throttleStateTransitionsForPartition(
-      StateTransitionThrottleController throttleController, String resourceName,
-      Partition partition, CurrentStateOutput currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
+  private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController,
+      String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled,
       PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
       ResourceControllerDataProvider cache) {
-
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName, partition);
-    Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition);
-    Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, String> intermediateMap = new HashMap<>();
-
     boolean hasReachedThrottlingLimit = false;
     if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
       hasReachedThrottlingLimit = true;
       if (logger.isDebugEnabled()) {
-        LogUtil.logDebug(logger, _eventId,
-            String.format("Throttled on partition: %s in resource: %s",
-                partition.getPartitionName(), resourceName));
+        LogUtil.logDebug(logger, _eventId, String.format(
+            "Throttled because of cluster/resource quota is full for message {%s} on partition {%s} in resource {%s}",
+            messageToThrottle.getId(), partition.getPartitionName(), resourceName));
       }
     } else {
-      // throttle if any of the instances are not able to accept state transitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
-            hasReachedThrottlingLimit = true;
-            if (logger.isDebugEnabled()) {
-              LogUtil.logDebug(logger, _eventId,
-                  String.format(
-                      "Throttled because of instance: %s for partition: %s in resource: %s",
-                      instance, partition.getPartitionName(), resourceName));
-            }
-            break;
+      // Since message already generated, we can assume the current state is not null and target state is not null
+      if (!cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+          .contains(messageToThrottle.getTgtName())) {
+        if (throttleController.shouldThrottleForInstance(rebalanceType, messageToThrottle.getTgtName())) {
+          hasReachedThrottlingLimit = true;
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String.format(
+                "Throttled because of instance level quota is full on instance {%s} for message {%s} of partition {%s} in resource {%s}",
+                messageToThrottle.getTgtName(), messageToThrottle.getId(), partition.getPartitionName(), resourceName));
           }
         }
       }
     }
+    // If there is a room to hold existing messages, charge this message.

Review comment:
       Nit: did you mean: "If there is still room for this replica, proceed to charge at the cluster and resource level and set the intermediate partition-state mapping so that the state transition will be generated."
   
   Question: please confirm whether my understanding is correct: we do not have `chargeReplica()`; instead, we incorporate replica-level throttling by having each message represent a replica.
   
   So the number of calls for `throttleStateTransitionsForPartition()` would be the total number of partitions in the cluster whereas the number of calls for `throttleStateTransitionsForReplica()` would be the total number of replicas in the cluster (because this method will be called for each message), correct?
   
   In the average case, we will have 3x as many replicas as partitions (because applications usually use 3 as the replication factor). I am curious: are you able to run a quick local benchmark of `IntermediateStateCalcStage` before and after this change on a production-grade cluster? It might help us see whether we are unwittingly creating a bottleneck here.
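A before/after comparison like the one requested here could start from a crude timing harness such as the following. It is a hypothetical sketch: `CrudeBenchmark` and `medianNanos()` are illustrative names, and the `Runnable` stands in for running the stage against a fixed cluster snapshot, which is not shown. For trustworthy numbers, OpenJDK's JMH (Java Microbenchmark Harness) handles JIT warm-up and dead-code elimination far better than hand-rolled timing.

```java
import java.util.Arrays;

// Hypothetical, hand-rolled timing harness; prefer JMH for real measurements.
class CrudeBenchmark {

  // Runs the task `warmups` times untimed, then `runs` times timed,
  // and returns the middle sample in nanoseconds (upper median for an even count).
  static long medianNanos(Runnable task, int warmups, int runs) {
    if (runs <= 0) {
      throw new IllegalArgumentException("runs must be positive");
    }
    for (int i = 0; i < warmups; i++) {
      task.run(); // give the JIT a chance to compile the hot path
    }
    long[] samples = new long[runs];
    for (int i = 0; i < runs; i++) {
      long start = System.nanoTime();
      task.run();
      samples[i] = System.nanoTime() - start;
    }
    Arrays.sort(samples);
    return samples[runs / 2];
  }
}
```

One would call it once with the old per-partition code path and once with the new per-message path on the same input and compare the two medians; the median is used rather than the mean so that a single GC pause does not dominate the result.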

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -631,104 +631,53 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to throttle
+   * @param messagesThrottled                 messages have been throttled

Review comment:
       Nit: the cumulative set of messages that have been throttled already

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -631,104 +631,53 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to throttle
+   * @param messagesThrottled                 messages have been throttled

Review comment:
       Nit: the cumulative set of messages that have been throttled already. These messages represent the replicas of this partition that have been throttled.






[GitHub] [helix] narendly commented on a change in pull request #1714: [Replica Level Throttle] Change throttling logic to per message

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1714:
URL: https://github.com/apache/helix/pull/1714#discussion_r622496006



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -631,104 +631,53 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to throttle
+   * @param messagesThrottled                 messages have been throttled
+   * @param intermediatePartitionStateMap     output for this stage of intermediate map
+   * @param rebalanceType                     the rebalance type to charge quota
+   * @param cache                             cache for controller required metadata
    */
-  private void throttleStateTransitionsForPartition(
-      StateTransitionThrottleController throttleController, String resourceName,
-      Partition partition, CurrentStateOutput currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
+  private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController,
+      String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled,
       PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
       ResourceControllerDataProvider cache) {
-
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName, partition);
-    Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition);
-    Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, String> intermediateMap = new HashMap<>();
-
     boolean hasReachedThrottlingLimit = false;
     if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
       hasReachedThrottlingLimit = true;
       if (logger.isDebugEnabled()) {
-        LogUtil.logDebug(logger, _eventId,
-            String.format("Throttled on partition: %s in resource: %s",
-                partition.getPartitionName(), resourceName));
+        LogUtil.logDebug(logger, _eventId, String.format(
+            "Throttled because of cluster/resource quota is full for message {%s} on partition {%s} in resource {%s}",
+            messageToThrottle.getId(), partition.getPartitionName(), resourceName));
       }
     } else {
-      // throttle if any of the instances are not able to accept state transitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
-            hasReachedThrottlingLimit = true;
-            if (logger.isDebugEnabled()) {
-              LogUtil.logDebug(logger, _eventId,
-                  String.format(
-                      "Throttled because of instance: %s for partition: %s in resource: %s",
-                      instance, partition.getPartitionName(), resourceName));
-            }
-            break;
+      // Since the message has already been generated, we can assume both the current state and the target state are non-null
+      if (!cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+          .contains(messageToThrottle.getTgtName())) {
+        if (throttleController.shouldThrottleForInstance(rebalanceType, messageToThrottle.getTgtName())) {
+          hasReachedThrottlingLimit = true;
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String.format(
+                "Throttled because of instance level quota is full on instance {%s} for message {%s} of partition {%s} in resource {%s}",
+                messageToThrottle.getTgtName(), messageToThrottle.getId(), partition.getPartitionName(), resourceName));
           }
         }
       }
     }
+    // If there is room to hold existing messages, charge this message.

Review comment:
       @dasahcc 
   
   I agree that if we want to do replica-based throttling, this is something that would be needed, given the way we throttle. 
   
   Have we, though, considered making this optional/configurable? Many users of Helix do not need throttling at this level of granularity.
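The per-message check in the diff above can be illustrated with a simplified, self-contained model. This is a sketch under stated assumptions, not Helix code: `PerMessageThrottleSketch`, `Msg`, `throttleOrCharge`, and `throttleAll` are hypothetical names; a single per-resource counter stands in for the cluster/resource check done by `shouldThrottleForResource`; and because the diff hunk ends before the charging code, the sketch assumes that an admitted message consumes one unit of both the resource and the instance quota.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of per-message throttling; NOT the Helix API.
final class PerMessageThrottleSketch {
  // A pending state-transition message (assumed fields, for illustration only).
  static final class Msg {
    final String id;
    final String resource;
    final String targetInstance;

    Msg(String id, String resource, String targetInstance) {
      this.id = id;
      this.resource = resource;
      this.targetInstance = targetInstance;
    }
  }

  private final int resourceQuota; // stands in for the cluster/resource-level check
  private final int instanceQuota; // max admitted messages per target instance
  private final Map<String, Integer> chargedPerResource = new HashMap<>();
  private final Map<String, Integer> chargedPerInstance = new HashMap<>();

  PerMessageThrottleSketch(int resourceQuota, int instanceQuota) {
    this.resourceQuota = resourceQuota;
    this.instanceQuota = instanceQuota;
  }

  // Returns true if msg is throttled; otherwise charges the quotas and returns false.
  boolean throttleOrCharge(Msg msg, Set<String> disabledInstances) {
    // 1. Resource-level check first, as in the diff.
    boolean throttled = chargedPerResource.getOrDefault(msg.resource, 0) >= resourceQuota;
    // 2. Instance-level check, skipped when the target instance is disabled (as in the diff).
    if (!throttled && !disabledInstances.contains(msg.targetInstance)) {
      throttled = chargedPerInstance.getOrDefault(msg.targetInstance, 0) >= instanceQuota;
    }
    // 3. Assumed charging step: an admitted message consumes one unit of each quota.
    if (!throttled) {
      chargedPerResource.merge(msg.resource, 1, Integer::sum);
      chargedPerInstance.merge(msg.targetInstance, 1, Integer::sum);
    }
    return throttled;
  }

  // Applies the per-message decision to a batch and returns the throttled messages.
  List<Msg> throttleAll(List<Msg> messages, Set<String> disabledInstances) {
    List<Msg> throttled = new ArrayList<>();
    for (Msg m : messages) {
      if (throttleOrCharge(m, disabledInstances)) {
        throttled.add(m);
      }
    }
    return throttled;
  }
}
```

Under these assumptions, with an instance quota of 1, a second message targeting `hostA` is throttled while a message targeting `hostB` still goes out. The removed per-partition code, by contrast, throttled the whole partition as soon as any one of its instances with a pending transition was over quota.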






[GitHub] [helix] dasahcc commented on a change in pull request #1714: [Replica Level Throttle] Change throttling logic to per message

Posted by GitBox <gi...@apache.org>.
dasahcc commented on a change in pull request #1714:
URL: https://github.com/apache/helix/pull/1714#discussion_r622509404



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########

Review comment:
       That's a good suggestion! If the user does not enable throttling, we can even skip this stage entirely. I will add this in a follow-up PR.
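The reply above proposes skipping the stage when throttling is not enabled. A minimal sketch of that gating follows, assuming a hypothetical boolean `throttlingEnabled` (not a real Helix configuration key) and a caller-supplied predicate standing in for the per-message quota check; `OptionalThrottleStageSketch` and `admit` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: bypass per-message throttling when it is not enabled.
// "throttlingEnabled" is an illustrative flag, NOT a real Helix config key.
final class OptionalThrottleStageSketch {
  private final boolean throttlingEnabled;

  OptionalThrottleStageSketch(boolean throttlingEnabled) {
    this.throttlingEnabled = throttlingEnabled;
  }

  // Returns the messages allowed out in this pipeline run.
  // shouldThrottle is consulted only when throttling is enabled.
  <M> List<M> admit(List<M> messages, Predicate<M> shouldThrottle) {
    if (!throttlingEnabled) {
      // Fast path: no quota bookkeeping at all; every message passes through.
      return new ArrayList<>(messages);
    }
    List<M> admitted = new ArrayList<>();
    for (M m : messages) {
      if (!shouldThrottle.test(m)) {
        admitted.add(m);
      }
    }
    return admitted;
  }
}
```

Keeping the disabled case as an early return means clusters without throttling configured pay no per-message bookkeeping cost, which addresses the concern raised in the review.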



