You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/04/28 20:21:32 UTC

[helix] branch replica_level_throttle updated (29aaf64 -> 6fb8088)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a change to branch replica_level_throttle
in repository https://gitbox.apache.org/repos/asf/helix.git.


 discard 29aaf64  Change throttling logic to per message (#1714)
    omit ec00381  [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)
     add 541d093  Fix test TestResourceChangeDetector. (#1712)
     add 07e3ed5  Remove lock in HelixStateTransitionHandler (#1681)
     add af3ee1c  Change MessageQueueMonitor from static to dynamic metric (#1715)
     new b9e6f63  [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)
     new 6fb8088  Change throttling logic to per message (#1714)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (29aaf64)
            \
             N -- N -- N   refs/heads/replica_level_throttle (6fb8088)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../handling/HelixStateTransitionHandler.java      | 96 +++++++++++-----------
 .../monitoring/mbeans/MessageQueueMonitor.java     | 63 ++++++--------
 .../mbeans/MessageQueueMonitorMBean.java           | 30 -------
 .../changedetector/TestResourceChangeDetector.java | 93 ++++++++-------------
 4 files changed, 109 insertions(+), 173 deletions(-)
 delete mode 100644 helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitorMBean.java

[helix] 01/02: [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch replica_level_throttle
in repository https://gitbox.apache.org/repos/asf/helix.git

commit b9e6f635035361bfc338761924693fd9d7c2f01f
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Fri Apr 23 14:58:48 2021 -0700

    [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)
    
    * Add per replica rebalance type compute logic
    
    Three functions added: 1) rebalance type computation required state.
                           2) rebalance type per message
                           3) message sorting rules and comparators to determine which message to apply first.
---
 .../stages/IntermediateStateCalcStage.java         | 82 +++++++++++++++++++++-
 1 file changed, 81 insertions(+), 1 deletion(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c0defa9..a840d5e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -43,6 +45,7 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
@@ -796,6 +799,54 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   }
 
   /**
+   * Determine the message rebalance type with message and current states.
+   * @param desiredStates         Ideally, how many replicas of each state are needed to guarantee replica health
+   * @param message               The message whose rebalance type is to be determined
+   * @param derivedCurrentStates  Derived from current states with the previous messages that were not throttled.
+   * @return                      Rebalance type. Recovery or load.
+   */
+  private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> desiredStates, Message message,
+      Map<String, String> derivedCurrentStates) {
+    Map<String, Integer> desiredStatesSnapshot = new HashMap<>(desiredStates);
+    // Loop over the existing current states to see whether they fulfill all the required states.
+    for (String state : derivedCurrentStates.values()) {
+      if (desiredStatesSnapshot.containsKey(state)) {
+        if (desiredStatesSnapshot.get(state) == 1) {
+          desiredStatesSnapshot.remove(state);
+        } else {
+          desiredStatesSnapshot.put(state, desiredStatesSnapshot.get(state) - 1);
+        }
+      }
+    }
+
+    // If the message contains any "required" state changes, then it is considered recovery rebalance.
+    // Otherwise, it is load balance.
+    return desiredStatesSnapshot.containsKey(message.getToState()) ? RebalanceType.RECOVERY_BALANCE
+        : RebalanceType.LOAD_BALANCE;
+  }
+
+  private Map<String, Integer> getRequiredStates(String resourceName,
+      ResourceControllerDataProvider resourceControllerDataProvider, List<String> preferenceList) {
+
+    // Prepare required inputs: 1) Priority State List 2) required number of replica
+    IdealState idealState = resourceControllerDataProvider.getIdealState(resourceName);
+    StateModelDefinition stateModelDefinition =
+        resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef());
+    int requiredNumReplica = idealState.getMinActiveReplicas() == -1
+        ? idealState.getReplicaCount(preferenceList.size())
+        : idealState.getMinActiveReplicas();
+
+    // Generate a state mapping, state -> required numbers based on the live and enabled instances for this partition
+    // preference list
+    LinkedHashMap<String, Integer> expectedStateCountMap = stateModelDefinition.getStateCountMap(
+        (int) preferenceList.stream()
+            .filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
+            .count(), requiredNumReplica); // StateModelDefinition's counts
+
+    return expectedStateCountMap;
+  }
+
+  /**
    * Log rebalancer metadata for debugging purposes.
    * @param resource
    * @param allPartitions
@@ -874,7 +925,36 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
   }
 
-  // Compare partitions according following standard:
+  private class MessagePriorityComparator implements Comparator<Message> {
+    private Map<String, Integer> _preferenceInstanceMap;
+    private Map<String, Integer> _statePriorityMap;
+
+    MessagePriorityComparator(List<String> preferenceList, Map<String, Integer> statePriorityMap) {
+      // Get instance -> priority map.
+      _preferenceInstanceMap = IntStream.range(0, preferenceList.size())
+          .boxed()
+          .collect(Collectors.toMap(preferenceList::get, index -> index));
+      _statePriorityMap = statePriorityMap;
+    }
+
+    @Override
+    public int compare(Message m1, Message m2) {
+      //  Compare rules:
+      //     1. Higher target state has higher priority.
+      //     2. If the target state is the same, rank by preference list order.
+      //     3. Sort by the name of targeted instances just for deterministic ordering.
+      if (m1.getToState().equals(m2.getToState()) && _preferenceInstanceMap.containsKey(m1.getTgtName())
+          && _preferenceInstanceMap.containsKey(m2.getTgtName())) {
+        return _preferenceInstanceMap.get(m1.getTgtName()).compareTo(_preferenceInstanceMap.get(m2.getTgtName()));
+      }
+      if (!m1.getToState().equals(m2.getToState())) {
+        return _statePriorityMap.get(m1.getToState()).compareTo(_statePriorityMap.get(m2.getToState()));
+      }
+      return m1.getTgtName().compareTo(m2.getTgtName());
+    }
+  }
+
+  // Compare partitions according to the following standard:
   // 1) Partition without top state always is the highest priority.
   // 2) For partition with top-state, the more number of active replica it has, the less priority.
   private class PartitionPriorityComparator implements Comparator<Partition> {

[helix] 02/02: Change throttling logic to per message (#1714)

Posted by jx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch replica_level_throttle
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 6fb8088ad2f91ac2e5a02ebfa1f180c8db142f67
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Wed Apr 28 13:20:09 2021 -0700

    Change throttling logic to per message (#1714)
    
    Apply the logic for throttling with per message quota charge.
---
 .../stages/IntermediateStateCalcStage.java         | 115 ++++++---------------
 1 file changed, 33 insertions(+), 82 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index a840d5e..c46a6ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -631,104 +631,55 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   }
 
   /**
-   * Check the status on throttling at every level (cluster, resource, instance) and set
-   * intermediatePartitionStateMap accordingly per partition.
-   * @param throttleController
-   * @param resourceName
-   * @param partition
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param partitionsThrottled
-   * @param intermediatePartitionStateMap
-   * @param rebalanceType
-   * @param cache
+   * Check the status for a single message on throttling at every level (cluster, resource, replica) and set
+   * intermediatePartitionStateMap accordingly for that replica.
+   * @param throttleController                throttle controller object for throttling quota
+   * @param resourceName                      the resource for throttling check
+   * @param partition                         the partition for throttling check
+   * @param messageToThrottle                 the message to be throttled
+   * @param messagesThrottled                 the cumulative set of messages that have been throttled already. These
+   *                                          messages represent the replicas of this partition that have been throttled.
+   * @param intermediatePartitionStateMap     the cumulative partition-state mapping as a result of the throttling step
+   *                                          of IntermediateStateCalcStage
+   * @param rebalanceType                     the rebalance type to charge quota
+   * @param cache                             cached cluster metadata required by the throttle controller
    */
-  private void throttleStateTransitionsForPartition(
-      StateTransitionThrottleController throttleController, String resourceName,
-      Partition partition, CurrentStateOutput currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap, Set<Partition> partitionsThrottled,
+  private void throttleStateTransitionsForReplica(StateTransitionThrottleController throttleController,
+      String resourceName, Partition partition, Message messageToThrottle, Set<Message> messagesThrottled,
       PartitionStateMap intermediatePartitionStateMap, RebalanceType rebalanceType,
       ResourceControllerDataProvider cache) {
-
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName, partition);
-    Map<String, String> bestPossibleMap = bestPossiblePartitionStateMap.getPartitionMap(partition);
-    Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, String> intermediateMap = new HashMap<>();
-
     boolean hasReachedThrottlingLimit = false;
     if (throttleController.shouldThrottleForResource(rebalanceType, resourceName)) {
       hasReachedThrottlingLimit = true;
       if (logger.isDebugEnabled()) {
-        LogUtil.logDebug(logger, _eventId,
-            String.format("Throttled on partition: %s in resource: %s",
-                partition.getPartitionName(), resourceName));
+        LogUtil.logDebug(logger, _eventId, String.format(
+            "Throttled because of cluster/resource quota is full for message {%s} on partition {%s} in resource {%s}",
+            messageToThrottle.getId(), partition.getPartitionName(), resourceName));
       }
     } else {
-      // throttle if any of the instances are not able to accept state transitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          if (throttleController.shouldThrottleForInstance(rebalanceType, instance)) {
-            hasReachedThrottlingLimit = true;
-            if (logger.isDebugEnabled()) {
-              LogUtil.logDebug(logger, _eventId,
-                  String.format(
-                      "Throttled because of instance: %s for partition: %s in resource: %s",
-                      instance, partition.getPartitionName(), resourceName));
-            }
-            break;
+      // Since the message has already been generated, we can assume the current state and target state are not null
+      if (!cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
+          .contains(messageToThrottle.getTgtName())) {
+        if (throttleController.shouldThrottleForInstance(rebalanceType, messageToThrottle.getTgtName())) {
+          hasReachedThrottlingLimit = true;
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String.format(
+                "Throttled because of instance level quota is full on instance {%s} for message {%s} of partition {%s} in resource {%s}",
+                messageToThrottle.getId(), messageToThrottle.getTgtName(), partition.getPartitionName(), resourceName));
           }
         }
       }
     }
+    // If there is still room for this replica, proceed to charge at the cluster and resource level and set the
+    // intermediate partition-state mapping so that the state transition message can move forward.
     if (!hasReachedThrottlingLimit) {
-      // This implies that there is room for more state transitions.
-      // Find instances with a replica whose current state is different from BestPossibleState and
-      // "charge" for it, and bestPossibleStates will become intermediate states
-      intermediateMap.putAll(bestPossibleMap);
-      boolean shouldChargeForPartition = false;
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          throttleController.chargeInstance(rebalanceType, instance);
-          shouldChargeForPartition = true;
-        }
-      }
-      if (shouldChargeForPartition) {
-        throttleController.chargeCluster(rebalanceType);
-        throttleController.chargeResource(rebalanceType, resourceName);
-      }
+      throttleController.chargeCluster(rebalanceType);
+      throttleController.chargeResource(rebalanceType, resourceName);
+      intermediatePartitionStateMap.setState(partition, messageToThrottle.getTgtName(), messageToThrottle.getToState());
     } else {
-      // No more room for more state transitions; current states will just become intermediate
-      // states unless the partition is disabled
-      // Add this partition to a set of throttled partitions
-      for (String instance : allInstances) {
-        String currentState = currentStateMap.get(instance);
-        String bestPossibleState = bestPossibleMap.get(instance);
-        if (bestPossibleState != null && !bestPossibleState.equals(currentState)
-            && cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
-                .contains(instance)) {
-          // Because this partition is disabled, we allow assignment
-          intermediateMap.put(instance, bestPossibleState);
-        } else {
-          // This partition is not disabled, so it must be throttled by just passing on the current
-          // state
-          if (currentState != null) {
-            intermediateMap.put(instance, currentState);
-          }
-          partitionsThrottled.add(partition);
-        }
-      }
+      // Intermediate Map is based on current state
+      messagesThrottled.add(messageToThrottle);
     }
-    intermediatePartitionStateMap.setState(partition, intermediateMap);
   }
 
   /**