You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 07:17:14 UTC

(pinot) 01/05: Enhance the minimizeDataMovement to keep the existing pool assignment

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 9550545be3ea35b2874d79a62d396806c0f0f23f
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Sun Nov 5 10:55:11 2023 -0800

    Enhance the minimizeDataMovement to keep the existing pool assignment
---
 .../assignment/InstanceAssignmentConfigUtils.java  |   2 +-
 .../common/utils/config/TableConfigSerDeTest.java  |   2 +-
 .../instance/FDAwareInstancePartitionSelector.java |   6 +-
 .../instance/InstanceAssignmentDriver.java         |  10 +-
 .../instance/InstancePartitionSelector.java        |   4 +-
 .../instance/InstancePartitionSelectorFactory.java |  18 +-
 .../InstanceReplicaGroupPartitionSelector.java     |  78 +++++++--
 .../instance/InstanceTagPoolSelector.java          |  63 ++++++-
 .../MirrorServerSetInstancePartitionSelector.java  |   4 +-
 ...anceAssignmentRestletResourceStatelessTest.java |   6 +-
 .../instance/InstanceAssignmentTest.java           | 193 ++++++++++++++++-----
 .../InstanceReplicaGroupPartitionSelectorTest.java |   2 +-
 .../TableRebalancerClusterStatelessTest.java       |   4 +-
 .../table/assignment/InstanceAssignmentConfig.java |  16 +-
 .../InstanceReplicaGroupPartitionConfig.java       |   2 +
 15 files changed, 311 insertions(+), 99 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index ebf38d308f..13cc270954 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -122,7 +122,7 @@ public class InstanceAssignmentConfigUtils {
           replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement, null);
     }
 
-    return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig);
+    return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, minimizeDataMovement);
   }
 
   public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index ed9d605af0..74f857a102 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -212,7 +212,7 @@ public class TableConfigSerDeTest {
       InstanceAssignmentConfig instanceAssignmentConfig =
           new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null),
               new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
-              new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null));
+              new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null), null, false);
       TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
           Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
index 294971615a..de96d4da4d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
@@ -50,8 +50,8 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
   private static final Logger LOGGER = LoggerFactory.getLogger(FDAwareInstancePartitionSelector.class);
 
   public FDAwareInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) {
-    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
+      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
   }
 
   /**
@@ -152,7 +152,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
        * initialize the new replicaGroupBasedAssignmentState for assignment,
        * place existing instances in their corresponding positions
        */
-      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
         int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
         int numExistingPartitions = _existingInstancePartitions.getNumPartitions();
         /*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 6d869b86c1..09866c1ed7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -64,8 +64,8 @@ public class InstanceAssignmentDriver {
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
-      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable
-      InstancePartitions preConfiguredInstancePartitions) {
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
@@ -88,8 +88,10 @@ public class InstanceAssignmentDriver {
     String tableNameWithType = _tableConfig.getTableName();
     LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);
 
+    boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement();
     InstanceTagPoolSelector tagPoolSelector =
-        new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType);
+        new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType,
+            minimizeDataMovement, existingInstancePartitions);
     Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
 
     InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
@@ -106,7 +108,7 @@ public class InstanceAssignmentDriver {
     InstancePartitionSelector instancePartitionSelector =
         InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
             instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
-            preConfiguredInstancePartitions);
+            preConfiguredInstancePartitions, minimizeDataMovement);
     InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
     instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
     return instancePartitions;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
index 396b869924..5f92db2426 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
@@ -29,12 +29,14 @@ abstract class InstancePartitionSelector {
   protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
   protected final String _tableNameWithType;
   protected final InstancePartitions _existingInstancePartitions;
+  protected final boolean _minimizeDataMovement;
 
   public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      String tableNameWithType, InstancePartitions existingInstancePartitions) {
+      String tableNameWithType, InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
     _replicaGroupPartitionConfig = replicaGroupPartitionConfig;
     _tableNameWithType = tableNameWithType;
     _existingInstancePartitions = existingInstancePartitions;
+    _minimizeDataMovement = minimizeDataMovement;
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
index 256aa89b02..8a343b1598 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import java.util.Arrays;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -31,25 +32,18 @@ public class InstancePartitionSelectorFactory {
 
   public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
       InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
-      InstancePartitions existingInstancePartitions) {
-    return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType,
-        existingInstancePartitions, null);
-  }
-
-  public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
-      InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
-      InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions
-  ) {
+      InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions,
+      boolean minimizeDataMovement) {
     switch (partitionSelector) {
       case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
         return new FDAwareInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
-            existingInstancePartitions);
+            existingInstancePartitions, minimizeDataMovement);
       case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
         return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
-            existingInstancePartitions);
+            existingInstancePartitions, minimizeDataMovement);
       case MIRROR_SERVER_SET_PARTITION_SELECTOR:
         return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
-            existingInstancePartitions, preConfiguredInstancePartitions);
+            existingInstancePartitions, preConfiguredInstancePartitions, minimizeDataMovement);
       default:
         throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from"
             + Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index de1e681d17..79e95db7a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -22,18 +22,21 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,8 +49,8 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
   private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class);
 
   public InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) {
-    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
+      String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
   }
 
   /**
@@ -73,16 +76,65 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep each replica-group on the pool of its first existing instance that is still a candidate.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
+        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
+          boolean foundExistingReplicaGroup = false;
+          for (int partitionId = 0; partitionId < existingNumPartitions && !foundExistingReplicaGroup; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            for (String existingInstance : existingInstances) {
+              Integer existingPool = instanceToPoolMap.get(existingInstance);
+              if (existingPool != null && pools.contains(existingPool)) {
+                poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId);
+                replicaGroupIdToPoolMap.put(replicaGroupId, existingPool);
+                foundExistingReplicaGroup = true;
+                break;
+              }
+            }
+          }
+        }
+        // Use a min heap to track the least frequently picked pool among all the pools
+        PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
+        for (int pool : pools) {
+          int numExistingReplicaGroups =
+              poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0;
+          minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool));
+        }
+        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+          if (replicaGroupIdToPoolMap.containsKey(replicaId)) {
+            continue;
+          }
+          // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection.
+          Pairs.IntPair pair = minHeap.remove();
+          int pool = pair.getRight();
+          pair.setLeft(pair.getLeft() + 1);
+          minHeap.add(pair);
+          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+          replicaGroupIdToPoolMap.put(replicaId, pool);
+        }
+      } else {
+        // Current default way to assign pool to replica groups.
+        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+          // Pick one pool for each replica-group based on the table name hash
+          int pool = pools.get((tableNameHash + replicaId) % numPools);
+          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+          replicaGroupIdToPoolMap.put(replicaId, pool);
+        }
       }
       LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
           _tableNameWithType);
@@ -132,7 +184,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
           numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
         // Minimize data movement.
         int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
         int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
@@ -257,7 +309,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       }
 
       List<String> instancesToSelect;
-      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
         // Minimize data movement.
         List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
         LinkedHashSet<String> candidateInstances = new LinkedHashSet<>();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5aefd1ad69..755e7aa713 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -21,11 +21,15 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.slf4j.Logger;
@@ -41,9 +45,17 @@ public class InstanceTagPoolSelector {
   private final InstanceTagPoolConfig _tagPoolConfig;
   private final String _tableNameWithType;
 
-  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType) {
+  private final boolean _minimizeDataMovement;
+
+  private final InstancePartitions _existingInstancePartitions;
+
+  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
+      boolean minimizeDataMovement,
+      @Nullable InstancePartitions existingInstancePartitions) {
     _tagPoolConfig = tagPoolConfig;
     _tableNameWithType = tableNameWithType;
+    _minimizeDataMovement = minimizeDataMovement;
+    _existingInstancePartitions = existingInstancePartitions;
   }
 
   /**
@@ -70,12 +82,14 @@ public class InstanceTagPoolSelector {
     if (_tagPoolConfig.isPoolBased()) {
       // Pool based selection
 
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
       // Extract the pool information from the instance configs
       for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
         Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
         if (poolMap != null && poolMap.containsKey(tag)) {
           int pool = Integer.parseInt(poolMap.get(tag));
           poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+          instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
         }
       }
       Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(),
@@ -96,9 +110,8 @@ public class InstanceTagPoolSelector {
         int numPools = poolToInstanceConfigsMap.size();
         int numPoolsToSelect = _tagPoolConfig.getNumPools();
         if (numPoolsToSelect > 0) {
-          Preconditions
-              .checkState(numPoolsToSelect <= numPools, "Not enough instance pools (%s in the cluster, asked for %s)",
-                  numPools, numPoolsToSelect);
+          Preconditions.checkState(numPoolsToSelect <= numPools,
+              "Not enough instance pools (%s in the cluster, asked for %s)", numPools, numPoolsToSelect);
         } else {
           numPoolsToSelect = numPools;
         }
@@ -109,11 +122,45 @@ public class InstanceTagPoolSelector {
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          // Keep the pools already used by the table, each at most once and never more than numPoolsToSelect.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0;
+              replicaGroupId < existingNumReplicaGroups && poolsToSelect.size() < numPoolsToSelect; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions && !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null && pools.contains(existingPool)) {
+                  // Several replica-groups may share a pool; select it only the first time it is seen.
+                  if (existingPools.add(existingPool)) {
+                    poolsToSelect.add(existingPool);
+                  }
+                  foundExistingPoolForReplicaGroup = true;
+                  break;
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
+          // Pick a pool from remainingPools that isn't used before.
+          List<Integer> remainingPools = new ArrayList<>(pools);
+          remainingPools.removeAll(existingPools);
+          // Existing pools are already selected; fill the remaining slots with unused pools.
+          for (int i = 0; poolsToSelect.size() < numPoolsToSelect && !remainingPools.isEmpty(); i++) {
+            poolsToSelect.add(remainingPools.remove(i % remainingPools.size()));
+          }
+        } else {
+          // Select pools based on the table name hash to evenly distribute the tables
+          List<Integer> poolsInCluster = new ArrayList<>(pools);
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+          }
         }
       }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
index 6b4086615a..f273866eeb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
@@ -76,8 +76,8 @@ public class MirrorServerSetInstancePartitionSelector extends InstancePartitionS
 
   public MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
       String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions,
-      InstancePartitions preConfiguredInstancePartitions) {
-    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
+      InstancePartitions preConfiguredInstancePartitions, boolean minimizeDataMovement) {
+    super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
     _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
     _numTargetInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
     _numTargetReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index dedc79384e..9feb8844c8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -118,7 +118,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     // Add OFFLINE instance assignment config to the offline table config
     InstanceAssignmentConfig offlineInstanceAssignmentConfig = new InstanceAssignmentConfig(
         new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
-        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false);
     offlineTableConfig.setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(offlineTableConfig);
@@ -136,7 +136,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
     // Add CONSUMING instance assignment config to the real-time table config
     InstanceAssignmentConfig consumingInstanceAssignmentConfig = new InstanceAssignmentConfig(
         new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
-        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false);
     realtimeTableConfig.setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
@@ -164,7 +164,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
             null)));
     InstanceAssignmentConfig tierInstanceAssignmentConfig = new InstanceAssignmentConfig(
         new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
-        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
+        new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false);
     Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>();
     instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig);
     instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index b25a529e10..a6220c00a2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -374,7 +374,7 @@ public class InstanceAssignmentTest {
       TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
           .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
               new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                  InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                  InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
           .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured"))
           .build();
       InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
@@ -480,7 +480,7 @@ public class InstanceAssignmentTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
     InstancePartitions preConfigured = new InstancePartitions("preConfigured");
@@ -561,7 +561,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -664,7 +664,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -756,7 +756,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -851,7 +851,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -956,7 +956,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -1063,7 +1063,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
     preConfigured = new InstancePartitions("preConfigured");
@@ -1156,7 +1156,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -1230,7 +1230,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
         .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -1311,7 +1311,7 @@ public class InstanceAssignmentTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null);
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-            new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))).build();
+            new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))).build();
     InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -1364,7 +1364,7 @@ public class InstanceAssignmentTest {
     // Select all 3 pools in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
     // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
@@ -1386,7 +1386,7 @@ public class InstanceAssignmentTest {
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
@@ -1408,7 +1408,7 @@ public class InstanceAssignmentTest {
     numReplicaGroups = numPools;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // [pool0, pool1]
@@ -1438,7 +1438,7 @@ public class InstanceAssignmentTest {
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
     // Reset the instance configs to have only two pools.
     instanceConfigs.clear();
     numInstances = 10;
@@ -1487,7 +1487,7 @@ public class InstanceAssignmentTest {
     // Select pool 0 and 1 in pool selection
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
 
     // Get the latest existingInstancePartitions from last computation.
     existingInstancePartitions = instancePartitions;
@@ -1514,7 +1514,7 @@ public class InstanceAssignmentTest {
     numReplicaGroups = 3;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
 
     // Get the latest existingInstancePartitions from last computation.
     existingInstancePartitions = instancePartitions;
@@ -1593,7 +1593,7 @@ public class InstanceAssignmentTest {
     numReplicaGroups = 2;
     replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
 
     // Get the latest existingInstancePartitions from last computation.
     existingInstancePartitions = instancePartitions;
@@ -1693,6 +1693,109 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(0, 1),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
             SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // The below is the test suite for testing out minimizeDataMovement with pool configs
+    // Add a third pool with the same number of instances, but keep the number of pools in use the same (i.e. 2)
+    numPools = 3;
+    numInstances = numPools * numInstancesPerPool;
+    for (int i = numInstances + 4; i < numInstances + 9; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = numPools - 1;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2, but since minimizeDataMovement is enabled,
+    // same pools would be re-used.
+    // [pool0, pool1]
+    //  r0     r1
+    // Thus, the instance partition assignment remains the same as the previous one.
+    //     pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //     pool 1: [  i7, i9, i11, i13,  i6 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Set tag pool config to 3.
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Putting the existingPoolToInstancesMap shouldn't change the instance assignment,
+    // as there are only 2 replica groups needed.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // But since Pool 0 and Pool 1 are already being used for the table and the number of replica groups remains 2,
+    // the 3rd pool (Pool 2) won't be picked up.
+    // Thus, the instance partition assignment remains the same as the existing one.
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1
+    // Now in poolToInstancesMap:
+    //     pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //     pool 1: [  i7, i9, i11, i13,  i6 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Set replica group from 2 to 3
+    numReplicaGroups = 3;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Now that 1 more replica group is needed, Pool 2 will be chosen for the 3rd replica group
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // [pool0, pool1, pool2]
+    //  r0     r1     r2
+    // Each replica-group should have 5 instances assigned (all instances of its pool)
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //   pool 0: [ i3, i4, i0, i1, i2 ]
+    //   pool 1: [ i8, i9, i5, i6, i7 ]
+    //   pool 2: [ i22,i23,i19,i20,i21]
+    // Thus, the new assignment will become:
+    //   pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //   pool 1: [  i7, i9, i11, i13,  i6 ]
+    //   pool 2: [ i22, i23, i19, i20,i21 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 19,
+            SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 21));
   }
 
   @Test
@@ -1720,7 +1823,7 @@ public class InstanceAssignmentTest {
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // No instance with correct tag
     try {
@@ -1750,7 +1853,7 @@ public class InstanceAssignmentTest {
     // Enable pool
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // No instance has correct pool configured
     try {
@@ -1784,7 +1887,7 @@ public class InstanceAssignmentTest {
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many pools
     try {
@@ -1796,7 +1899,7 @@ public class InstanceAssignmentTest {
 
     tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 2));
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for pool that does not exist
     try {
@@ -1810,7 +1913,7 @@ public class InstanceAssignmentTest {
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false, null
     );
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many instances
     try {
@@ -1824,7 +1927,7 @@ public class InstanceAssignmentTest {
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false, null
     );
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Number of replica-groups must be positive
     try {
@@ -1836,7 +1939,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many replica-groups
     try {
@@ -1849,7 +1952,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many instances
     try {
@@ -1861,7 +1964,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Ask for too many instances per partition
     try {
@@ -1874,7 +1977,7 @@ public class InstanceAssignmentTest {
 
     replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
     // pool0: [i3, i4, i0, i1, i2]
@@ -1914,7 +2017,8 @@ public class InstanceAssignmentTest {
     try {
       tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
           .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-              new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build();
+              new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR", false)))
+          .build();
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
           "No enum constant org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig.PartitionSelector"
@@ -1943,7 +2047,8 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
       instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -1976,7 +2081,8 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
       instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -2017,7 +2123,8 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     try {
       instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -2055,7 +2162,8 @@ public class InstanceAssignmentTest {
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+        .build();
     InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
 
     InstancePartitions instancePartitions =
@@ -2127,7 +2235,8 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
             new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+                InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
+        .build();
     driver = new InstanceAssignmentDriver(tableConfig);
     // existingInstancePartitions = instancePartitions
     instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions);
@@ -2208,7 +2317,7 @@ public class InstanceAssignmentTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
             Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup))
         .setSegmentPartitionConfig(segmentPartitionConfig).build();
     driver = new InstanceAssignmentDriver(tableConfig);
@@ -2282,7 +2391,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2338,7 +2447,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2405,7 +2514,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2471,7 +2580,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2542,7 +2651,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2593,7 +2702,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
@@ -2657,7 +2766,7 @@ public class InstanceAssignmentTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
             .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
                 new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
-                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+                    InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
             .build();
     driver = new InstanceAssignmentDriver(tableConfig);
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index 889206437f..2fdef27796 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -64,7 +64,7 @@ public class InstanceReplicaGroupPartitionSelectorTest {
         new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
 
     InstanceReplicaGroupPartitionSelector selector =
-        new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing);
+        new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true);
 
     String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"};
     String[] poolNumbers = {"0", "0", "1", "1"};
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 5d679c0380..1df7109ef2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -195,7 +195,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
     // No need to reassign instances because instances should be automatically assigned when updating the table config
@@ -481,7 +481,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
     InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null);
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME,
-        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
index 391ba4812d..ad4b22ecaf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
@@ -41,13 +41,17 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
       "Configuration for the instance replica-group and partition of the instance assignment (mandatory)")
   private final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
 
+  @JsonPropertyDescription("Configuration to minimize data movement for pool and instance assignment")
+  private final boolean _minimizeDataMovement;
+
   @JsonCreator
   public InstanceAssignmentConfig(
       @JsonProperty(value = "tagPoolConfig", required = true) InstanceTagPoolConfig tagPoolConfig,
       @JsonProperty("constraintConfig") @Nullable InstanceConstraintConfig constraintConfig,
       @JsonProperty(value = "replicaGroupPartitionConfig", required = true)
           InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      @JsonProperty("partitionSelector") @Nullable String partitionSelector) {
+      @JsonProperty("partitionSelector") @Nullable String partitionSelector,
+      @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement) {
     Preconditions.checkArgument(tagPoolConfig != null, "'tagPoolConfig' must be configured");
     Preconditions
         .checkArgument(replicaGroupPartitionConfig != null, "'replicaGroupPartitionConfig' must be configured");
@@ -57,11 +61,7 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
     _partitionSelector =
         partitionSelector == null ? PartitionSelector.INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR
             : PartitionSelector.valueOf(partitionSelector);
-  }
-
-  public InstanceAssignmentConfig(InstanceTagPoolConfig tagPoolConfig, InstanceConstraintConfig constraintConfig,
-      InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig) {
-    this(tagPoolConfig, constraintConfig, replicaGroupPartitionConfig, null);
+    _minimizeDataMovement = minimizeDataMovement;
   }
 
   public PartitionSelector getPartitionSelector() {
@@ -81,6 +81,10 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
     return _replicaGroupPartitionConfig;
   }
 
+  public boolean isMinimizeDataMovement() {
+    return _minimizeDataMovement;
+  }
+
   public enum PartitionSelector {
     FD_AWARE_INSTANCE_PARTITION_SELECTOR, INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR,
     MIRROR_SERVER_SET_PARTITION_SELECTOR
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
index adc72e8f1c..1bc40cba21 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
@@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
       "Name of the column used for partition, if not provided table level replica group will be used")
   private final String _partitionColumn;
 
+  // TODO: remove this config in the next official release; use InstanceAssignmentConfig#minimizeDataMovement
+  @Deprecated
   private final boolean _minimizeDataMovement;
 
   @JsonCreator


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org