You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 07:17:18 UTC

(pinot) 05/05: Enhance algorithm

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit a286bd868f434c72e32351890566d684ec14a378
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Wed Jan 31 23:15:41 2024 -0800

    Enhance algorithm
---
 .../instance/FDAwareInstancePartitionSelector.java |   7 +-
 .../instance/InstancePartitionSelector.java        |  10 +-
 .../InstanceReplicaGroupPartitionSelector.java     | 768 +++++++++++----------
 .../instance/InstanceTagPoolSelector.java          |  56 +-
 .../instance/InstanceAssignmentTest.java           |  43 +-
 .../InstanceReplicaGroupPartitionSelectorTest.java | 141 ++--
 .../java/org/apache/pinot/spi/utils/Pairs.java     |  23 +-
 7 files changed, 562 insertions(+), 486 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
index de96d4da4d..89d64272e3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
@@ -109,10 +109,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
     return new ImmutablePair<>(numReplicaGroups, numInstancesPerReplicaGroup);
   }
 
-  /**
-   * Selects instances based on the replica-group/partition config, and stores the result into the given instance
-   * partitions.
-   */
+  @Override
   public void selectInstances(Map<Integer, List<InstanceConfig>> faultDomainToInstanceConfigsMap,
       InstancePartitions instancePartitions) {
 
@@ -152,7 +149,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
        * initialize the new replicaGroupBasedAssignmentState for assignment,
        * place existing instances in their corresponding positions
        */
-      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+      if (_minimizeDataMovement) {
         int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
         int numExistingPartitions = _existingInstancePartitions.getNumPartitions();
         /*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
index 335070b003..b80ad8bba9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -32,13 +33,14 @@ abstract class InstancePartitionSelector {
   protected final boolean _minimizeDataMovement;
 
   public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      String tableNameWithType, InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
     _replicaGroupPartitionConfig = replicaGroupPartitionConfig;
     _tableNameWithType = tableNameWithType;
     _existingInstancePartitions = existingInstancePartitions;
-    // For backward compatibility, enable minimize data movement when it is enabled in top level or instance
-    // partition selector level.
-    _minimizeDataMovement = minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement();
+    // For backward compatibility, enable minimize data movement when it is enabled in top level or instance partition
+    // selector level
+    _minimizeDataMovement = (minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement())
+        && existingInstancePartitions != null;
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 505006f1d3..8da6dbe2f6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -19,24 +19,22 @@
 package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
-import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,10 +51,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
     super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
   }
 
-  /**
-   * Selects instances based on the replica-group/partition config, and stores the result into the given instance
-   * partitions.
-   */
+  @Override
   public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
       InstancePartitions instancePartitions) {
     int numPools = poolToInstanceConfigsMap.size();
@@ -65,393 +60,448 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
     int tableNameHash = Math.abs(_tableNameWithType.hashCode());
     List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet());
     pools.sort(null);
-    LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}",
-        _tableNameWithType, tableNameHash, pools);
+    LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}, "
+        + "minimize data movement: {}", _tableNameWithType, tableNameHash, pools, _minimizeDataMovement);
 
     if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
-      // Replica-group based selection
-
-      int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
-      Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
-      Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
-      Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
-      Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
-      Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
-      Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
-      Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      Map<String, Integer> instanceToPoolMap = new HashMap<>();
-      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
-        Integer pool = entry.getKey();
-        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
-        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
-          String instanceName = instanceConfig.getInstanceName();
-          candidateInstances.add(instanceName);
-          instanceToPoolMap.put(instanceName, pool);
-        }
+      if (_minimizeDataMovement) {
+        replicaGroupBasedMinimumMovement(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash);
+      } else {
+        replicaGroupBasedSimple(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash);
       }
+    } else {
+      nonReplicaGroupBased(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash);
+    }
+  }
 
-      if (_minimizeDataMovement && _existingInstancePartitions != null) {
-        // Collect the stats between the existing pools, existing replica groups, and existing instances.
-        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
-        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
-        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
-          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
-            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
-            for (String existingInstance : existingInstances) {
-              Integer existingPool = instanceToPoolMap.get(existingInstance);
-              if (existingPool != null) {
-                existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
-                    .add(existingInstance);
-                existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
-                    .add(replicaGroupId);
-                existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
-                    .add(existingInstance);
-              }
-            }
-          }
-        }
-
-        // Use a max heap to track the number of servers used for the given pools,
-        // so that pool with max number of existing instances will be considered first.
-        PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
-        for (int pool : pools) {
-          maxHeap.add(
-              new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
-                  pool));
-        }
+  private void nonReplicaGroupBased(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
+    // Pick one pool based on the table name hash
+    int pool = pools.get(Math.abs(tableNameHash % pools.size()));
+    LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType);
+    List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool);
+    int numInstances = instanceConfigs.size();
 
-        // Get the maximum number of replica groups per pool.
-        int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
-        // Given a pool number, assign replica group which has the max number of existing instances.
-        // Repeat this process until the max number of replica groups per pool is reached.
-        while (!maxHeap.isEmpty()) {
-          Pairs.IntPair pair = maxHeap.remove();
-          int poolNumber = pair.getRight();
-          for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {
-            Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
-            if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
-              continue;
-            }
-            int targetReplicaGroupId = -1;
-            int maxNumInstances = 0;
-            for (int existingReplicaGroupId : existingReplicaGroups) {
-              int numExistingInstances =
-                  existingReplicaGroupIdToExistingInstancesMap.getOrDefault(existingReplicaGroupId, new HashSet<>())
-                      .size();
-              if (numExistingInstances > maxNumInstances) {
-                maxNumInstances = numExistingInstances;
-                targetReplicaGroupId = existingReplicaGroupId;
-              }
-            }
-            // If target existing replica group cannot be found, it means it should be chosen from a new replica group.
-            if (targetReplicaGroupId > -1) {
-              poolToReplicaGroupIdsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(targetReplicaGroupId);
-              replicaGroupIdToPoolMap.put(targetReplicaGroupId, poolNumber);
-              // Clear the stats so that the same replica group won't be picked up again in later iteration.
-              existingReplicaGroupIdToExistingInstancesMap.get(targetReplicaGroupId).clear();
-            }
-          }
-        }
+    // Assign all instances if not configured
+    int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances();
+    if (numInstancesToSelect > 0) {
+      Preconditions.checkState(numInstancesToSelect <= numInstances,
+          "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstances,
+          numInstancesToSelect);
+    } else {
+      numInstancesToSelect = numInstances;
+    }
 
-        // If there is any new replica group added, choose pool which is least frequently picked up.
-        // Use a min heap to track the least frequently picked pool among all the pools.
-        PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
-        for (int pool : pools) {
-          int numExistingReplicaGroups =
-              poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0;
-          minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool));
-        }
-        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-          if (replicaGroupIdToPoolMap.containsKey(replicaId)) {
-            continue;
-          }
-          // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection.
-          Pairs.IntPair pair = minHeap.remove();
-          int pool = pair.getRight();
-          pair.setLeft(pair.getLeft() + 1);
-          minHeap.add(pair);
-          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-          replicaGroupIdToPoolMap.put(replicaId, pool);
-        }
-      } else {
-        // Current default way to assign pool to replica groups.
-        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-          // Pick one pool for each replica-group based on the table name hash
-          int pool = pools.get((tableNameHash + replicaId) % numPools);
-          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-          replicaGroupIdToPoolMap.put(replicaId, pool);
-        }
-      }
-      LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
-          _tableNameWithType);
-
-      int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
-      if (numInstancesPerReplicaGroup > 0) {
-        // Check if we have enough instances if number of instances per replica-group is configured
-        for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
-          int pool = entry.getKey();
-          int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
-          int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size();
-          Preconditions.checkState(numInstancesToSelect <= numInstancesInPool,
-              "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool,
-              numInstancesToSelect);
-        }
-      } else {
-        // Use as many instances as possible if number of instances per replica-group is not configured
-        numInstancesPerReplicaGroup = Integer.MAX_VALUE;
-        for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
-          int pool = entry.getKey();
-          int numReplicaGroupsInPool = entry.getValue().size();
-          int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
-          Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool,
-              "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool,
-              numReplicaGroupsInPool, numInstancesInPool);
-          numInstancesPerReplicaGroup =
-              Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool);
-        }
+    List<String> instancesToSelect;
+    if (_minimizeDataMovement) {
+      List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
+      LinkedHashSet<String> candidateInstances = Sets.newLinkedHashSetWithExpectedSize(instanceConfigs.size());
+      instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      instancesToSelect =
+          selectInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances);
+      LOGGER.info("Selecting instances: {} for table: {}, existing instances: {}", instancesToSelect,
+          _tableNameWithType, existingInstances);
+    } else {
+      instancesToSelect = new ArrayList<>(numInstancesToSelect);
+      for (int i = 0; i < numInstancesToSelect; i++) {
+        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
       }
-      LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
-          _tableNameWithType);
+      LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType);
+    }
+    // Set the instances as partition 0 replica 0
+    instancePartitions.setInstances(0, 0, instancesToSelect);
+  }
+
+  /**
+   * Selects the instances with minimum movement.
+   * For each instance in the existing instances, if it is still alive, keep it in the same position. Then fill the
+   * vacant positions with the remaining candidate instances.
+   * NOTE: This method will modify the candidate instances.
+   */
+  private static List<String> selectInstancesWithMinimumMovement(int numInstancesToSelect,
+      LinkedHashSet<String> candidateInstances, List<String> existingInstances) {
+    // Initialize the list with empty positions to fill
+    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      instancesToSelect.add(null);
+    }
 
-      // Assign instances within a replica-group to one partition if not configured
-      int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
-      if (numPartitions <= 0) {
-        numPartitions = 1;
+    // Keep the existing instances that are still alive
+    int numInstancesToCheck = Math.min(numInstancesToSelect, existingInstances.size());
+    for (int i = 0; i < numInstancesToCheck; i++) {
+      String existingInstance = existingInstances.get(i);
+      if (candidateInstances.remove(existingInstance)) {
+        instancesToSelect.set(i, existingInstance);
       }
-      // Assign all instances within a replica-group to each partition if not configured
-      int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition();
-      if (numInstancesPerPartition > 0) {
-        Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup,
-            "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group:"
-                + " %s", numInstancesPerPartition, numInstancesPerReplicaGroup);
-      } else {
-        numInstancesPerPartition = numInstancesPerReplicaGroup;
+    }
+
+    // Fill the vacant positions with the remaining candidate instances
+    Iterator<String> iterator = candidateInstances.iterator();
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      if (instancesToSelect.get(i) == null) {
+        instancesToSelect.set(i, iterator.next());
       }
-      LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
-          numPartitions, numInstancesPerPartition, _tableNameWithType);
-
-      if (_minimizeDataMovement && _existingInstancePartitions != null) {
-        // Minimize data movement.
-        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
-        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
-        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
-
-        existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
-        // Step 1: find out the replica groups and their existing instances,
-        //   so that these instances can be filtered out and won't be chosen for the other replica group.
-        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
-          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
-          if (pool == null) {
-            // Skip the replica group if it's no longer needed.
-            continue;
-          }
+    }
 
-          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
-            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
-            existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
-                .addAll(existingInstances);
-          }
-        }
+    return instancesToSelect;
+  }
 
-        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
-          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
-          // Step 2: filter out instances that belong to other replica groups which should not be the candidate.
-          LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
-          for (int otherReplicaGroupId = 0;
-              otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups;
-              otherReplicaGroupId++) {
-            if (replicaGroupId != otherReplicaGroupId) {
-              candidateInstances.removeAll(existingReplicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId));
-            }
-          }
-          LinkedHashSet<String> chosenCandidateInstances = new LinkedHashSet<>();
-          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
-            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
-            // Step 3: figure out the missing instances and the new instances to fill their vacant positions.
-            List<String> instancesToSelect =
-                getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances);
-            chosenCandidateInstances.addAll(instancesToSelect);
-            instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect);
-          }
-          // Remove instances that are already been chosen.
-          poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
-        }
+  private void replicaGroupBasedSimple(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
+    int numPools = pools.size();
+    int numReplicaGroups = getNumReplicaGroups();
 
-        // If the new number of replica groups is greater than the existing number of replica groups.
-        for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-          int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
-          LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
-
-          Set<String> chosenCandidateInstances = new HashSet<>();
-          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
-            List<String> existingInstances = Collections.emptyList();
-            List<String> instancesToSelect =
-                getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances);
-            chosenCandidateInstances.addAll(instancesToSelect);
-            instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect);
-          }
-          // Remove instances that are already been chosen.
-          poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
-        }
-      } else {
-        // Pick instances based on the sorted list of instance names.
-        String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup];
-        for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
-          List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey());
-          List<Integer> replicaGroupIdsInPool = entry.getValue();
-
-          // Use round-robin to assign instances to each replica-group so that they get instances with similar picking
-          // priority
-          // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group)
-          // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9]
-          //  r0  r1  r0  r1  r0  r1
-          int instanceIdInPool = 0;
-          for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup;
-              instanceIdInReplicaGroup++) {
-            for (int replicaGroupId : replicaGroupIdsInPool) {
-              replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] =
-                  instanceConfigsInPool.get(instanceIdInPool++).getInstanceName();
-            }
-          }
+    // Pick one pool for each replica-group based on the table name hash
+    Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+    int startIndex = Math.abs(tableNameHash % numPools);
+    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+      int pool = pools.get((startIndex + replicaGroupId) % numPools);
+      poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaGroupId);
+    }
+    LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
+        _tableNameWithType);
+
+    int numInstancesPerReplicaGroup =
+        getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap);
+    LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
+        _tableNameWithType);
+    int numPartitions = getNumPartitions();
+    int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup);
+    LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
+        numPartitions, numInstancesPerPartition, _tableNameWithType);
+
+    // Pick instances based on the sorted list of instance names
+    String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup];
+    for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+      List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey());
+      List<Integer> replicaGroupIdsInPool = entry.getValue();
+
+      // Use round-robin to assign instances to each replica-group so that they get instances with similar picking
+      // priority
+      // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group)
+      // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9]
+      //  r0  r1  r0  r1  r0  r1
+      int instanceIdInPool = 0;
+      for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup;
+          instanceIdInReplicaGroup++) {
+        for (int replicaGroupId : replicaGroupIdsInPool) {
+          replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] =
+              instanceConfigsInPool.get(instanceIdInPool++).getInstanceName();
         }
+      }
+    }
 
-        // Assign consecutive instances within a replica-group to each partition.
-        // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
-        // [i0, i1, i2, i3, i4]
-        //  p0  p0  p0  p1  p1
-        //  p1  p2  p2  p2
-        for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-          int instanceIdInReplicaGroup = 0;
-          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-            List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition);
-            for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition;
-                instanceIdInPartition++) {
-              instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
-              instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
-            }
-            LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}",
-                instancesInPartition, replicaGroupId, partitionId, _tableNameWithType);
-            instancePartitions.setInstances(partitionId, replicaGroupId, instancesInPartition);
-          }
+    // Assign consecutive instances within a replica-group to each partition
+    // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
+    // [i0, i1, i2, i3, i4]
+    //  p0  p0  p0  p1  p1
+    //  p1  p2  p2  p2
+    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+      String[] instancesInReplicaGroup = replicaGroupIdToInstancesMap[replicaGroupId];
+      int instanceIdInReplicaGroup = 0;
+      for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+        List<String> instances = new ArrayList<>(numInstancesPerPartition);
+        for (int i = 0; i < numInstancesPerPartition; i++) {
+          instances.add(instancesInReplicaGroup[instanceIdInReplicaGroup]);
+          instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
         }
+        LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}", instances,
+            replicaGroupId, partitionId, _tableNameWithType);
+        instancePartitions.setInstances(partitionId, replicaGroupId, instances);
       }
-    } else {
-      // Non-replica-group based selection
-
-      // Pick one pool based on the table name hash
-      int pool = pools.get(tableNameHash % numPools);
-      LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType);
-      List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool);
-      int numInstanceConfigs = instanceConfigs.size();
-
-      // Assign all instances if not configured
-      int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances();
-      if (numInstancesToSelect > 0) {
-        Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs,
-            "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs,
+    }
+  }
+
+  private int getNumReplicaGroups() {
+    int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
+    Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
+    return numReplicaGroups;
+  }
+
+  private int getNumInstancesPerReplicaGroup(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      Map<Integer, List<Integer>> poolToReplicaGroupIdsMap) {
+    int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+    if (numInstancesPerReplicaGroup > 0) {
+      // Check if we have enough instances if number of instances per replica-group is configured
+      for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+        int pool = entry.getKey();
+        int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+        int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size();
+        Preconditions.checkState(numInstancesToSelect <= numInstancesInPool,
+            "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool,
             numInstancesToSelect);
-      } else {
-        numInstancesToSelect = numInstanceConfigs;
       }
-
-      List<String> instancesToSelect;
-      if (_minimizeDataMovement && _existingInstancePartitions != null) {
-        // Minimize data movement.
-        List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
-        LinkedHashSet<String> candidateInstances = new LinkedHashSet<>();
-        instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName()));
-        instancesToSelect =
-            getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances);
-      } else {
-        // Select instances sequentially.
-        instancesToSelect = new ArrayList<>(numInstancesToSelect);
-        for (int i = 0; i < numInstancesToSelect; i++) {
-          instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
-        }
+    } else {
+      // Use as many instances as possible if number of instances per replica-group is not configured
+      numInstancesPerReplicaGroup = Integer.MAX_VALUE;
+      for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+        int pool = entry.getKey();
+        int numReplicaGroupsInPool = entry.getValue().size();
+        int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+        Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool,
+            "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool,
+            numReplicaGroupsInPool, numInstancesInPool);
+        numInstancesPerReplicaGroup =
+            Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool);
       }
-      LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType);
-      // Set the instances as partition 0 replica 0
-      instancePartitions.setInstances(0, 0, instancesToSelect);
     }
+    return numInstancesPerReplicaGroup;
   }
 
-  /**
-   * Select instances with minimum movement.
-   * This algorithm can solve the following scenarios:
-   *    * swap an instance
-   *    * add/remove replica groups
-   *    * increase/decrease number of instances per replica group
-   * TODO: handle the scenarios that selected pools are changed.
-   * TODO: improve the algorithm by doing the following steps:
-   *         1. assign the existing instances for all partitions;
-   *         2. assign the vacant positions based on the partitions already assigned to each instance.
-   * @param numInstancesToSelect number of instances to select
-   * @param candidateInstances candidate instances to be selected
-   * @param existingInstances list of existing instances
-   */
-  private static List<String> getInstancesWithMinimumMovement(int numInstancesToSelect,
-      LinkedHashSet<String> candidateInstances, List<String> existingInstances) {
-    // Initialize the list with empty positions to fill.
-    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-    for (int i = 0; i < numInstancesToSelect; i++) {
-      instancesToSelect.add(null);
+  private int getNumPartitions() {
+    // Assign instances within a replica-group to one partition if not configured
+    int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
+    if (numPartitions <= 0) {
+      numPartitions = 1;
     }
-    Deque<String> newlyAddedInstances = new LinkedList<>();
+    return numPartitions;
+  }
+
+  private int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
+    // Assign all instances within a replica-group to each partition if not configured
+    int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition();
+    if (numInstancesPerPartition > 0) {
+      Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup,
+          "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group: %s",
+          numInstancesPerPartition, numInstancesPerReplicaGroup);
+    } else {
+      numInstancesPerPartition = numInstancesPerReplicaGroup;
+    }
+    return numInstancesPerPartition;
+  }
+
+  private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
+    int numPools = pools.size();
+    int numReplicaGroups = getNumReplicaGroups();
 
-    // Find out the existing instances that are still alive.
-    Set<String> existingInstancesStillAlive = new HashSet<>();
-    for (String existingInstance : existingInstances) {
-      if (candidateInstances.contains(existingInstance)) {
-        existingInstancesStillAlive.add(existingInstance);
+    Map<String, Integer> instanceToPoolMap = new HashMap<>();
+    for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+      int pool = entry.getKey();
+      for (InstanceConfig instanceConfig : entry.getValue()) {
+        instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
       }
     }
 
-    // Find out the newly added instances.
-    for (String candidateInstance : candidateInstances) {
-      if (!existingInstancesStillAlive.contains(candidateInstance)) {
-        newlyAddedInstances.add(candidateInstance);
+    // Calculate the mapping from pool to replica-groups assigned to the pool
+    List<Set<String>> replicaGroupIdToExistingInstancesMap = new ArrayList<>(numReplicaGroups);
+    Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+    int maxReplicaGroupsPerPool = (numReplicaGroups + numPools - 1) / numPools;
+    int startIndex = Math.abs(tableNameHash % numPools);
+
+    int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+    int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+      // For each replica-group, gather number of existing instances within each pool
+      Set<String> existingInstanceSet = new HashSet<>();
+      replicaGroupIdToExistingInstancesMap.add(existingInstanceSet);
+      Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
+      if (replicaGroupId < existingNumReplicaGroups) {
+        for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+          List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+          existingInstanceSet.addAll(existingInstances);
+          for (String existingInstance : existingInstances) {
+            Integer existingPool = instanceToPoolMap.get(existingInstance);
+            if (existingPool != null) {
+              poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum);
+            }
+          }
+        }
+      }
+      // Sort the pools based on the number of existing instances in the pool in descending order, then use the table
+      // name hash to break ties
+      // Triple stores (pool, numExistingInstances, poolIndex) for sorting
+      List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools);
+      for (int i = 0; i < numPools; i++) {
+        int pool = pools.get((startIndex + replicaGroupId + i) % numPools);
+        triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i));
+      }
+      triples.sort((o1, o2) -> {
+        int result = Integer.compare(o2.getMiddle(), o1.getMiddle());
+        return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
+      });
+      for (Triple<Integer, Integer, Integer> triple : triples) {
+        int pool = triple.getLeft();
+        List<Integer> replicaGroupIds = poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>());
+        if (replicaGroupIds.size() < maxReplicaGroupsPerPool) {
+          replicaGroupIds.add(replicaGroupId);
+          break;
+        }
       }
     }
+    LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
+        _tableNameWithType);
 
-    int numExistingInstances = existingInstances.size();
-    for (int i = 0; i < numInstancesToSelect; i++) {
-      String existingInstance = i < numExistingInstances ? existingInstances.get(i) : null;
-      String selectedInstance;
-      if (existingInstance != null && candidateInstances.contains(existingInstance)) {
-        selectedInstance = existingInstance;
-        existingInstancesStillAlive.remove(selectedInstance);
-      } else {
-        selectedInstance = newlyAddedInstances.poll();
+    int numInstancesPerReplicaGroup =
+        getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap);
+    LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
+        _tableNameWithType);
+    int numPartitions = getNumPartitions();
+    int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup);
+    LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
+        numPartitions, numInstancesPerPartition, _tableNameWithType);
+
+    List<List<String>> replicaGroupIdToInstancesMap = new ArrayList<>(numReplicaGroups);
+    for (int i = 0; i < numReplicaGroups; i++) {
+      replicaGroupIdToInstancesMap.add(new ArrayList<>(numInstancesPerReplicaGroup));
+    }
+    for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+      // For each pool, keep the existing instances that are still alive within each replica-group
+      int pool = entry.getKey();
+      List<Integer> replicaGroupIds = entry.getValue();
+      List<String> newInstances = new ArrayList<>();
+      for (InstanceConfig instanceConfig : poolToInstanceConfigsMap.get(pool)) {
+        String instanceName = instanceConfig.getInstanceName();
+        boolean isExistingInstance = false;
+        for (int replicaGroupId : replicaGroupIds) {
+          List<String> instances = replicaGroupIdToInstancesMap.get(replicaGroupId);
+          if (instances.size() == numInstancesPerReplicaGroup) {
+            continue;
+          }
+          if (replicaGroupIdToExistingInstancesMap.get(replicaGroupId).contains(instanceName)) {
+            instances.add(instanceName);
+            isExistingInstance = true;
+            break;
+          }
+        }
+        if (!isExistingInstance) {
+          newInstances.add(instanceName);
+        }
       }
-      instancesToSelect.set(i, selectedInstance);
-      // If it's an existing alive instance, or it's for a new replica group, add the new instance to the tail,
-      // so that it won't be firstly chosen for the next partition.
-      // For newly added instances to fill the existing replica group, the sequence cannot change;
-      // otherwise there is no guarantee that same vacant position will be filled with the same new instance.
-      // The 'selectedInstance' object can still be null if there is no new instances from the candidate list.
-      if (selectedInstance != null && (i < numExistingInstances || existingInstances.isEmpty())) {
-        candidateInstances.remove(selectedInstance);
-        candidateInstances.add(selectedInstance);
+      // Fill the vacant positions with the new instances. First fill the replica groups with the fewest instances, then
+      // use round-robin to assign instances to each replica-group so that they get instances with similar picking
+      // priority.
+      int numInstancesToFill = numInstancesPerReplicaGroup * replicaGroupIds.size();
+      for (int replicaGroupId : replicaGroupIds) {
+        numInstancesToFill -= replicaGroupIdToInstancesMap.get(replicaGroupId).size();
+      }
+      for (int i = 0; i < numInstancesToFill; i++) {
+        int leastNumInstances = Integer.MAX_VALUE;
+        int replicaGroupIdWithLeastInstances = -1;
+        for (int replicaGroupId : replicaGroupIds) {
+          int numInstances = replicaGroupIdToInstancesMap.get(replicaGroupId).size();
+          if (numInstances < leastNumInstances) {
+            leastNumInstances = numInstances;
+            replicaGroupIdWithLeastInstances = replicaGroupId;
+          }
+        }
+        replicaGroupIdToInstancesMap.get(replicaGroupIdWithLeastInstances).add(newInstances.get(i));
       }
     }
 
-    // If there are still some vacant positions in the instance list,
-    // try to fill with instances which are either left over or newly added.
-    for (int i = 0; i < instancesToSelect.size(); i++) {
-      if (instancesToSelect.get(i) == null) {
-        if (!existingInstancesStillAlive.isEmpty()) {
-          Iterator<String> iterator = existingInstancesStillAlive.iterator();
-          String existingInstanceLeftOver = iterator.next();
-          instancesToSelect.set(i, existingInstanceLeftOver);
-          iterator.remove();
-        } else if (!newlyAddedInstances.isEmpty()) {
-          // pick a new instance to fill its vacant position.
-          String newInstance = newlyAddedInstances.pollFirst();
-          instancesToSelect.set(i, newInstance);
+    if (numPartitions == 1) {
+      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+        List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
+        if (replicaGroupId < existingNumReplicaGroups) {
+          List<String> existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId);
+          LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup);
+          List<String> instances =
+              selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances);
+          LOGGER.info(
+              "Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, existing instances: {}",
+              instances, replicaGroupId, _tableNameWithType, existingInstances);
+          instancePartitions.setInstances(0, replicaGroupId, instances);
+        } else {
+          LOGGER.info("Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, "
+              + "there is no existing instances", instancesInReplicaGroup, replicaGroupId, _tableNameWithType);
+          instancePartitions.setInstances(0, replicaGroupId, instancesInReplicaGroup);
+        }
+      }
+    } else {
+      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+        List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
+        if (replicaGroupId < existingNumReplicaGroups) {
+          int maxNumPartitionsPerInstance = (numInstancesPerReplicaGroup + numPartitions - 1) / numPartitions;
+          Map<String, Integer> instanceToNumPartitionsMap =
+              Maps.newHashMapWithExpectedSize(numInstancesPerReplicaGroup);
+          for (String instance : instancesInReplicaGroup) {
+            instanceToNumPartitionsMap.put(instance, 0);
+          }
+
+          List<List<String>> partitionIdToInstancesMap = new ArrayList<>(numPartitions);
+          List<Set<String>> partitionIdToInstanceSetMap = new ArrayList<>(numPartitions);
+          List<List<String>> partitionIdToExistingInstancesMap = new ArrayList<>(existingNumPartitions);
+          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            // Initialize the list with empty positions to fill
+            List<String> instances = new ArrayList<>(numInstancesPerPartition);
+            for (int i = 0; i < numInstancesPerPartition; i++) {
+              instances.add(null);
+            }
+            partitionIdToInstancesMap.add(instances);
+            Set<String> instanceSet = Sets.newHashSetWithExpectedSize(numInstancesPerPartition);
+            partitionIdToInstanceSetMap.add(instanceSet);
+
+            // Keep the existing instances that are still alive
+            if (partitionId < existingNumPartitions) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              partitionIdToExistingInstancesMap.add(existingInstances);
+              int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size());
+              for (int i = 0; i < numInstancesToCheck; i++) {
+                String existingInstance = existingInstances.get(i);
+                Integer numPartitionsOnInstance = instanceToNumPartitionsMap.get(existingInstance);
+                if (numPartitionsOnInstance != null && numPartitionsOnInstance < maxNumPartitionsPerInstance) {
+                  instances.set(i, existingInstance);
+                  instanceSet.add(existingInstance);
+                  instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1);
+                }
+              }
+            }
+          }
+
+          // Fill the vacant positions with the instance that serves the fewest partitions
+          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            List<String> instances = partitionIdToInstancesMap.get(partitionId);
+            Set<String> instanceSet = partitionIdToInstanceSetMap.get(partitionId);
+            int numInstancesToFill = numInstancesPerPartition - instanceSet.size();
+            if (numInstancesToFill > 0) {
+              // Triple stores (instance, numPartitionsOnInstance, instanceIndex) for sorting
+              List<Triple<String, Integer, Integer>> triples = new ArrayList<>(numInstancesPerReplicaGroup);
+              for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
+                String instance = instancesInReplicaGroup.get(i);
+                if (!instanceSet.contains(instance)) {
+                  triples.add(Triple.of(instance, instanceToNumPartitionsMap.get(instance), i));
+                }
+              }
+              triples.sort((o1, o2) -> {
+                int result = Integer.compare(o1.getMiddle(), o2.getMiddle());
+                return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
+              });
+              int instanceIdToFill = 0;
+              for (int i = 0; i < numInstancesPerPartition; i++) {
+                if (instances.get(i) == null) {
+                  String instance = triples.get(instanceIdToFill++).getLeft();
+                  instances.set(i, instance);
+                  instanceToNumPartitionsMap.put(instance, instanceToNumPartitionsMap.get(instance) + 1);
+                }
+              }
+            }
+
+            if (partitionId < existingNumPartitions) {
+              LOGGER.info(
+                  "Selecting instances: {} for replica-group: {}, partition: {} for table: {}, existing instances: {}",
+                  instances, replicaGroupId, partitionId, _tableNameWithType,
+                  partitionIdToExistingInstancesMap.get(partitionId));
+            } else {
+              LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
+                  + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
+            }
+            instancePartitions.setInstances(partitionId, replicaGroupId, instances);
+          }
+        } else {
+          // Assign consecutive instances within a replica-group to each partition
+          int instanceIdInReplicaGroup = 0;
+          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            List<String> instances = new ArrayList<>(numInstancesPerPartition);
+            for (int i = 0; i < numInstancesPerPartition; i++) {
+              instances.add(instancesInReplicaGroup.get(instanceIdInReplicaGroup));
+              instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
+            }
+            LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
+                + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
+            instancePartitions.setInstances(partitionId, replicaGroupId, instances);
+          }
         }
       }
     }
-    return instancesToSelect;
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 940968432b..28d58bbbcd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -22,18 +22,17 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
-import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,16 +45,14 @@ public class InstanceTagPoolSelector {
 
   private final InstanceTagPoolConfig _tagPoolConfig;
   private final String _tableNameWithType;
-
   private final boolean _minimizeDataMovement;
-
   private final InstancePartitions _existingInstancePartitions;
 
   public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
       boolean minimizeDataMovement, @Nullable InstancePartitions existingInstancePartitions) {
     _tagPoolConfig = tagPoolConfig;
     _tableNameWithType = tableNameWithType;
-    _minimizeDataMovement = minimizeDataMovement;
+    _minimizeDataMovement = minimizeDataMovement && existingInstancePartitions != null;
     _existingInstancePartitions = existingInstancePartitions;
   }
 
@@ -104,7 +101,7 @@ public class InstanceTagPoolSelector {
       // Calculate the pools to select based on the selection config
       Set<Integer> pools = poolToInstanceConfigsMap.keySet();
       List<Integer> poolsToSelect = _tagPoolConfig.getPools();
-      if (poolsToSelect != null && !poolsToSelect.isEmpty()) {
+      if (!CollectionUtils.isEmpty(poolsToSelect)) {
         Preconditions.checkState(pools.containsAll(poolsToSelect), "Cannot find all instance pools configured: %s",
             poolsToSelect);
       } else {
@@ -123,45 +120,44 @@ public class InstanceTagPoolSelector {
           return poolToInstanceConfigsMap;
         }
 
+        // Select pools based on the table name hash to evenly distribute the tables
+        List<Integer> poolsInCluster = new ArrayList<>(pools);
+        int startIndex = Math.abs(tableNameHash % numPools);
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        if (_minimizeDataMovement && _existingInstancePartitions != null) {
-          Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
-          // Keep the same pool if it's already been used for the table.
+        if (_minimizeDataMovement) {
+          assert _existingInstancePartitions != null;
+          Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
           int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
           int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
-          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
-            for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
               List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
               for (String existingInstance : existingInstances) {
                 Integer existingPool = instanceToPoolMap.get(existingInstance);
                 if (existingPool != null) {
-                  if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
-                    existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
-                  }
-                  existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
-                      .add(existingInstance);
+                  poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum);
                 }
               }
             }
           }
-
-          // Use a max heap to track the number of servers used for all the pools.
-          PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
-          for (int pool : pools) {
-            maxHeap.add(new Pairs.IntPair(existingPoolsToExistingInstancesMap.get(pool).size(), pool));
+          // Sort the pools based on the number of existing instances in the pool in descending order, then use the
+          // table name hash to break ties
+          // Triple stores (pool, numExistingInstances, poolIndex) for sorting
+          List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools);
+          for (int i = 0; i < numPools; i++) {
+            int pool = poolsInCluster.get((startIndex + i) % numPools);
+            triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i));
           }
-
-          // Pick the pools from the max heap, so that data movement be minimized.
+          triples.sort((o1, o2) -> {
+            int result = Integer.compare(o2.getMiddle(), o1.getMiddle());
+            return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
+          });
           for (int i = 0; i < numPoolsToSelect; i++) {
-            Pairs.IntPair pair = maxHeap.remove();
-            poolsToSelect.add(pair.getRight());
+            poolsToSelect.add(triples.get(i).getLeft());
           }
-          LOGGER.info("The selected pools: " + poolsToSelect);
         } else {
-          // Select pools based on the table name hash to evenly distribute the tables
-          List<Integer> poolsInCluster = new ArrayList<>(pools);
           for (int i = 0; i < numPoolsToSelect; i++) {
-            poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+            poolsToSelect.add(poolsInCluster.get((startIndex + i) % numPools));
           }
         }
       }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index a6220c00a2..113d4e1649 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -50,7 +50,9 @@ import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
 
 
 public class InstanceAssignmentTest {
@@ -198,8 +200,8 @@ public class InstanceAssignmentTest {
     // r0: [i8, i1, i4]
     //      p0, p0, p1
     //      p1
-    // r1: [i9, i10, i5]
-    //      p0, p0, p1
+    // r1: [i9, i5, i10]
+    //      p0, p1, p0
     //      p1
     // r2: [i0, i3, i11]
     //      p0, p0, p1
@@ -217,7 +219,7 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(1, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0));
 
-    // Add 2 more instances to the ZK and increase the number of instances per replica group from 2 to 3.
+    // Add 2 more instances to the ZK and increase the number of instances per partition from 2 to 3.
     for (int i = numInstances + 2; i < numInstances + 4; i++) {
       InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
       instanceConfig.addTag(OFFLINE_TAG);
@@ -233,34 +235,29 @@ public class InstanceAssignmentTest {
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2
     // [i10, i11, i12, i13, i3, i4, i5, i11, i7, i8, i9, i0, i1]
-    // For r0, the candidate instances are [i12, i13, i4, i7, i8, i1].
-    //   For p0, since the existing assignment is [i8, i1], the next available instance from the candidates is i12.
-    //   For p1, the existing assignment is [i4, i8], the next available instance is also i12.
-    // r0: [i12, i4, i8, i1]
-    // For r1, the candidate instances become [i10, i13, i5, i7, i9].
-    //   For p0, since the existing assignment is [i9, i10], the next available instance is i13 (new instance).
-    //   For p1, the existing assignment is [i5, i9], the next available one from the candidates is i10, but since
-    //   i10 is already used in the former partition, it got added to the tail, so the next available one is i13.
-    // r1: [i10, i13, i5, i9]
-    // For r2, the candidate instances become [i11, i3, i7, i0].
-    //   For p0, the existing assignment is [i0, i3], the next available instance from the candidates is i11.
-    //   For p1, the existing assignment is [i11, i0], the next available instance from the candidates is i3, but
-    //   since i3 is already used in the former partition, it got appended to the tail, so the next available one is i7.
-    // r2: [i11, i3, i7, i0]
+    // r0: [i8, i1, i4, i12]
+    //      p0, p0, p1, p0
+    //      p1, p1
+    // r1: [i9, i5, i10, i13]
+    //      p0, p1, p0,  p0
+    //      p1,     p1
+    // r2: [i0, i3, i11, i7]
+    //      p0, p0, p1,  p0
+    //      p1, p1
     assertEquals(instancePartitions.getInstances(0, 0),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 12));
     assertEquals(instancePartitions.getInstances(1, 0),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 12));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1));
     assertEquals(instancePartitions.getInstances(0, 1),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13));
     assertEquals(instancePartitions.getInstances(1, 1),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 13));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10));
     assertEquals(instancePartitions.getInstances(0, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 11));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 7));
     assertEquals(instancePartitions.getInstances(1, 2),
-        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 7));
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
 
-    // Reduce the number of instances per replica group from 3 to 2.
+    // Reduce the number of instances per partition from 3 to 2.
     numInstancesPerPartition = 2;
     tableConfig.getValidationConfig()
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerPartition));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index fdb6292f26..288b789aee 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -31,36 +31,66 @@ import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+
 
 public class InstanceReplicaGroupPartitionSelectorTest {
 
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+  //@formatter:off
   private static final String INSTANCE_CONFIG_TEMPLATE =
-      "{\n" + "  \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-          + "  \"simpleFields\": {\n" + "    \"HELIX_ENABLED\": \"true\",\n"
-          + "    \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n"
-          + "    \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n"
-          + "    \"HELIX_PORT\": \"8098\",\n" + "    \"adminPort\": \"8097\",\n" + "    \"grpcPort\": \"8090\",\n"
-          + "    \"queryMailboxPort\": \"46347\",\n" + "    \"queryServerPort\": \"45031\",\n"
-          + "    \"shutdownInProgress\": \"false\"\n" + "  },\n" + "  \"mapFields\": {\n"
-          + "    \"SYSTEM_RESOURCE_INFO\": {\n" + "      \"numCores\": \"16\",\n"
-          + "      \"totalMemoryMB\": \"126976\",\n" + "      \"maxHeapSizeMB\": \"65536\"\n" + "    },\n"
-          + "    \"pool\": {\n" + "      \"DefaultTenant_OFFLINE\": \"${pool}\",\n"
-          + "      \"${poolName}\": \"${pool}\",\n" + "      \"AllReplicationGroups\": \"1\"\n" + "    }\n" + "  },\n"
-          + "  \"listFields\": {\n" + "    \"TAG_LIST\": [\n" + "      \"DefaultTenant_OFFLINE\",\n"
-          + "      \"DefaultTenant_REALTIME\",\n" + "      \"${poolName}\",\n" + "      \"AllReplicationGroups\"\n"
-          + "    ]\n" + "  }\n" + "}";
+      "{\n"
+    + "  \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+    + "  \"simpleFields\": {\n"
+    + "    \"HELIX_ENABLED\": \"true\",\n"
+    + "    \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n"
+    + "    \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n"
+    + "    \"HELIX_PORT\": \"8098\",\n"
+    + "    \"adminPort\": \"8097\",\n"
+    + "    \"grpcPort\": \"8090\",\n"
+    + "    \"queryMailboxPort\": \"46347\",\n"
+    + "    \"queryServerPort\": \"45031\",\n"
+    + "    \"shutdownInProgress\": \"false\"\n"
+    + "  },\n"
+    + "  \"mapFields\": {\n"
+    + "    \"SYSTEM_RESOURCE_INFO\": {\n"
+    + "      \"numCores\": \"16\",\n"
+    + "      \"totalMemoryMB\": \"126976\",\n"
+    + "      \"maxHeapSizeMB\": \"65536\"\n"
+    + "    },\n"
+    + "    \"pool\": {\n"
+    + "      \"DefaultTenant_OFFLINE\": \"${pool}\",\n"
+    + "      \"${poolName}\": \"${pool}\",\n"
+    + "      \"AllReplicationGroups\": \"1\"\n"
+    + "    }\n"
+    + "  },\n"
+    + "  \"listFields\": {\n"
+    + "    \"TAG_LIST\": [\n"
+    + "      \"DefaultTenant_OFFLINE\",\n"
+    + "      \"DefaultTenant_REALTIME\",\n"
+    + "      \"${poolName}\",\n"
+    + "      \"AllReplicationGroups\"\n"
+    + "    ]\n"
+    + "  }\n"
+    + "}";
+  //@formatter:on
 
   @Test
   public void testPoolsWhenOneMorePoolAddedAndOneMoreReplicaGroupsNeeded()
       throws JsonProcessingException {
+    //@formatter:off
     String existingPartitionsJson =
-        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
-            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
-            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ]\n" + "      }\n" + "    }\n";
+        "{\n"
+      + "  \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+      + "  \"partitionToInstancesMap\": {\n"
+      + "    \"0_0\": [\n"
+      + "      \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ]\n"
+      + "  }\n"
+      + "}";
+    //@formatter:on
     InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
     InstanceReplicaGroupPartitionConfig config =
         new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
@@ -94,33 +124,47 @@ public class InstanceReplicaGroupPartitionSelectorTest {
 
     // Now that 1 more pool is added and 1 more RG is needed, a new set called "0_1" is generated,
     // and the instances from Pool 1 are assigned to this new replica.
+    //@formatter:off
     String expectedInstancePartitions =
-        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
-            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
-            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ],\n" + "        \"0_1\": [\n"
-            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ]\n" + "      }\n" + "  }\n";
+        "{\n"
+      + "  \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+      + "  \"partitionToInstancesMap\": {\n"
+      + "    \"0_0\": [\n"
+      + "      \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ],\n"
+      + "    \"0_1\": [\n"
+      + "      \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ]\n"
+      + "  }\n"
+      + "}";
+    //@formatter:on
     InstancePartitions expectedPartitions =
         OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
-    assert assignedPartitions.equals(expectedPartitions);
+    assertEquals(assignedPartitions, expectedPartitions);
   }
 
   @Test
   public void testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools()
       throws JsonProcessingException {
     // The "rg0-2" instance used to belong to Pool 1, but now it belongs to Pool 0.
+    //@formatter:off
     String existingPartitionsJson =
-        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
-            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
-            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ],\n" + "        \"0_1\": [\n"
-            + "          \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ]\n" + "      }\n" + "  }\n";
+        "{\n"
+      + "  \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+      + "  \"partitionToInstancesMap\": {\n"
+      + "    \"0_0\": [\n"
+      + "      \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ],\n"
+      + "    \"0_1\": [\n"
+      + "      \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ]\n"
+      + "  }\n"
+      + "}";
+    //@formatter:on
     InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
     InstanceReplicaGroupPartitionConfig config =
         new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
@@ -150,17 +194,24 @@ public class InstanceReplicaGroupPartitionSelectorTest {
 
     // The "rg0-2" instance is replaced by "rg1-0" (which belongs to Pool 1), as "rg0-2" no longer belongs to Pool 1.
     // And "rg1-0" remains the same position as it's always under Pool 1.
+    //@formatter:off
     String expectedInstancePartitions =
-        "    {\n" + "      \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
-            + "      \"partitionToInstancesMap\": {\n" + "        \"0_0\": [\n"
-            + "          \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ],\n" + "        \"0_1\": [\n"
-            + "          \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
-            + "          \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
-            + "        ]\n" + "      }\n" + "  }\n";
+        "{\n"
+      + "  \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+      + "  \"partitionToInstancesMap\": {\n"
+      + "    \"0_0\": [\n"
+      + "      \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ],\n"
+      + "    \"0_1\": [\n"
+      + "      \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+      + "      \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+      + "    ]\n"
+      + "  }\n"
+      + "}";
+    //@formatter:on
     InstancePartitions expectedPartitions =
         OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
-    assert assignedPartitions.equals(expectedPartitions);
+    assertEquals(assignedPartitions, expectedPartitions);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
index 45645387af..be18d35e50 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
@@ -30,11 +30,7 @@ public class Pairs {
   }
 
   public static Comparator<IntPair> intPairComparator() {
-    return new AscendingIntPairComparator(true);
-  }
-
-  public static Comparator<IntPair> intPairComparator(boolean ascending) {
-    return new AscendingIntPairComparator(ascending);
+    return new AscendingIntPairComparator();
   }
 
   public static class IntPair {
@@ -83,26 +79,13 @@ public class Pairs {
   }
 
   public static class AscendingIntPairComparator implements Comparator<IntPair> {
-    private boolean _ascending;
-
-    public AscendingIntPairComparator(boolean ascending) {
-      _ascending = ascending;
-    }
 
     @Override
     public int compare(IntPair pair1, IntPair pair2) {
       if (pair1._left != pair2._left) {
-        if (_ascending) {
-          return Integer.compare(pair1._left, pair2._left);
-        } else {
-          return Integer.compare(pair2._left, pair1._left);
-        }
+        return Integer.compare(pair1._left, pair2._left);
       } else {
-        if (_ascending) {
-          return Integer.compare(pair1._right, pair2._right);
-        } else {
-          return Integer.compare(pair2._right, pair1._right);
-        }
+        return Integer.compare(pair1._right, pair2._right);
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org