You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/02 06:34:51 UTC

[incubator-pinot] branch master updated: Enhance TableRebalancer to support no-downtime rebalance for strict replica-group routing tables (#6212)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e0f15aa  Enhance TableRebalancer to support no-downtime rebalance for strict replica-group routing tables (#6212)
e0f15aa is described below

commit e0f15aab71cda8fce3f06d2f53a4c7e7a4f3c523
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Nov 1 22:34:34 2020 -0800

    Enhance TableRebalancer to support no-downtime rebalance for strict replica-group routing tables (#6212)
    
    On top of #6208, this PR enhances the TableRebalancer to support no-downtime rebalance for strict replica-group routing, which still holds the minimum available replicas requirement.
---
 .../helix/core/rebalance/TableRebalancer.java      | 106 +++++-
 .../helix/core/rebalance/TableRebalancerTest.java  | 372 +++++++++++++++++----
 2 files changed, 404 insertions(+), 74 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 644e990..081e52c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -23,7 +23,9 @@ import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.TimeoutException;
 import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.apache.commons.configuration.Configuration;
@@ -46,6 +48,7 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -127,12 +130,15 @@ public class TableRebalancer {
     int minReplicasToKeepUpForNoDowntime = rebalanceConfig
         .getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
             RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME);
+    boolean enableStrictReplicaGroup =
+        tableConfig.getRoutingConfig() != null && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE
+            .equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType());
     boolean bestEfforts = rebalanceConfig
         .getBoolean(RebalanceConfigConstants.BEST_EFFORTS, RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
     LOGGER.info(
-        "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, bestEfforts: {}",
+        "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
         tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime,
-        minReplicasToKeepUpForNoDowntime, bestEfforts);
+        minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, bestEfforts);
 
     // Validate table config
     try {
@@ -334,8 +340,8 @@ public class TableRebalancer {
       minAvailableReplicas = Math.max(numReplicas + minReplicasToKeepUpForNoDowntime, 0);
     }
 
-    LOGGER.info("Rebalancing table: {} with minAvailableReplicas: {}, bestEfforts: {}", tableNameWithType,
-        minAvailableReplicas, bestEfforts);
+    LOGGER.info("Rebalancing table: {} with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
+        tableNameWithType, minAvailableReplicas, enableStrictReplicaGroup, bestEfforts);
     int expectedVersion = currentIdealState.getRecord().getVersion();
     while (true) {
       // Wait for ExternalView to converge before updating the next IdealState
@@ -373,8 +379,10 @@ public class TableRebalancer {
       }
 
       if (currentAssignment.equals(targetAssignment)) {
-        LOGGER.info("Finished rebalancing table: {} with minAvailableReplicas: {}, bestEfforts: {} in {}ms.",
-            tableNameWithType, minAvailableReplicas, bestEfforts, System.currentTimeMillis() - startTimeMs);
+        LOGGER.info(
+            "Finished rebalancing table: {} with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, bestEfforts: {} in {}ms.",
+            tableNameWithType, minAvailableReplicas, enableStrictReplicaGroup, bestEfforts,
+            System.currentTimeMillis() - startTimeMs);
         return new RebalanceResult(RebalanceResult.Status.DONE,
             "Success with minAvailableReplicas: " + minAvailableReplicas
                 + " (both IdealState and ExternalView should reach the target segment assignment)",
@@ -382,7 +390,7 @@ public class TableRebalancer {
       }
 
       Map<String, Map<String, String>> nextAssignment =
-          getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas);
+          getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup);
       LOGGER.info("Got the next assignment for table: {} with number of segments to be moved to each instance: {}",
           tableNameWithType,
           SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment));
@@ -546,22 +554,76 @@ public class TableRebalancer {
     return true;
   }
 
-  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
-      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas) {
+  /**
+   * Returns the next assignment for the table based on the current assignment and the target assignment with regards to
+   * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
+   * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
+   * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup) {
+    return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
+        minAvailableReplicas)
+        : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas);
+  }
+
+  private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
+      Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
+      int minAvailableReplicas) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
+    Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> currentInstanceStateMap = entry.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+      SingleSegmentAssignment assignment =
+          getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas);
+      Set<String> assignedInstances = assignment._instanceStateMap.keySet();
+      Set<String> availableInstances = assignment._availableInstances;
+      availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> {
+        if (currentAvailableInstances == null) {
+          // First segment assigned to these instances, use the new assignment and update the available instances
+          nextAssignment.put(segmentName, assignment._instanceStateMap);
+          return availableInstances;
+        } else {
+          // There are other segments assigned to the same instances, check the available instances to see if adding the
+          // new assignment can still hold the minimum available replicas requirement
+          availableInstances.retainAll(currentAvailableInstances);
+          if (availableInstances.size() >= minAvailableReplicas) {
+            // New assignment can be added
+            nextAssignment.put(segmentName, assignment._instanceStateMap);
+            return availableInstances;
+          } else {
+            // New assignment cannot be added, use the current instance state map
+            nextAssignment.put(segmentName, currentInstanceStateMap);
+            return currentAvailableInstances;
+          }
+        }
+      });
+    }
+    return nextAssignment;
+  }
 
+  private static Map<String, Map<String, String>> getNextNonStrictReplicaGroupAssignment(
+      Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
+      int minAvailableReplicas) {
+    Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
       String segmentName = entry.getKey();
       nextAssignment.put(segmentName,
-          getNextInstanceStateMap(entry.getValue(), targetAssignment.get(segmentName), minAvailableReplicas));
+          getNextSingleSegmentAssignment(entry.getValue(), targetAssignment.get(segmentName),
+              minAvailableReplicas)._instanceStateMap);
     }
-
     return nextAssignment;
   }
 
+  /**
+   * Returns the next assignment for a segment based on the current instance state map and the target instance state map
+   * with regards to the minimum available replicas requirement.
+   */
   @VisibleForTesting
-  @SuppressWarnings("Duplicates")
-  static Map<String, String> getNextInstanceStateMap(Map<String, String> currentInstanceStateMap,
+  static SingleSegmentAssignment getNextSingleSegmentAssignment(Map<String, String> currentInstanceStateMap,
       Map<String, String> targetInstanceStateMap, int minAvailableReplicas) {
     Map<String, String> nextInstanceStateMap = new TreeMap<>();
 
@@ -586,6 +648,7 @@ public class TableRebalancer {
         }
       }
     }
+    Set<String> availableInstances = new TreeSet<>(nextInstanceStateMap.keySet());
 
     // Add target instances until the number of instances matched
     int instancesToAdd = targetInstanceStateMap.size() - nextInstanceStateMap.size();
@@ -601,6 +664,21 @@ public class TableRebalancer {
       }
     }
 
-    return nextInstanceStateMap;
+    return new SingleSegmentAssignment(nextInstanceStateMap, availableInstances);
+  }
+
+  /**
+   * Assignment result for a single segment.
+   */
+  @VisibleForTesting
+  static class SingleSegmentAssignment {
+    final Map<String, String> _instanceStateMap;
+    // Instances that are common in both current instance state and next instance state of the segment
+    final Set<String> _availableInstances;
+
+    SingleSegmentAssignment(Map<String, String> instanceStateMap, Set<String> availableInstances) {
+      _instanceStateMap = instanceStateMap;
+      _availableInstances = availableInstances;
+    }
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index a4e41f9..681ed2c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.controller.helix.core.rebalance;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import org.testng.annotations.Test;
 
@@ -43,27 +45,31 @@ public class TableRebalancerTest {
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE);
     Map<String, String> targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3"), ONLINE);
-    Map<String, String> nextInstanceStateMap =
-        TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    TableRebalancer.SingleSegmentAssignment assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
 
     // Without common instance, next assignment should be the same as target assignment
     targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4"), ONLINE);
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertTrue(assignment._availableInstances.isEmpty());
 
     // With increasing number of replicas, next assignment should be the same as target assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4", "host5"), ONLINE);
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertTrue(assignment._availableInstances.isEmpty());
 
     // With decreasing number of replicas, next assignment should be the same as target assignment
     currentInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
     targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5"), ONLINE);
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertTrue(assignment._availableInstances.isEmpty());
   }
 
   @Test
@@ -73,35 +79,41 @@ public class TableRebalancerTest {
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
     Map<String, String> targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host4"), ONLINE);
-    Map<String, String> nextInstanceStateMap =
-        TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    TableRebalancer.SingleSegmentAssignment assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
 
     // With 1 common instance, next assignment should be the same as target assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE);
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
 
     // Without common instance, next assignment should have 1 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6"), ONLINE);
     // [host1, host4, host5]
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 1);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 1),
-        targetInstanceStateMap);
+    assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5")));
 
     // With increasing number of replicas, next assignment should have 1 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6", "host7"), ONLINE);
     // [host1, host4, host5, host6]
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 1);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 1),
-        targetInstanceStateMap);
+    assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5", "host6")));
 
     // With decreasing number of replicas, next assignment should have 1 common instances with current assignment
     currentInstanceStateMap =
@@ -109,11 +121,13 @@ public class TableRebalancerTest {
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
     // [host1, host5, host6]
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 1);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 1),
-        targetInstanceStateMap);
+    assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
   }
 
   @Test
@@ -123,72 +137,310 @@ public class TableRebalancerTest {
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
     Map<String, String> targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host5"), ONLINE);
-    Map<String, String> nextInstanceStateMap =
-        TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    TableRebalancer.SingleSegmentAssignment assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
 
     // With 2 common instances, next assignment should be the same as target assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE);
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
 
     // With 1 common instance, next assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6", "host7"), ONLINE);
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
     // [host1, host2, host5, host6]
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2),
-        targetInstanceStateMap);
+    assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6")));
 
     // Without common instance, next assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8"), ONLINE);
     // [host1, host2, host5, host6]
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2),
-        targetInstanceStateMap);
+    assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
 
     // With increasing number of replicas, next assignment should have 1 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8", "host9"), ONLINE);
     // [host1, host2, host5, host6, host7]
-    nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2),
-        targetInstanceStateMap);
+    assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7")));
 
     // With decreasing number of replicas, next assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
     // [host1, host2, host5]
-    Map<String, String> firstRoundInstanceStateMap =
-        TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(getNumCommonInstances(currentInstanceStateMap, firstRoundInstanceStateMap), 2);
+    assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
     // Next round should have 2 common instances with first round assignment
     // [host1, host5, host6]
-    Map<String, String> secondRoundInstanceStateMap =
-        TableRebalancer.getNextInstanceStateMap(firstRoundInstanceStateMap, targetInstanceStateMap, 2);
-    assertEquals(getNumCommonInstances(firstRoundInstanceStateMap, secondRoundInstanceStateMap), 2);
+    assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5")));
     // Next round should make the assignment the same as target assignment
-    assertEquals(TableRebalancer.getNextInstanceStateMap(secondRoundInstanceStateMap, targetInstanceStateMap, 2),
-        targetInstanceStateMap);
+    assignment =
+        TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
   }
 
-  private int getNumCommonInstances(Map<String, String> currentInstanceStateMap,
-      Map<String, String> nextInstanceStateMap) {
-    int numCommonInstances = 0;
-    for (String instanceId : currentInstanceStateMap.keySet()) {
-      if (nextInstanceStateMap.containsKey(instanceId)) {
-        numCommonInstances++;
-      }
-    }
-    return numCommonInstances;
+  @Test
+  public void testStrictReplicaGroup() {
+    // Current assignment:
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    currentAssignment
+        .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment
+        .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+    currentAssignment
+        .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment
+        .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+
+    // Target assignment:
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> targetAssignment = new TreeMap<>();
+    targetAssignment
+        .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+    targetAssignment
+        .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment
+        .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+    targetAssignment
+        .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+
+    // Next assignment with 2 minimum available replicas with or without strict replica-group should reach the target
+    // assignment
+    Map<String, Map<String, String>> nextAssignment =
+        TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false);
+    assertEquals(nextAssignment, targetAssignment);
+    nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true);
+    assertEquals(nextAssignment, targetAssignment);
+
+    // Current assignment:
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    currentAssignment = new TreeMap<>();
+    currentAssignment
+        .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment
+        .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+    currentAssignment
+        .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment
+        .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+
+    // Target assignment:
+    // {
+    //   "segment1": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
+    targetAssignment = new TreeMap<>();
+    targetAssignment
+        .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment
+        .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+    targetAssignment
+        .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment
+        .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+
+    // Next assignment with 2 minimum available replicas without strict replica-group:
+    // (This assignment will move "segment1" and "segment3" from "host3" to "host4", and move "segment2" and "segment4"
+    // from "host3" to "host1". "host1" and "host4" might be unavailable for strict replica-group routing, which breaks
+    // the minimum available replicas requirement.)
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false);
+    assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+
+    // Next assignment with 2 minimum available replicas with strict replica-group:
+    // (This assignment will only move "segment1" and "segment3" from "host3" to "host4". Only "host4" can be
+    // unavailable for strict replica-group routing during the rebalance, which meets the minimum available replicas
+    // requirement.)
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true);
+    assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+    assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+
+    // Next assignment with 2 minimum available replicas with strict replica-group:
+    // {
+    //   "segment1": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true);
+    assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+    assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+    assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+    assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+
+    // Next assignment with 2 minimum available replicas with strict replica-group should reach the target assignment
+    nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true);
+    assertEquals(nextAssignment, targetAssignment);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org