Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2021/04/28 20:29:41 UTC

[GitHub] [helix] dasahcc opened a new pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

dasahcc opened a new pull request #1719:
URL: https://github.com/apache/helix/pull/1719


   ### Issues
   
   - [x] My PR addresses the following Helix issues and references them in the PR description:
   
   resolves #343 
   
   ### Description
   
   - [X] Here are some details about my PR, including screenshots of any UI changes:
   
   This commit contains the following changes:
   1. Adapt Recovery/Load Rebalance and the charging of pending messages to per-replica throttling.
   2. Change the partition-level comparator to order partitions by 1) missed top states, 2) missed active replicas, and 3) whether the ideal state matches.
   
   For the overall context of this change, please refer to #1713.
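
To make the three-level ordering in item 2 concrete, here is a minimal Java sketch. It is not the comparator from this PR: `PartitionHealth`, `PartitionPrioritySketch`, and `BY_URGENCY` are hypothetical names, and the direction of each key (more missed top states first, then more missed active replicas, then partitions that do not match the ideal state) is an assumption that the description does not spell out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical per-partition summary; this is not a Helix class.
final class PartitionHealth {
  final String name;
  final int missedTopStates;       // top-state replicas that are currently missing
  final int missedActiveReplicas;  // active replicas that are currently missing
  final boolean matchesIdealState; // true if the current assignment equals the ideal state

  PartitionHealth(String name, int missedTopStates, int missedActiveReplicas,
      boolean matchesIdealState) {
    this.name = name;
    this.missedTopStates = missedTopStates;
    this.missedActiveReplicas = missedActiveReplicas;
    this.matchesIdealState = matchesIdealState;
  }
}

final class PartitionPrioritySketch {
  // Assumed ordering: the most urgent partition sorts first.
  // Counts are negated so that larger counts come earlier; for the boolean key,
  // false sorts before true, so mismatching partitions come first.
  static final Comparator<PartitionHealth> BY_URGENCY =
      Comparator.<PartitionHealth>comparingInt(p -> -p.missedTopStates)
          .thenComparingInt(p -> -p.missedActiveReplicas)
          .thenComparing((a, b) -> Boolean.compare(a.matchesIdealState, b.matchesIdealState));

  // Returns partition names in the order they would be considered.
  static List<String> order(List<PartitionHealth> partitions) {
    List<PartitionHealth> sorted = new ArrayList<>(partitions);
    sorted.sort(BY_URGENCY);
    List<String> names = new ArrayList<>();
    for (PartitionHealth p : sorted) {
      names.add(p.name);
    }
    return names;
  }
}
```

Chaining the keys means a later key only breaks ties left by the earlier ones, so a partition missing a top state is always considered before one that is only missing an active replica.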
   
   ### Tests
   
   This is one of a series of split PRs. The branch will not be able to compile or run tests until the code changes are finished.
   Test files and test results will be included in the last several PRs.
   
   - [ ] The following tests are written for this issue:
   
   
   - The following is the result of the "mvn test" command on the appropriate module:
   
   ### Documentation (Optional)
   
   
   (Link the GitHub wiki you added)
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] dasahcc merged pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

Posted by GitBox <gi...@apache.org>.
dasahcc merged pull request #1719:
URL: https://github.com/apache/helix/pull/1719


   




[GitHub] [helix] narendly commented on a change in pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1719:
URL: https://github.com/apache/helix/pull/1719#discussion_r624563961



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -423,211 +421,112 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine the message is downward message or not  Not that this function does NOT check for ERROR states.

Review comment:
       Nit: fix this sentence?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -423,211 +421,112 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine the message is downward message or not  Not that this function does NOT check for ERROR states.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this resource
+   * @return                         set of messages are allowed for downward state transitions

Review comment:
       nit: remove "are"

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -423,211 +421,112 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine the message is downward message or not  Not that this function does NOT check for ERROR states.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this resource
+   * @return                         set of messages are allowed for downward state transitions
    */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) {
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap();
-
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+  private boolean isLoadBalanceDownwardStateTransition(Message message, StateModelDefinition stateModelDefinition) {
+    // state model definition is not found
+    if (stateModelDefinition == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = stateModelDefinition.getStatePriorityMap();
+    // Compare priority values and return if an upward transition is found
+    // Note that lower integer value implies higher priority
+    // If the state is not found in statePriorityMap, consider it not strictly downward by
+    // default because we can't determine whether it is downward
+    if (statePriorityMap.containsKey(message.getFromState()) && statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) > statePriorityMap.get(message.getToState())) {
+      return true;
+    }
+    return false;
   }
 
   /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController, ResourceControllerDataProvider cache,
+      Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition,
       PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
-
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Custom mode does not apply recovery/load reblance since they are independent on each partition's replica number.
+      Map<String, Integer> requiredStates =
+          getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));

Review comment:
       Nit: typo - "reblance"
   
   Also, this comment is a little confusing - where in the code do we check if the current rebalancer is a custom rebalancer here? And what do you mean by they are independent on each partition's replica number?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -423,211 +421,112 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine the message is downward message or not  Not that this function does NOT check for ERROR states.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this resource
+   * @return                         set of messages are allowed for downward state transitions
    */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) {
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap();
-
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+  private boolean isLoadBalanceDownwardStateTransition(Message message, StateModelDefinition stateModelDefinition) {
+    // state model definition is not found
+    if (stateModelDefinition == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = stateModelDefinition.getStatePriorityMap();
+    // Compare priority values and return if an upward transition is found
+    // Note that lower integer value implies higher priority
+    // If the state is not found in statePriorityMap, consider it not strictly downward by
+    // default because we can't determine whether it is downward
+    if (statePriorityMap.containsKey(message.getFromState()) && statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) > statePriorityMap.get(message.getToState())) {
+      return true;
+    }
+    return false;
   }
 
   /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController, ResourceControllerDataProvider cache,
+      Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition,
       PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
-
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Custom mode does not apply recovery/load reblance since they are independent on each partition's replica number.
+      Map<String, Integer> requiredStates =
+          getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));
       // Maps instance to its current state
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
       // Maps instance to its pending (next) state
-      Map<String, String> pendingMap =
-          currentStateOutput.getPendingStateMap(resourceName, partition);
-
-      StateTransitionThrottleConfig.RebalanceType rebalanceType = RebalanceType.NONE;
-      if (partitionsNeedRecovery.contains(partition)) {
-        rebalanceType = StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
-      } else if (partitionsNeedLoadbalance.contains(partition)) {
-        rebalanceType = StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
-      }
-
-      if (pendingMap.size() > 0) {
-        boolean shouldChargePartition = false;
-        for (String instance : pendingMap.keySet()) {
-          String currentState = currentStateMap.get(instance);
-          String pendingState = pendingMap.get(instance);
-          if (pendingState != null && !pendingState.equals(currentState)
-              && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-              .contains(instance)) {
-            // Only charge this instance if the partition is not disabled
-            throttleController.chargeInstance(rebalanceType, instance);
-            shouldChargePartition = true;
-            // If there is a pending state transition for the partition, that means that an assignment
-            // has already been made and the state transition message has already been sent out for the partition
-            // in a previous pipeline run. We must honor this and reflect it by charging for the pending state transition message.
-
-            // Since the assignment has already been made for the pending message, we do a special treatment
-            // for it by setting the best possible state directly in intermediatePartitionStateMap so that the pending
-            // message won't be double-assigned or double-charged in recovery or load balance.
-            handlePendingStateTransitionsForThrottling(partition, partitionsNeedRecovery,
-                partitionsNeedLoadbalance, rebalanceType, bestPossiblePartitionStateMap,
-                intermediatePartitionStateMap);
-          }
-        }
-        if (shouldChargePartition) {
-          throttleController.chargeCluster(rebalanceType);
+      List<Message> pendingMessages =
+          new ArrayList<>(currentStateOutput.getPendingMessageMap(resourceName, partition).values());
+      Collections.sort(pendingMessages, new MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
+          stateModelDefinition.getStatePriorityMap()));
+
+      for (Message message : pendingMessages) {
+        StateTransitionThrottleConfig.RebalanceType rebalanceType =
+            getRebalanceTypePerMessage(requiredStates, message, currentStateMap);
+        String currentState = currentStateMap.get(message.getTgtName());
+        if (!message.getToState().equals(currentState) && message.getFromState().equals(currentState)
+            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+            .contains(message.getTgtName())) {
+          throttleController.chargeInstance(rebalanceType, message.getTgtName());
           throttleController.chargeResource(rebalanceType, resourceName);
+          throttleController.chargeCluster(rebalanceType);
         }
+        intermediatePartitionStateMap.setState(partition, message.getTgtName(), message.getToState());
       }
     }
   }
 
   /**
-   * Sort partitions according to partition priority {@link PartitionPriorityComparator}, and for
-   * each partition, throttle state transitions if needed. Also populate
-   * intermediatePartitionStateMap either with BestPossibleState (if no throttling is necessary) or
-   * CurrentState (if throttled).
-   * @param resource
-   * @param bestPossiblePartitionStateMap
-   * @param throttleController
-   * @param intermediatePartitionStateMap
-   * @param partitionsNeedRecovery
-   * @param currentStateOutput
-   * @param topState
-   * @param cache
-   * @return a set of partitions that need recovery but did not get recovered due to throttling
+   * Thin wrapper for per message throttling with recovery rebalance type. Also populate
+   * intermediatePartitionStateMap with generated messages from {@link MessageGenerationPhase}.
+   * @param resource                      the resource to throttle
+   * @param throttleController            throttle controller object
+   * @param messageToThrottle             the message to be throttle
+   * @param intermediatePartitionStateMap output result for this stage that intemediate state map
+   * @param cache                         cache object for computational metadata from external storage
+   * @param messagesThrottled             messages are already throttled

Review comment:
       "messages that have already been throttled"

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -423,211 +421,112 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine the message is downward message or not  Not that this function does NOT check for ERROR states.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this resource
+   * @return                         set of messages are allowed for downward state transitions
    */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) {
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap();
-
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+  private boolean isLoadBalanceDownwardStateTransition(Message message, StateModelDefinition stateModelDefinition) {
+    // state model definition is not found
+    if (stateModelDefinition == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = stateModelDefinition.getStatePriorityMap();
+    // Compare priority values and return if an upward transition is found
+    // Note that lower integer value implies higher priority
+    // If the state is not found in statePriorityMap, consider it not strictly downward by
+    // default because we can't determine whether it is downward
+    if (statePriorityMap.containsKey(message.getFromState()) && statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) > statePriorityMap.get(message.getToState())) {
+      return true;
+    }
+    return false;
   }
 
   /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController, ResourceControllerDataProvider cache,
+      Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition,
       PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
-
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Custom mode does not apply recovery/load reblance since they are independent on each partition's replica number.
+      Map<String, Integer> requiredStates =
+          getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));
       // Maps instance to its current state
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
       // Maps instance to its pending (next) state
-      Map<String, String> pendingMap =
-          currentStateOutput.getPendingStateMap(resourceName, partition);
-
-      StateTransitionThrottleConfig.RebalanceType rebalanceType = RebalanceType.NONE;
-      if (partitionsNeedRecovery.contains(partition)) {
-        rebalanceType = StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
-      } else if (partitionsNeedLoadbalance.contains(partition)) {
-        rebalanceType = StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
-      }
-
-      if (pendingMap.size() > 0) {
-        boolean shouldChargePartition = false;
-        for (String instance : pendingMap.keySet()) {
-          String currentState = currentStateMap.get(instance);
-          String pendingState = pendingMap.get(instance);
-          if (pendingState != null && !pendingState.equals(currentState)
-              && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-              .contains(instance)) {
-            // Only charge this instance if the partition is not disabled
-            throttleController.chargeInstance(rebalanceType, instance);
-            shouldChargePartition = true;
-            // If there is a pending state transition for the partition, that means that an assignment
-            // has already been made and the state transition message has already been sent out for the partition
-            // in a previous pipeline run. We must honor this and reflect it by charging for the pending state transition message.
-
-            // Since the assignment has already been made for the pending message, we do a special treatment
-            // for it by setting the best possible state directly in intermediatePartitionStateMap so that the pending
-            // message won't be double-assigned or double-charged in recovery or load balance.
-            handlePendingStateTransitionsForThrottling(partition, partitionsNeedRecovery,
-                partitionsNeedLoadbalance, rebalanceType, bestPossiblePartitionStateMap,
-                intermediatePartitionStateMap);
-          }
-        }
-        if (shouldChargePartition) {
-          throttleController.chargeCluster(rebalanceType);
+      List<Message> pendingMessages =
+          new ArrayList<>(currentStateOutput.getPendingMessageMap(resourceName, partition).values());
+      Collections.sort(pendingMessages, new MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
+          stateModelDefinition.getStatePriorityMap()));
+
+      for (Message message : pendingMessages) {
+        StateTransitionThrottleConfig.RebalanceType rebalanceType =
+            getRebalanceTypePerMessage(requiredStates, message, currentStateMap);
+        String currentState = currentStateMap.get(message.getTgtName());
+        if (!message.getToState().equals(currentState) && message.getFromState().equals(currentState)
+            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+            .contains(message.getTgtName())) {
+          throttleController.chargeInstance(rebalanceType, message.getTgtName());
           throttleController.chargeResource(rebalanceType, resourceName);
+          throttleController.chargeCluster(rebalanceType);
         }
+        intermediatePartitionStateMap.setState(partition, message.getTgtName(), message.getToState());
       }
     }
   }
 
   /**
-   * Sort partitions according to partition priority {@link PartitionPriorityComparator}, and for
-   * each partition, throttle state transitions if needed. Also populate
-   * intermediatePartitionStateMap either with BestPossibleState (if no throttling is necessary) or
-   * CurrentState (if throttled).
-   * @param resource
-   * @param bestPossiblePartitionStateMap
-   * @param throttleController
-   * @param intermediatePartitionStateMap
-   * @param partitionsNeedRecovery
-   * @param currentStateOutput
-   * @param topState
-   * @param cache
-   * @return a set of partitions that need recovery but did not get recovered due to throttling
+   * Thin wrapper for per message throttling with recovery rebalance type. Also populate
+   * intermediatePartitionStateMap with generated messages from {@link MessageGenerationPhase}.
+   * @param resource                      the resource to throttle
+   * @param throttleController            throttle controller object
+   * @param messageToThrottle             the message to be throttle

Review comment:
       "throttled"?






[GitHub] [helix] dasahcc commented on a change in pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

Posted by GitBox <gi...@apache.org>.
dasahcc commented on a change in pull request #1719:
URL: https://github.com/apache/helix/pull/1719#discussion_r625266316



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -423,211 +421,112 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine the message is downward message or not  Not that this function does NOT check for ERROR states.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this resource
+   * @return                         set of messages are allowed for downward state transitions
    */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) {
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap();
-
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+  private boolean isLoadBalanceDownwardStateTransition(Message message, StateModelDefinition stateModelDefinition) {
+    // state model definition is not found
+    if (stateModelDefinition == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = stateModelDefinition.getStatePriorityMap();
+    // Compare priority values and return if an upward transition is found
+    // Note that lower integer value implies higher priority
+    // If the state is not found in statePriorityMap, consider it not strictly downward by
+    // default because we can't determine whether it is downward
+    if (statePriorityMap.containsKey(message.getFromState()) && statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) > statePriorityMap.get(message.getToState())) {
+      return true;
+    }
+    return false;
   }
 
   /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController, ResourceControllerDataProvider cache,
+      Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition,
       PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
-
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Custom mode does not apply recovery/load reblance since they are independent on each partition's replica number.
+      Map<String, Integer> requiredStates =
+          getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));

Review comment:
       Yes. Let me fix this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] narendly commented on pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

Posted by GitBox <gi...@apache.org>.
narendly commented on pull request #1719:
URL: https://github.com/apache/helix/pull/1719#issuecomment-830697879


   A general question - why is it necessary to remove throttled messages one by one from `resourceStateMap`? I think if we could explain that a little further in the code as a comment, that would be helpful.




[GitHub] [helix] dasahcc commented on pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

Posted by GitBox <gi...@apache.org>.
dasahcc commented on pull request #1719:
URL: https://github.com/apache/helix/pull/1719#issuecomment-832277027


   @narendly Could you please take another look?




[GitHub] [helix] narendly commented on a change in pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1719:
URL: https://github.com/apache/helix/pull/1719#discussion_r627526808



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -423,211 +421,113 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine the message is downward message or not.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this resource
+   * @return                         set of messages allowed for downward state transitions
    */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) {
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap();
-
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+  private boolean isLoadBalanceDownwardStateTransition(Message message, StateModelDefinition stateModelDefinition) {
+    // state model definition is not found
+    if (stateModelDefinition == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = stateModelDefinition.getStatePriorityMap();
+    // Compare priority values and return if an upward transition is found
+    // Note that lower integer value implies higher priority
+    // If the state is not found in statePriorityMap, consider it not strictly downward by
+    // default because we can't determine whether it is downward
+    if (statePriorityMap.containsKey(message.getFromState()) && statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) > statePriorityMap.get(message.getToState())) {
+      return true;
+    }
+    return false;
   }
 
   /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController, ResourceControllerDataProvider cache,
+      Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition,
       PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
-
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Custom mode does not apply recovery/load rebalance since user can define different number of replicas for
+      // different partitions.

Review comment:
       Part of my previous comment is still not addressed:
   To the reader, it's a little confusing what Custom Rebalance mode has to do with this particular line of code. We are checking and charging pending transitions, so why are we discussing Custom mode here? I don't see that we specifically check whether it's custom mode at this point, for example:
   
   ```
   if (customMode) {
       return;
   }
   ```






[GitHub] [helix] narendly commented on a change in pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #1719:
URL: https://github.com/apache/helix/pull/1719#discussion_r627691571



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -423,211 +421,113 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine the message is downward message or not.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this resource
+   * @return                         set of messages allowed for downward state transitions
    */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) {
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap();
-
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+  private boolean isLoadBalanceDownwardStateTransition(Message message, StateModelDefinition stateModelDefinition) {
+    // state model definition is not found
+    if (stateModelDefinition == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = stateModelDefinition.getStatePriorityMap();
+    // Compare priority values and return if an upward transition is found
+    // Note that lower integer value implies higher priority
+    // If the state is not found in statePriorityMap, consider it not strictly downward by
+    // default because we can't determine whether it is downward
+    if (statePriorityMap.containsKey(message.getFromState()) && statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) > statePriorityMap.get(message.getToState())) {
+      return true;
+    }
+    return false;
   }
 
   /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController, ResourceControllerDataProvider cache,
+      Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition,
       PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
-
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Custom mode does not apply recovery/load rebalance since user can define different number of replicas for
+      // different partitions.

Review comment:
       Could you add this to the comment above? That would reduce confusion.






[GitHub] [helix] dasahcc commented on pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

Posted by GitBox <gi...@apache.org>.
dasahcc commented on pull request #1719:
URL: https://github.com/apache/helix/pull/1719#issuecomment-831435946


   > A general question - why is it necessary to remove throttled messages one by one from `resourceStateMap`? I think if we could explain that a little further in the code as a comment, that would be helpful.
   
   Do you mean removing messages from `resourceMessageMap`? They are the output of message generation and will be dispatched by the following stage. If a message has been throttled, we need to remove it so that it is not dispatched. I will add a comment for it.
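
For illustration only, the removal described above could be sketched as follows. This is a minimal, self-contained sketch and not the code in the PR: `ThrottledMessageFilter`, `dropThrottled`, and the use of plain `String` message IDs in place of Helix `Message` objects are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: messages are modeled as String IDs keyed by partition name.
final class ThrottledMessageFilter {
  private ThrottledMessageFilter() {}

  /**
   * Returns a copy of the generated messages with every throttled message removed,
   * so that a later dispatch step only sees messages that are allowed to go out.
   */
  static Map<String, List<String>> dropThrottled(
      Map<String, List<String>> messagesByPartition, Set<String> throttledIds) {
    Map<String, List<String>> dispatchable = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> entry : messagesByPartition.entrySet()) {
      // Copy first so the caller's lists are left untouched.
      List<String> kept = new ArrayList<>(entry.getValue());
      kept.removeAll(throttledIds);
      dispatchable.put(entry.getKey(), kept);
    }
    return dispatchable;
  }
}
```

The point of the sketch is only that whatever remains in the map is what the next stage would send, which is why a throttled message has to be taken out rather than merely flagged.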




[GitHub] [helix] dasahcc commented on a change in pull request #1719: [Replica Level Throttle] Apply per replica level throttling logic to Recovery/Load Rebalance and PendingMessage

Posted by GitBox <gi...@apache.org>.
dasahcc commented on a change in pull request #1719:
URL: https://github.com/apache/helix/pull/1719#discussion_r627647596



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -423,211 +421,113 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine the message is downward message or not.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this resource
+   * @return                         set of messages allowed for downward state transitions
    */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) {
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap();
-
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+  private boolean isLoadBalanceDownwardStateTransition(Message message, StateModelDefinition stateModelDefinition) {
+    // state model definition is not found
+    if (stateModelDefinition == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = stateModelDefinition.getStatePriorityMap();
+    // Compare priority values and return if an upward transition is found
+    // Note that lower integer value implies higher priority
+    // If the state is not found in statePriorityMap, consider it not strictly downward by
+    // default because we can't determine whether it is downward
+    if (statePriorityMap.containsKey(message.getFromState()) && statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) > statePriorityMap.get(message.getToState())) {
+      return true;
+    }
+    return false;
   }
 
   /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController, ResourceControllerDataProvider cache,
+      Map<String, List<String>> preferenceLists, StateModelDefinition stateModelDefinition,
       PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
-
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Custom mode does not apply recovery/load rebalance since user can define different number of replicas for
+      // different partitions.

Review comment:
       Thanks for clarifying! The comment I added explains the assumption behind how we derive the required states. In practice, though, custom mode never gets this far.
   
   At the resource level, the top-level function in IntermediateStage (#1713, line 300) returns early:
   
       if (!throttleController.isThrottleEnabled() || !IdealState.RebalanceMode.FULL_AUTO.equals(
           idealState.getRebalanceMode()) || resourceMessageMap.isEmpty()) {
         return bestPossiblePartitionStateMap;
       }
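
As a rough, self-contained restatement of the guard quoted above: the sketch below shows why a resource in custom mode would never reach the per-partition charging logic. `ThrottleGuard`, `skipsThrottling`, and the local `RebalanceMode` enum are hypothetical stand-ins introduced for this example; the real check uses `IdealState.RebalanceMode` and the controller objects shown in the snippet.

```java
// Hypothetical stand-in types; not part of the PR.
final class ThrottleGuard {
  private ThrottleGuard() {}

  // Only the modes needed for the illustration are listed here.
  enum RebalanceMode { FULL_AUTO, SEMI_AUTO, CUSTOMIZED }

  /**
   * Mirrors the three-way short circuit: throttling is skipped when it is disabled,
   * when the resource is not in FULL_AUTO mode, or when there are no messages.
   */
  static boolean skipsThrottling(boolean throttleEnabled, RebalanceMode mode, boolean hasMessages) {
    return !throttleEnabled || mode != RebalanceMode.FULL_AUTO || !hasMessages;
  }
}
```

Under these assumptions, `skipsThrottling(true, RebalanceMode.CUSTOMIZED, true)` is true, so the best possible state would be returned unchanged for a custom-mode resource, while a FULL_AUTO resource with throttling enabled and pending messages goes on to the throttling path.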
   
   



