You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/01 00:46:58 UTC

[GitHub] [pinot] jtao15 commented on a change in pull request #8441: Add retainInstancesSequence feature to table rebalance to minimize data movement between instances

jtao15 commented on a change in pull request #8441:
URL: https://github.com/apache/pinot/pull/8441#discussion_r840126785



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
##########
@@ -145,6 +145,8 @@ public RebalanceResult rebalance(TableConfig tableConfig, Configuration rebalanc
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     boolean bestEfforts = rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS,
         RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
+    boolean retainInstanceSequence = rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE,
+        RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE);

Review comment:
       Should we throw exceptions if `retainInstanceSequence = true` and `reassignInstances = true` or the table does not allow instance assignment? 

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
##########
@@ -121,12 +190,40 @@ public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String table
       LOGGER.info("Selecting pools: {} for table: {}", poolsToSelect, _tableNameWithType);
       pools.retainAll(poolsToSelect);
     } else {
-      // Non-pool based selection
+      // Non-pool based selection. All the instances should be associated with a single pool, which is always 0.
+      // E.g.: Pool0 -> [ I1, I2, I3, I4, I5, I6 ]
 
-      LOGGER.info("Selecting {} instances for table: {}", numCandidateInstances, _tableNameWithType);
+      LOGGER.info("Selecting {} instances for table: {}", candidateInstanceConfigsMap.size(), _tableNameWithType);
       // Put all instance configs as pool 0
-      poolToInstanceConfigsMap.put(0, candidateInstanceConfigs);
+
+      for (Map.Entry<Integer, List<String>> entry : existingPoolToInstancesMap.entrySet()) {

Review comment:
       This assume the pools are not changed between `existingInstancePartitions` and `instanceConfigs`. Is it possible that all the instances in the same pool got removed? In other words, is it possible that `poolToNewInstanceConfigsMap.get(pool)` is empty because the pools are changed?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
##########
@@ -48,52 +52,117 @@ public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String table
 
   /**
    * Returns a map from pool to instance configs based on the tag and pool config for the given instance configs.
+   * @param instanceConfigs list of latest instance configs from ZK.
+   * @param existingPoolToInstancesMap existing instance with sequence that should be respected. An empty list
+   *                                      means no preceding sequence to respect and the instances would be sorted.
    */
-  public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs) {
+  public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs,
+      Map<Integer, List<String>> existingPoolToInstancesMap) {
     int tableNameHash = Math.abs(_tableNameWithType.hashCode());
     LOGGER.info("Starting instance tag/pool selection for table: {} with hash: {}", _tableNameWithType, tableNameHash);
 
-    // Filter out the instances with the correct tag
+    // If existingPoolToInstancesMap is null, treat it as an empty map.
+    if (existingPoolToInstancesMap == null) {
+      existingPoolToInstancesMap = Collections.emptyMap();
+    }
+    // Filter out the instances with the correct tag.
+    // Use LinkedHashMap here to retain the sorted list of instance names.
     String tag = _tagPoolConfig.getTag();
-    List<InstanceConfig> candidateInstanceConfigs = new ArrayList<>();
+    Map<String, InstanceConfig> candidateInstanceConfigsMap = new LinkedHashMap<>();
     for (InstanceConfig instanceConfig : instanceConfigs) {
       if (instanceConfig.getTags().contains(tag)) {
-        candidateInstanceConfigs.add(instanceConfig);
+        candidateInstanceConfigsMap.put(instanceConfig.getInstanceName(), instanceConfig);
       }
     }
-    candidateInstanceConfigs.sort(Comparator.comparing(InstanceConfig::getInstanceName));
-    int numCandidateInstances = candidateInstanceConfigs.size();
+
+    // Find out newly added instances from the latest copies of instance configs.
+    // A deque is used here in order to retain the sequence,
+    // given the fact that the list of instance configs is always sorted.
+    Deque<String> newlyAddedInstances = new LinkedList<>(candidateInstanceConfigsMap.keySet());
+    for (List<String> existingInstancesWithSequence : existingPoolToInstancesMap.values()) {
+      newlyAddedInstances.removeAll(existingInstancesWithSequence);
+    }
+
+    int numCandidateInstances = candidateInstanceConfigsMap.size();
     Preconditions.checkState(numCandidateInstances > 0, "No enabled instance has the tag: %s", tag);
     LOGGER.info("{} enabled instances have the tag: {} for table: {}", numCandidateInstances, tag, _tableNameWithType);
 
-    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new TreeMap<>();
+    Map<Integer, List<InstanceConfig>> poolToLatestInstanceConfigsMap = new TreeMap<>();
     if (_tagPoolConfig.isPoolBased()) {
-      // Pool based selection
+      // Pool based selection. All the instances should be associated with a specific pool number.
+      // Instance selection should be done within the same pool.
+      // E.g.: Pool0 -> [ I1, I2, I3 ]
+      //       Pool1 -> [ I4, I5, I6 ]
 
-      // Extract the pool information from the instance configs
-      for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
+      // Each pool number associates with a map that key is the instance name and value is the instance config.
+      Map<Integer, Map<String, InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+      // Each pool number associates with a list of newly added instance configs,
+      // so that new instances can be fetched from this list.
+      Map<Integer, Deque<InstanceConfig>> poolToNewInstanceConfigsMap = new HashMap<>();
+
+      // Extract the pool information from the instance configs.
+      for (Map.Entry<String, InstanceConfig> entry : candidateInstanceConfigsMap.entrySet()) {
+        String instanceName = entry.getKey();
+        InstanceConfig instanceConfig = entry.getValue();
         Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
         if (poolMap != null && poolMap.containsKey(tag)) {
           int pool = Integer.parseInt(poolMap.get(tag));
-          poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+          poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new TreeMap<>()).put(instanceName, instanceConfig);
+          if (newlyAddedInstances.contains(instanceName)) {
+            poolToNewInstanceConfigsMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(instanceConfig);
+          }
+        }
+      }
+
+      for (Map.Entry<Integer, List<String>> entry : existingPoolToInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<String> existingInstanceAssignmentInPool = entry.getValue();
+        List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+        for (String existingInstance: existingInstanceAssignmentInPool) {
+          InstanceConfig instanceConfig = poolToInstanceConfigsMap.get(pool).get(existingInstance);
+          // Add instances to the candidate list and respect the sequence of the existing instances from the ZK.
+          // The missing/removed instances will be replaced by the newly instances.
+          // If the instance still exists from ZK, then add it to the candidate list.
+          // E.g. if the old instances are: [I1, I2, I3, I4] and the new instance are: [I1, I3, I4, I5, I6],
+          // the removed instance is I2 and the newly added instances are I5 and I6.
+          // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail.
+          // Thus, the new order would be [I1, I5, I3, I4, I6].

Review comment:
       `HashBasedRotateInstanceConstraintApplier` will rotate the list which depends on the number of instances, even if we retain the instances order here, after applying the constraints, the order can be different?
   Say we need to select 4 instances (2 replicas * 2 instances/replica), and the rotated list is `[I6, I1, I5, I3, I4]`, then we will pick `[I6, I1, I5, I3]`. This won't guarantee minimum segments movement because we need to move all segments from `I4` to `I5` or `I6`. Is this a valid case?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org