Posted to commits@pinot.apache.org by jl...@apache.org on 2022/04/07 03:23:12 UTC

[pinot] branch minimize-instance-movement created (now 17139574d5)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch minimize-instance-movement
in repository https://gitbox.apache.org/repos/asf/pinot.git


      at 17139574d5 Minimize data movement between instances in pools

This branch includes the following new commits:

     new 17139574d5 Minimize data movement between instances in pools

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Minimize data movement between instances in pools

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch minimize-instance-movement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 17139574d51fe9c6649067c71c0f7ab9e56f3e43
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Apr 6 20:22:52 2022 -0700

    Minimize data movement between instances in pools
---
 .../common/assignment/InstancePartitions.java      |  66 +++-
 .../PinotInstanceAssignmentRestletResource.java    |  38 +-
 .../api/resources/PinotTableRestletResource.java   |  10 +-
 .../helix/core/PinotHelixResourceManager.java      |   2 +-
 .../instance/InstanceAssignmentDriver.java         |   7 +-
 .../InstanceReplicaGroupPartitionSelector.java     |  63 ++--
 ...stanceReplicaGroupPartitionSelectorFactory.java |  47 +++
 ...ementInstanceReplicaGroupPartitionSelector.java | 277 +++++++++++++++
 .../core/rebalance/RebalanceConfigConstants.java   |   4 +
 .../helix/core/rebalance/TableRebalancer.java      |  34 +-
 .../instance/InstanceAssignmentTest.java           | 385 +++++++++++++++++++--
 .../apache/pinot/tools/PinotTableRebalancer.java   |   3 +-
 .../tools/admin/command/RebalanceTableCommand.java |   6 +-
 13 files changed, 868 insertions(+), 74 deletions(-)
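
The core of this change is an optional extra argument threaded through the instance-assignment path: the existing
InstancePartitions fetched from ZK. A minimal sketch of the new entry point, assuming tableConfig, instanceConfigs and
existingInstancePartitions are already available to the caller (class names are taken from the diff below):

    InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);

    // Previous behavior: compute the assignment from scratch.
    InstancePartitions fresh =
        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);

    // New behavior: hand over the existing partitions so that the selector keeps every surviving
    // instance in its previous position and only fills the vacated positions with new instances.
    InstancePartitions retained =
        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);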

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
index c511077187..20fa7478ac 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -58,24 +60,39 @@ import org.apache.pinot.spi.utils.JsonUtils;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class InstancePartitions {
   private static final char PARTITION_REPLICA_GROUP_SEPARATOR = '_';
+  private static final String POOLS_KEY = "pools";
+  private static final String REPLICA_GROUP_SEPARATOR = "/";
 
   private final String _instancePartitionsName;
+  // A map from each partition key to its list of instances.
+  // A partition key looks like "0_0", where the first number is the partition id
+  // and the second is the replica-group id.
   private final Map<String, List<String>> _partitionToInstancesMap;
+  // A map to store the selected pool numbers and their associated list of replica groups.
+  private final Map<Integer, List<Integer>> _poolToReplicaGroupsMap;
   private int _numPartitions;
   private int _numReplicaGroups;
 
   public InstancePartitions(String instancePartitionsName) {
     _instancePartitionsName = instancePartitionsName;
     _partitionToInstancesMap = new TreeMap<>();
+    _poolToReplicaGroupsMap = new TreeMap<>();
   }
 
   @JsonCreator
   private InstancePartitions(
       @JsonProperty(value = "instancePartitionsName", required = true) String instancePartitionsName,
       @JsonProperty(value = "partitionToInstancesMap", required = true)
-          Map<String, List<String>> partitionToInstancesMap) {
+          Map<String, List<String>> partitionToInstancesMap,
+      @JsonProperty(value = "poolToReplicaGroupsMap") Map<String, String> poolToReplicaGroupsMap) {
     _instancePartitionsName = instancePartitionsName;
     _partitionToInstancesMap = partitionToInstancesMap;
+    _poolToReplicaGroupsMap = new TreeMap<>();
+    if (poolToReplicaGroupsMap != null) {
+      for (Map.Entry<String, String> entry : poolToReplicaGroupsMap.entrySet()) {
+        _poolToReplicaGroupsMap.put(Integer.parseInt(entry.getKey()), extractReplicaGroups(entry.getValue()));
+      }
+    }
     for (String key : partitionToInstancesMap.keySet()) {
       int separatorIndex = key.indexOf(PARTITION_REPLICA_GROUP_SEPARATOR);
       int partitionId = Integer.parseInt(key.substring(0, separatorIndex));
@@ -105,6 +122,11 @@ public class InstancePartitions {
     return _numReplicaGroups;
   }
 
+  @JsonIgnore
+  public Map<Integer, List<Integer>> getPoolToReplicaGroupsMap() {
+    return _poolToReplicaGroupsMap;
+  }
+
   public List<String> getInstances(int partitionId, int replicaGroupId) {
     return _partitionToInstancesMap
         .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId);
@@ -117,13 +139,53 @@ public class InstancePartitions {
     _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1);
   }
 
+  public void setPoolToReplicaGroupsMap(Map<Integer, List<Integer>> poolToReplicaGroupsMap) {
+    _poolToReplicaGroupsMap.putAll(poolToReplicaGroupsMap);
+  }
+
   public static InstancePartitions fromZNRecord(ZNRecord znRecord) {
-    return new InstancePartitions(znRecord.getId(), znRecord.getListFields());
+    return new InstancePartitions(znRecord.getId(), znRecord.getListFields(), znRecord.getMapField(POOLS_KEY));
+  }
+
+  private static List<Integer> extractReplicaGroups(String instancesRawString) {
+    if (instancesRawString == null || instancesRawString.length() == 0) {
+      return Collections.emptyList();
+    }
+    String[] replicaGroupStringArray = instancesRawString.split(REPLICA_GROUP_SEPARATOR);
+    List<Integer> instances = new ArrayList<>(replicaGroupStringArray.length);
+    for (String replicaGroupString : replicaGroupStringArray) {
+      instances.add(Integer.parseInt(replicaGroupString));
+    }
+    return instances;
+  }
+
+  private String convertReplicaGroupsToString(List<Integer> replicaGroups) {
+    if (replicaGroups == null || replicaGroups.isEmpty()) {
+      return "";
+    }
+    StringBuilder stringBuilder = new StringBuilder();
+    for (Integer replicaGroup : replicaGroups) {
+      if (stringBuilder.length() == 0) {
+        stringBuilder.append(replicaGroup);
+      } else {
+        stringBuilder.append(REPLICA_GROUP_SEPARATOR).append(replicaGroup);
+      }
+    }
+    return stringBuilder.toString();
+  }
+
+  private Map<String, String> convertListToStringMap() {
+    Map<String, String> convertedMap = new TreeMap<>();
+    for (Map.Entry<Integer, List<Integer>> entry : _poolToReplicaGroupsMap.entrySet()) {
+      convertedMap.put(Integer.toString(entry.getKey()), convertReplicaGroupsToString(entry.getValue()));
+    }
+    return convertedMap;
   }
 
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = new ZNRecord(_instancePartitionsName);
     znRecord.setListFields(_partitionToInstancesMap);
+    znRecord.setMapField(POOLS_KEY, convertListToStringMap());
     return znRecord;
   }
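
For reference, the new poolToReplicaGroupsMap is persisted in the ZNRecord as one extra map field under the "pools"
key, with each pool's replica-group ids joined by "/". A small round-trip sketch (imports omitted; the instance and
pool values are made up for illustration):

    InstancePartitions partitions = new InstancePartitions("myTable_OFFLINE");
    partitions.setInstances(0, 0, Arrays.asList("Server_0", "Server_1"));

    // Pool 0 hosts replica-groups 0 and 2; pool 1 hosts replica-group 1.
    Map<Integer, List<Integer>> poolToReplicaGroups = new TreeMap<>();
    poolToReplicaGroups.put(0, Arrays.asList(0, 2));
    poolToReplicaGroups.put(1, Collections.singletonList(1));
    partitions.setPoolToReplicaGroupsMap(poolToReplicaGroups);

    ZNRecord znRecord = partitions.toZNRecord();
    // znRecord.getMapField("pools") is now {"0": "0/2", "1": "1"}, and
    // InstancePartitions.fromZNRecord(znRecord) parses the strings back into Integer lists.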
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 8d9e6cc7dc..40f7bfd0ce 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -124,7 +124,9 @@ public class PinotInstanceAssignmentRestletResource {
       @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type") @Nullable
           InstancePartitionsType instancePartitionsType,
-      @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") @QueryParam("dryRun") boolean dryRun) {
+      @ApiParam(value = "Whether to do dry-run") @DefaultValue("false") @QueryParam("dryRun") boolean dryRun,
+      @ApiParam(value = "Whether to retain current instance sequence") @DefaultValue("false")
+      @QueryParam("retainInstanceSequence") boolean retainInstanceSequence) {
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
     List<InstanceConfig> instanceConfigs = _resourceManager.getAllHelixInstanceConfigs();
 
@@ -136,8 +138,8 @@ public class PinotInstanceAssignmentRestletResource {
         try {
           if (InstanceAssignmentConfigUtils
               .allowInstanceAssignment(offlineTableConfig, InstancePartitionsType.OFFLINE)) {
-            instancePartitionsMap.put(InstancePartitionsType.OFFLINE, new InstanceAssignmentDriver(offlineTableConfig)
-                .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs));
+            assignInstancesForInstancePartitionsType(instancePartitionsMap, offlineTableConfig, instanceConfigs,
+                InstancePartitionsType.OFFLINE, retainInstanceSequence);
           }
         } catch (IllegalStateException e) {
           throw new ControllerApplicationException(LOGGER, "Caught IllegalStateException", Response.Status.BAD_REQUEST,
@@ -152,19 +154,18 @@ public class PinotInstanceAssignmentRestletResource {
       TableConfig realtimeTableConfig = _resourceManager.getRealtimeTableConfig(tableName);
       if (realtimeTableConfig != null) {
         try {
-          InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(realtimeTableConfig);
           if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
             if (InstanceAssignmentConfigUtils
                 .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.CONSUMING)) {
-              instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
-                  instanceAssignmentDriver.assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs));
+              assignInstancesForInstancePartitionsType(instancePartitionsMap, realtimeTableConfig, instanceConfigs,
+                  InstancePartitionsType.CONSUMING, retainInstanceSequence);
             }
           }
           if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
             if (InstanceAssignmentConfigUtils
                 .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.COMPLETED)) {
-              instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
-                  instanceAssignmentDriver.assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs));
+              assignInstancesForInstancePartitionsType(instancePartitionsMap, realtimeTableConfig, instanceConfigs,
+                  InstancePartitionsType.COMPLETED, retainInstanceSequence);
             }
           }
         } catch (IllegalStateException e) {
@@ -191,6 +192,27 @@ public class PinotInstanceAssignmentRestletResource {
     return instancePartitionsMap;
   }
 
+  /**
+   * Assign instances given the type of instancePartitions.
+   * @param instancePartitionsMap the empty map to be filled.
+   * @param tableConfig table config
+   * @param instanceConfigs list of instance configs
+   * @param instancePartitionsType type of instancePartitions
+   * @param retainInstanceSequence whether to retain instance sequence
+   */
+  private void assignInstancesForInstancePartitionsType(
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
+      List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType,
+      boolean retainInstanceSequence) {
+    InstancePartitions existingInstancePartitions = null;
+    if (retainInstanceSequence) {
+      existingInstancePartitions = InstancePartitionsUtils
+          .fetchOrComputeInstancePartitions(_resourceManager.getHelixZkManager(), tableConfig, instancePartitionsType);
+    }
+    instancePartitionsMap.put(instancePartitionsType, new InstanceAssignmentDriver(tableConfig)
+        .assignInstances(instancePartitionsType, instanceConfigs, existingInstancePartitions));
+  }
+
   private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
     try {
       LOGGER.info("Persisting instance partitions: {}", instancePartitions);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 46e7a2e7da..1db480acce 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -569,9 +569,12 @@ public class PinotTableRestletResource {
           boolean downtime, @ApiParam(
       value = "For no-downtime rebalance, minimum number of replicas to keep alive during rebalance, or maximum "
           + "number of replicas allowed to be unavailable if value is negative") @DefaultValue("1")
-  @QueryParam("minAvailableReplicas") int minAvailableReplicas, @ApiParam(
-      value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot "
-          + "be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts) {
+  @QueryParam("minAvailableReplicas") int minAvailableReplicas,
+      @ApiParam(value = "Whether to use best-efforts to rebalance (not fail the rebalance"
+          + " when the no-downtime contract cannot be achieved)") @DefaultValue("false")
+      @QueryParam("bestEfforts") boolean bestEfforts,
+      @ApiParam(value = "Whether to retain instance sequence during rebalancing in order to minimize data movement")
+      @DefaultValue("false") @QueryParam("retainInstancesSequence") boolean retainInstancesSequence) {
 
     String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr);
 
@@ -583,6 +586,7 @@ public class PinotTableRestletResource {
     rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, downtime);
     rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, minAvailableReplicas);
     rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, bestEfforts);
+    rebalanceConfig.addProperty(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE, retainInstancesSequence);
 
     try {
       if (dryRun || downtime) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3b901171e8..8fdcf48c60 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1649,7 +1649,7 @@ public class PinotHelixResourceManager {
       List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
       for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
         InstancePartitions instancePartitions =
-            instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs);
+            instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null);
         LOGGER.info("Persisting instance partitions: {}", instancePartitions);
         InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 1440fae204..444f4934ea 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -52,7 +52,7 @@ public class InstanceAssignmentDriver {
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
-      List<InstanceConfig> instanceConfigs) {
+      List<InstanceConfig> instanceConfigs, InstancePartitions existingInstancePartitions) {
     String tableNameWithType = _tableConfig.getTableName();
     LOGGER.info("Starting {} instance assignment for table: {}", instancePartitionsType, tableNameWithType);
 
@@ -73,8 +73,9 @@ public class InstanceAssignmentDriver {
       poolToInstanceConfigsMap = constraintApplier.applyConstraint(poolToInstanceConfigsMap);
     }
 
-    InstanceReplicaGroupPartitionSelector replicaPartitionSelector =
-        new InstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType);
+    InstanceReplicaGroupPartitionSelector replicaPartitionSelector = InstanceReplicaGroupPartitionSelectorFactory
+        .generateInstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(),
+            tableNameWithType, existingInstancePartitions);
     InstancePartitions instancePartitions = new InstancePartitions(
         instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
     replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 3e83bf7720..13cbc9cd07 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory;
 public class InstanceReplicaGroupPartitionSelector {
   private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class);
 
-  private final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
-  private final String _tableNameWithType;
+  protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
+  protected final String _tableNameWithType;
 
   public InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
       String tableNameWithType) {
@@ -163,31 +163,44 @@ public class InstanceReplicaGroupPartitionSelector {
       }
     } else {
       // Non-replica-group based selection
+      selectForNonReplicaGroupBased(poolToInstanceConfigsMap, instancePartitions);
+    }
+  }
 
-      // Pick one pool based on the table name hash
-      int pool = pools.get(tableNameHash % numPools);
-      LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType);
-      List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool);
-      int numInstanceConfigs = instanceConfigs.size();
-
-      // Assign all instances if not configured
-      int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances();
-      if (numInstancesToSelect > 0) {
-        Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs,
-            "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs,
-            numInstancesToSelect);
-      } else {
-        numInstancesToSelect = numInstanceConfigs;
-      }
+  protected void selectForNonReplicaGroupBased(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions) {
+    int numPools = poolToInstanceConfigsMap.size();
+    Preconditions.checkState(numPools != 0, "No pool qualified for selection");
 
-      List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
-      for (int i = 0; i < numInstancesToSelect; i++) {
-        instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
-      }
-      instancesToSelect.sort(null);
-      LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType);
-      // Set the instances as partition 0 replica 0
-      instancePartitions.setInstances(0, 0, instancesToSelect);
+    int tableNameHash = Math.abs(_tableNameWithType.hashCode());
+    List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet());
+    pools.sort(null);
+    LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}",
+        _tableNameWithType, tableNameHash, pools);
+
+    // Pick one pool based on the table name hash
+    int pool = pools.get(tableNameHash % numPools);
+    LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType);
+    List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool);
+    int numInstanceConfigs = instanceConfigs.size();
+
+    // Assign all instances if not configured
+    int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances();
+    if (numInstancesToSelect > 0) {
+      Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs,
+          "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs,
+          numInstancesToSelect);
+    } else {
+      numInstancesToSelect = numInstanceConfigs;
+    }
+
+    List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+    for (int i = 0; i < numInstancesToSelect; i++) {
+      instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
     }
+    instancesToSelect.sort(null);
+    LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType);
+    // Set the instances as partition 0 replica 0
+    instancePartitions.setInstances(0, 0, instancesToSelect);
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorFactory.java
new file mode 100644
index 0000000000..36e16e8d27
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+
+/**
+ * A factory class to generate {@link InstanceReplicaGroupPartitionSelector}.
+ */
+public class InstanceReplicaGroupPartitionSelectorFactory {
+
+  private InstanceReplicaGroupPartitionSelectorFactory() {
+  }
+
+  public static InstanceReplicaGroupPartitionSelector generateInstanceReplicaGroupPartitionSelector(
+      InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
+      InstancePartitions existingInstancePartitions) {
+    InstanceReplicaGroupPartitionSelector replicaPartitionSelector;
+    if (existingInstancePartitions == null) {
+      replicaPartitionSelector =
+          new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType);
+    } else {
+      // If existing instance partitions are provided, use the customized selector to minimize data movement.
+      replicaPartitionSelector =
+          new MinimizedDataMovementInstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig,
+              tableNameWithType, existingInstancePartitions);
+    }
+    return replicaPartitionSelector;
+  }
+}
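
A brief usage note: callers do not pick the selector class directly; they go through the factory, which falls back to
the original selector when no existing partitions are supplied (the variables below are assumed to be in scope, as in
InstanceAssignmentDriver):

    // existingInstancePartitions == null -> InstanceReplicaGroupPartitionSelector
    // existingInstancePartitions != null -> MinimizedDataMovementInstanceReplicaGroupPartitionSelector
    InstanceReplicaGroupPartitionSelector selector =
        InstanceReplicaGroupPartitionSelectorFactory.generateInstanceReplicaGroupPartitionSelector(
            replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
    selector.selectInstances(poolToInstanceConfigsMap, instancePartitions);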
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MinimizedDataMovementInstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MinimizedDataMovementInstanceReplicaGroupPartitionSelector.java
new file mode 100644
index 0000000000..0af1264d3a
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MinimizedDataMovementInstanceReplicaGroupPartitionSelector.java
@@ -0,0 +1,277 @@
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An extended class of {@link InstanceReplicaGroupPartitionSelector} to minimize data movement between instances.
+ * Currently the following scenarios are supported:
+ *    * swap instances within a pool
+ *    * add / remove instances per replica group
+ *    * increase / decrease number of replica groups
+ *
+ * TODO: Support the remaining scenarios:
+ *    * add / remove pools
+ */
+public class MinimizedDataMovementInstanceReplicaGroupPartitionSelector extends InstanceReplicaGroupPartitionSelector {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(MinimizedDataMovementInstanceReplicaGroupPartitionSelector.class);
+
+  private final InstancePartitions _existingInstancePartitions;
+
+  public MinimizedDataMovementInstanceReplicaGroupPartitionSelector(
+      InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String tableNameWithType,
+      InstancePartitions existingInstancePartitions) {
+    super(replicaGroupPartitionConfig, tableNameWithType);
+    _existingInstancePartitions = existingInstancePartitions;
+  }
+
+  @Override
+  public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions) {
+    int numPools = poolToInstanceConfigsMap.size();
+    Preconditions.checkState(numPools != 0, "No pool qualified for selection");
+
+    int tableNameHash = Math.abs(_tableNameWithType.hashCode());
+    List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet());
+    pools.sort(null);
+    Preconditions.checkState(pools.containsAll(_existingInstancePartitions.getPoolToReplicaGroupsMap().keySet()),
+        String.format("The existing pool no longer exists in ZK any more. Existing pools: %s. Latest pools: %s",
+            _existingInstancePartitions.getPoolToReplicaGroupsMap().keySet(), pools));
+    LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}",
+        _tableNameWithType, tableNameHash, pools);
+
+    if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
+      // Replica-group based selection
+
+      // Find out the mapping between pool and replica groups.
+      int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
+      Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
+      Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
+      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+        // Pick one pool for each replica-group based on the table name hash
+        int pool = pools.get((tableNameHash + replicaId) % numPools);
+        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+        replicaGroupIdToPoolMap.put(replicaId, pool);
+      }
+      LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
+          _tableNameWithType);
+
+      // Finalize the number of instances per replica group.
+      int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+      if (numInstancesPerReplicaGroup > 0) {
+        // Check if we have enough instances if number of instances per replica-group is configured
+        for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+          int pool = entry.getKey();
+          int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+          int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size();
+          Preconditions.checkState(numInstancesToSelect <= numInstancesInPool,
+              "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool,
+              numInstancesToSelect);
+        }
+      } else {
+        // Use as many instances as possible if number of instances per replica-group is not configured
+        numInstancesPerReplicaGroup = Integer.MAX_VALUE;
+        for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+          int pool = entry.getKey();
+          int numReplicaGroupsInPool = entry.getValue().size();
+          int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+          Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool,
+              "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool,
+              numReplicaGroupsInPool, numInstancesInPool);
+          numInstancesPerReplicaGroup =
+              Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool);
+        }
+      }
+      LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
+          _tableNameWithType);
+
+      // Assign instances within a replica-group to one partition if not configured
+      int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
+      if (numPartitions <= 0) {
+        numPartitions = 1;
+      }
+      // Assign all instances within a replica-group to each partition if not configured
+      int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition();
+      if (numInstancesPerPartition > 0) {
+        Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup,
+            "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group:"
+                + " %s", numInstancesPerPartition, numInstancesPerReplicaGroup);
+      } else {
+        numInstancesPerPartition = numInstancesPerReplicaGroup;
+      }
+      LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
+          numPartitions, numInstancesPerPartition, _tableNameWithType);
+
+      // Step 1: Identify candidate instances from latest list of instance configs in ZK.
+      Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigs = entry.getValue();
+        for (InstanceConfig instanceConfig : instanceConfigs) {
+          poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>())
+              .add(instanceConfig.getInstanceName());
+        }
+      }
+
+      Map<Integer, Map<String, String>> poolToGoneInstancesAndReplacedInstancesMap = new TreeMap<>();
+      Map<String, List<String>> existingPartitionToLatestInstancesMap = new TreeMap<>();
+      Map<String, List<String>> existingPartitionToInstancesMap =
+          _existingInstancePartitions.getPartitionToInstancesMap();
+      Map<Integer, Set<String>> poolToExistingAliveInstancesMap = new TreeMap<>();
+
+      int maxNumberOfInstancesPerInstancePartitionAssignment = Integer.MIN_VALUE;
+      for (List<String> instances : existingPartitionToInstancesMap.values()) {
+        maxNumberOfInstancesPerInstancePartitionAssignment =
+            Math.max(maxNumberOfInstancesPerInstancePartitionAssignment, instances.size());
+      }
+
+      // Step 2: by reusing the existing mapping, find out the missing instances.
+      for (int replicaGroupId = 0; replicaGroupId < _existingInstancePartitions.getNumReplicaGroups();
+          replicaGroupId++) {
+        Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+        if (pool == null) {
+          // Skip validating replica group if it's no longer needed.
+          continue;
+        }
+        for (int partitionId = 0; partitionId < _existingInstancePartitions.getNumPartitions(); partitionId++) {
+          List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+          List<String> latestInstancesInMap = existingPartitionToLatestInstancesMap
+              .computeIfAbsent(partitionId + "_" + replicaGroupId, k -> new ArrayList<>());
+
+          for (String existingInstance : existingInstances) {
+            // The instance still exists in the ZK.
+            if (poolToReplicaGroupIdsMap.containsKey(pool) && poolToCandidateInstancesMap.containsKey(pool)
+                && poolToCandidateInstancesMap.get(pool).contains(existingInstance)) {
+              poolToExistingAliveInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).add(existingInstance);
+              latestInstancesInMap.add(existingInstance);
+            } else {
+              // The instance no longer exists
+              poolToGoneInstancesAndReplacedInstancesMap.computeIfAbsent(pool, k -> new TreeMap<>())
+                  .put(existingInstance, null);
+              latestInstancesInMap.add(null);
+            }
+          }
+        }
+      }
+
+      // Step 3: Find out all new instances in each pool.
+      Map<Integer, Deque<String>> poolToNewCandidateInstancesMap = new TreeMap<>();
+      for (Map.Entry<Integer, Set<String>> entry : poolToCandidateInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        Set<String> candidateInstancesInPool = entry.getValue();
+        Set<String> existingStillAliveInstances =
+            poolToExistingAliveInstancesMap.computeIfAbsent(pool, k -> new HashSet<>());
+        for (String candidateInstance : candidateInstancesInPool) {
+          if (!existingStillAliveInstances.contains(candidateInstance)) {
+            poolToNewCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(candidateInstance);
+          }
+        }
+      }
+
+      // Step 4: Find the 1:1 mapping between the gone instance and the new instance.
+      for (Map.Entry<Integer, Map<String, String>> entry : poolToGoneInstancesAndReplacedInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        Map<String, String> goneInstanceToNewInstanceMap = entry.getValue();
+        Deque<String> newInstancesInPool =
+            poolToNewCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedList<>());
+        goneInstanceToNewInstanceMap.replaceAll((k, v) -> {
+          if (!newInstancesInPool.isEmpty()) {
+            return newInstancesInPool.pollFirst();
+          } else {
+            return v;
+          }
+        });
+      }
+
+      // Step 5: Fill the vacant positions with the new instances.
+      Map<String, List<String>> newInstancePartitionsAssignmentMap = new TreeMap<>();
+      int finalNumInstancesPerPartition = numInstancesPerPartition;
+      for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+        for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+          Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+          Map<String, String> goneInstanceToNewInstanceMap =
+              poolToGoneInstancesAndReplacedInstancesMap.computeIfAbsent(pool, k -> new TreeMap<>());
+          Set<String> candidateInstancesMap = poolToCandidateInstancesMap.get(pool);
+          String partitionToReplicaGroupKey = partitionId + "_" + replicaGroupId;
+          List<String> existingInstances =
+              existingPartitionToInstancesMap.computeIfAbsent(partitionToReplicaGroupKey, k -> new ArrayList<>());
+
+          // Construct an empty list to store the latest instances.
+          List<String> latestInstanceAssignment =
+              newInstancePartitionsAssignmentMap.computeIfAbsent(partitionToReplicaGroupKey, k -> {
+                List<String> instances = new ArrayList<>(finalNumInstancesPerPartition);
+                for (int i = 0; i < finalNumInstancesPerPartition; i++) {
+                  instances.add(null);
+                }
+                return instances;
+              });
+
+          // Traverse the existing list of instances, fill the vacant positions with new instances from the map.
+          for (int i = 0; i < existingInstances.size() && i < finalNumInstancesPerPartition; i++) {
+            String existingInstance = existingInstances.get(i);
+            String replacedInstance = goneInstanceToNewInstanceMap.get(existingInstance);
+            if (replacedInstance != null) {
+              latestInstanceAssignment.set(i, replacedInstance);
+              candidateInstancesMap.remove(replacedInstance);
+            } else {
+              // If the existing instance is gone but no new instance was available to replace it, leave this
+              // position vacant for now; it will be filled from the remaining candidates below.
+              if (!goneInstanceToNewInstanceMap.containsKey(existingInstance)) {
+                latestInstanceAssignment.set(i, existingInstance);
+                candidateInstancesMap.remove(existingInstance);
+              }
+            }
+          }
+          // If the new number of instances per partition is larger than the previous one, extend the vacant positions.
+          if (finalNumInstancesPerPartition > existingInstances.size()) {
+            Iterator<String> candidateInstancesInPoolIterator = candidateInstancesMap.iterator();
+            for (int i = existingInstances.size(); i < finalNumInstancesPerPartition; i++) {
+              if (candidateInstancesInPoolIterator.hasNext()) {
+                String candidateInstance = candidateInstancesInPoolIterator.next();
+                latestInstanceAssignment.set(i, candidateInstance);
+                candidateInstancesInPoolIterator.remove();
+              }
+            }
+          }
+
+          // Fill up the vacant positions if any.
+          for (int i = 0; i < latestInstanceAssignment.size(); i++) {
+            Iterator<String> candidateInstancesInPoolIterator = candidateInstancesMap.iterator();
+            if (latestInstanceAssignment.get(i) == null) {
+              if (candidateInstancesInPoolIterator.hasNext()) {
+                String candidateInstance = candidateInstancesInPoolIterator.next();
+                latestInstanceAssignment.set(i, candidateInstance);
+                candidateInstancesInPoolIterator.remove();
+              }
+            }
+          }
+
+          instancePartitions.setInstances(partitionId, replicaGroupId, latestInstanceAssignment);
+        }
+      }
+
+      // Persist poolToReplicaGroupsMap to ZK.
+      instancePartitions.setPoolToReplicaGroupsMap(poolToReplicaGroupIdsMap);
+    } else {
+      // Non-replica-group based selection
+      selectForNonReplicaGroupBased(poolToInstanceConfigsMap, instancePartitions);
+    }
+  }
+}
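
The heart of the minimization is Step 4 above: departed instances are paired one-to-one with not-yet-used instances
from the same pool, so every surviving instance keeps its slot and only the vacated slots receive new servers. A
self-contained illustration of that pairing with plain collections (instance names are illustrative only):

    Deque<String> newInstancesInPool = new LinkedList<>(Arrays.asList("Server_10", "Server_11"));
    Map<String, String> goneInstanceToNewInstanceMap = new TreeMap<>();
    goneInstanceToNewInstanceMap.put("Server_2", null);
    goneInstanceToNewInstanceMap.put("Server_6", null);

    // Same replaceAll trick as in Step 4: hand out the new instances in order, and leave the
    // mapping null when the pool has no spare instance to offer.
    goneInstanceToNewInstanceMap.replaceAll(
        (gone, unused) -> newInstancesInPool.isEmpty() ? null : newInstancesInPool.pollFirst());
    // Result: {Server_2=Server_10, Server_6=Server_11}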
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
index 70bebabc17..d08fbbfd84 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
@@ -33,6 +33,10 @@ public class RebalanceConfigConstants {
   public static final String REASSIGN_INSTANCES = "reassignInstances";
   public static final boolean DEFAULT_REASSIGN_INSTANCES = false;
 
+  // Whether to retain the sequence for the existing instances
+  public static final String RETAIN_INSTANCE_SEQUENCE = "retainInstancesSequence";
+  public static final boolean DEFAULT_RETAIN_INSTANCE_SEQUENCE = false;
+
   // Whether to reassign CONSUMING segments
   public static final String INCLUDE_CONSUMING = "includeConsuming";
   public static final boolean DEFAULT_INCLUDE_CONSUMING = false;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index d6701c8363..8f86ac6fa9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -145,6 +145,8 @@ public class TableRebalancer {
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     boolean bestEfforts = rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS,
         RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
+    boolean retainInstanceSequence = rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE,
+        RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE);
     LOGGER.info(
         "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, "
             + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
@@ -194,7 +196,7 @@ public class TableRebalancer {
     // Calculate instance partitions map
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
     try {
-      instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun);
+      instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun, retainInstanceSequence);
     } catch (Exception e) {
       LOGGER.warn(
           "Caught exception while fetching/calculating instance partitions for table: {}, aborting the rebalance",
@@ -323,10 +325,12 @@ public class TableRebalancer {
           currentIdealState = idealState;
           currentAssignment = currentIdealState.getRecord().getMapFields();
           // Re-calculate the instance partitions in case the instance configs changed during the rebalance
-          instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false);
+          instancePartitionsMap =
+              getInstancePartitionsMap(tableConfig, reassignInstances, false, retainInstanceSequence);
           tierToInstancePartitionsMap = getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
-          targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
-              tierToInstancePartitionsMap, rebalanceConfig);
+          targetAssignment = segmentAssignment
+              .rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap,
+                  rebalanceConfig);
           expectedVersion = currentIdealState.getRecord().getVersion();
         } catch (Exception e) {
           LOGGER.warn(
@@ -381,21 +385,24 @@ public class TableRebalancer {
   }
 
   private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig,
-      boolean reassignInstances, boolean dryRun) {
+      boolean reassignInstances, boolean dryRun, boolean retainInstanceSequence) {
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
     if (tableConfig.getTableType() == TableType.OFFLINE) {
       instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
-          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun));
+          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun,
+              retainInstanceSequence));
     } else {
       instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
-          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun));
+          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun,
+              retainInstanceSequence));
       String tableNameWithType = tableConfig.getTableName();
       if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
         LOGGER.info(
             "COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}",
             tableNameWithType);
         instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
-            getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun));
+            getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun,
+                retainInstanceSequence));
       } else {
         LOGGER.info(
             "COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions "
@@ -413,14 +420,21 @@ public class TableRebalancer {
   }
 
   private InstancePartitions getInstancePartitions(TableConfig tableConfig,
-      InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun) {
+      InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun,
+      boolean retainInstanceSequence) {
     String tableNameWithType = tableConfig.getTableName();
     if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
       if (reassignInstances) {
+        InstancePartitions existingInstancePartitions = null;
+        if (retainInstanceSequence) {
+          existingInstancePartitions = InstancePartitionsUtils
+              .fetchOrComputeInstancePartitions(_helixManager, tableConfig, instancePartitionsType);
+        }
         LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType);
         InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
         InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType,
-            _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true));
+            _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true),
+            existingInstancePartitions);
         if (!dryRun) {
           LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
           InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index f1fc049ff5..197b4919df 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -71,7 +71,8 @@ public class InstanceAssignmentTest {
     }
 
     // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 2 instances
-    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     // Instances of index 4 to 7 are not assigned because of the hash-based rotation
@@ -95,7 +96,7 @@ public class InstanceAssignmentTest {
 
     // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
     // instances should be assigned to 2 partitions, each with 2 instances
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
     // Instance of index 7 is not assigned because of the hash-based rotation
@@ -123,6 +124,85 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
     assertEquals(instancePartitions.getInstances(1, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // ===== Test against the cases when the existing instancePartitions isn't null. =====
+    // Pass the existing instancePartitions to the InstanceAssignmentDriver.
+    // The returned instance partition should be the same as the last computed one.
+
+    // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
+    // instances should be assigned to 2 partitions, each with 2 instances
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Instance of index 7 is not assigned because of the hash-based rotation
+    // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+    // [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7]
+    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    // r0: [i8, i1, i4]
+    //      p0, p0, p1
+    //      p1
+    // r1: [i9, i2, i5]
+    //      p0, p0, p1
+    //      p1
+    // r2: [i0, i3, i6]
+    //      p0, p0, p1
+    //      p1
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(1, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(1, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(1, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Remove two instances (i2, i6) and add two new instances (i10, i11).
+    instanceConfigs.remove(6);
+    instanceConfigs.remove(2);
+    for (int i = numInstances; i < numInstances + 2; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
+    // instances should be assigned to 2 partitions, each with 2 instances
+    // Leverage the latest instancePartitions from last computation as the parameter.
+    // Data movement is minimized so that: i2 -> i10, i6 -> i11
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Instance of index 7 is not assigned because of the hash-based rotation
+    // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+    // [i8, i9, i0, i1, i10, i3, i4, i5, i11, i7]
+    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    // r0: [i8, i1, i4]
+    //      p0, p0, p1
+    //      p1
+    // r1: [i9, i10, i5]
+    //      p0, p0, p1
+    //      p1
+    // r2: [i0, i3, i11]
+    //      p0, p0, p1
+    //      p1
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(1, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(1, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(1, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 11));
   }
 
   @Test
@@ -155,7 +235,8 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
     // replica-group 1
-    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -182,7 +263,7 @@ public class InstanceAssignmentTest {
     // Pool 0 and 2 will be selected in the pool selection
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to
     // replica-group 1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -200,7 +281,7 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
     // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
     // replica-group 1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -218,7 +299,7 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
     // replica-group 1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -244,7 +325,7 @@ public class InstanceAssignmentTest {
     //          r0  r2  r0  r2
     // pool 1: [i8, i9, i5, i6, i7]
     //          r1  r1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),
@@ -253,6 +334,269 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
     assertEquals(instancePartitions.getInstances(0, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
+
+    // ===== Test against the cases when the existing instancePartitions isn't null. =====
+    // Reset the number of replica groups to 2 and pools to 2.
+    numReplicaGroups = 2;
+    numPools = 2;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0);
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+    // Reset the instance configs to have only two pools.
+    instanceConfigs.clear();
+    numInstances = 10;
+    for (int i = 0; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = i / numInstancesPerPool;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Use all pools; the instancePartitions should be the same as the one computed without
+    // the existing partition-to-instances map.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1.
+    // [pool0, pool1]
+    //  r0     r1
+    InstancePartitions existingInstancePartitions = null;
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Get the latest existing instancePartitions from the last computation and try again.
+    // The actual assignment should be the same as the last one.
+    existingInstancePartitions = instancePartitions;
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Select pools 0 and 1 in the pool selection
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Passing in the existing instancePartitions shouldn't change the instance assignment.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1
+    // Now in poolToInstancesMap:
+    // pool 0: [ i3, i4, i0, i1, i2 ]
+    // pool 1: [ i8, i9, i5, i6, i7 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Assign instances from 2 pools to 3 replica-groups
+    numReplicaGroups = 3;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    //  r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //   pool 0: [ i3, i4, i0, i1, i2 ]
+    //   pool 1: [ i8, i9, i5, i6, i7 ]
+    // Because the instances in the existing instancePartitions are sorted, i0 and i1 will be retained for r0 and
+    // i5 and i6 will be retained for r1; i3 and i4 are picked up from the latest instances in the target pool.
+    // Thus, the new assignment will be as follows:
+    //   pool 0: [i0, i1, i2, i3, i4]
+    //            r0  r0      r2  r2
+    //   pool 1: [i5, i6, i7, i8, i9]
+    //            r1  r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+
+    // Remove one instance from each of the pools (i3 and i5) and add one new instance to each (i10 and i11).
+    instanceConfigs.remove(5);
+    instanceConfigs.remove(3);
+    int poolCount = 0;
+    for (int i = numInstances; i < numInstances + 2; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = poolCount++;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    //  r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //     pool 0: [ i2,  i4,  i0, i1, i10 ]
+    //     pool 1: [ i8,  i9, i11, i6,  i7 ]
+    // i3 gets swapped out, so the next available instance, i2, takes its place.
+    // Similarly, i5 is swapped out and i8 takes its place.
+    // Thus, the new assignment will be as follows:
+    //     pool 0: [ i2,  i4,  i0, i1, i10 ]
+    //               r2   r2   r0  r0
+    //     pool 1: [ i8,  i9, i11, i6,  i7 ]
+    //               r1            r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 4));
+
+    // Reduce number of replica groups from 3 to 2.
+    numReplicaGroups = 2;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //     pool 0: [ i2,  i4,  i0, i1, i10 ]
+    //     pool 1: [ i8,  i9, i11, i6,  i7 ]
+    // In the existing instancePartitions, r0 already has [i0, i1], so append the remaining
+    // available instances (i.e. [i2, i4, i10]) to the tail.
+    // r1 already has [i8, i6], so append the remaining available instances (i.e. [i9, i11, i7]) to the tail.
+    // Thus, the new assignment will become:
+    //     pool 0: [ i0, i1, i2,  i4, i10 ]
+    //               r0   r0   r0  r0  r0
+    //     pool 1: [ i8, i6, i9, i11,  i7 ]
+    //               r1   r1  r1  r1   r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 7));
+
+    // Add 1 more instance to each pool (i12 to pool 0 and i13 to pool 1)
+    poolCount = 0;
+    for (int i = numInstances + 2; i < numInstances + 4; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = poolCount++;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 6 = 2
+    // Latest instances from ZK:
+    //     pool 0: [ i10, i12, i2, i4,  i0,  i1 ]
+    //     pool 1: [  i6,  i7, i8, i9, i11, i13 ]
+    // There is one more empty position in each of the replica-groups.
+    // Append the newly added instances (i.e. i12 and i13) to the tails.
+    // Thus, the new assignment will become:
+    //     pool 0: [ i0, i1, i2,  i4, i10, i12 ]
+    //     pool 1: [ i8, i6, i9, i11,  i7, i13 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 12));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 13));
+
+    // Remove one instance from each of the pools, i.e. i2 and i8.
+    instanceConfigs.remove(6);
+    instanceConfigs.remove(2);
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //     pool 0: [ i12, i4,  i0, i1, i10 ]
+    //     pool 1: [ i7,  i9, i11, i13, i6 ]
+    // Since i2 and i8 were removed from the pools,
+    // the tail instances (i.e. i12 and i13) will be used to fill their vacated positions.
+    // Thus, the new assignment will become:
+    //     pool 0: [ i0, i1, i12,  i4, i10 ]
+    //     pool 1: [ i13, i6, i9, i11,  i7 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9,
+            SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 7));
   }
 
   @Test
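For the pool-based cases in the hunk above, the step that shrinks the table from 3 to 2 replica-groups expects each surviving replica-group to keep its existing members at the head and then take the remaining alive instances of its pool. Below is a minimal, hypothetical Java sketch of just that ordering rule; it is not the selector code in this commit, and the class and method names are invented for illustration.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReplicaGroupReuseSketch {
  // Existing members that are still alive stay at the head in their existing order; the remaining
  // alive instances of the pool are appended to the tail.
  static List<String> orderForReplicaGroup(List<String> existingMembers, List<String> poolInstances) {
    Set<String> remaining = new LinkedHashSet<>(poolInstances);
    List<String> ordered = new ArrayList<>();
    for (String instance : existingMembers) {
      if (remaining.remove(instance)) {
        ordered.add(instance);
      }
    }
    ordered.addAll(remaining);
    return ordered;
  }

  public static void main(String[] args) {
    // Mirrors the "reduce replica-groups from 3 to 2" step above: r0 already has [i0, i1] and the
    // latest pool 0 order is [i2, i4, i0, i1, i10], so the result is [i0, i1, i2, i4, i10].
    System.out.println(orderForReplicaGroup(
        Arrays.asList("i0", "i1"), Arrays.asList("i2", "i4", "i0", "i1", "i10")));
  }
}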
@@ -270,7 +614,7 @@ public class InstanceAssignmentTest {
     // No instance assignment config
     assertFalse(InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, InstancePartitionsType.OFFLINE));
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Instance assignment is not allowed for the given table config");
@@ -284,7 +628,7 @@ public class InstanceAssignmentTest {
 
     // No instance with correct tag
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "No enabled instance has the tag: tenant_OFFLINE");
@@ -295,7 +639,8 @@ public class InstanceAssignmentTest {
     }
 
     // All instances should be assigned as replica-group 0 partition 0
-    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     List<String> expectedInstances = new ArrayList<>(numInstances);
@@ -311,7 +656,7 @@ public class InstanceAssignmentTest {
 
     // No instance has correct pool configured
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "No enabled instance has the pool configured for the tag: tenant_OFFLINE");
@@ -328,7 +673,7 @@ public class InstanceAssignmentTest {
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned as replica-group 0 partition 0
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     expectedInstances.clear();
@@ -343,7 +688,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many pools
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough instance pools (2 in the cluster, asked for 3)");
@@ -355,7 +700,7 @@ public class InstanceAssignmentTest {
 
     // Ask for pool that does not exist
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Cannot find all instance pools configured: [0, 2]");
@@ -368,7 +713,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
@@ -381,7 +726,7 @@ public class InstanceAssignmentTest {
 
     // Number of replica-groups must be positive
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Number of replica-groups must be positive");
@@ -393,7 +738,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many replica-groups
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
@@ -406,7 +751,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
@@ -418,7 +763,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances per partition
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
@@ -434,7 +779,7 @@ public class InstanceAssignmentTest {
     //         r0  r2  r0  r2
     // pool1: [i8, i9, i5, i6, i7]
     //         r1  r1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
     assertEquals(instancePartitions.getNumReplicaGroups(), 3);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index b2bfab19f1..6758d17ff5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -36,7 +36,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
 
   public PinotTableRebalancer(String zkAddress, String clusterName, boolean dryRun, boolean reassignInstances,
       boolean includeConsuming, boolean bootstrap, boolean downtime, int minReplicasToKeepUpForNoDowntime,
-      boolean bestEffort) {
+      boolean bestEffort, boolean retainInstancesSequence) {
     super(zkAddress, clusterName);
     _rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, dryRun);
     _rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, reassignInstances);
@@ -46,6 +46,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
     _rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
         minReplicasToKeepUpForNoDowntime);
     _rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, bestEffort);
+    _rebalanceConfig.addProperty(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE, retainInstancesSequence);
   }
 
   public RebalanceResult rebalance(String tableNameWithType) {
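A usage sketch of the widened constructor above: passing true as the new trailing argument sets RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE in the rebalance config so that instance sequences are retained and data movement is minimized. The ZooKeeper address, cluster name and table name below are placeholders, not values taken from this patch.

import org.apache.pinot.tools.PinotTableRebalancer;

public class RetainInstanceSequenceExample {
  public static void main(String[] args) {
    // Placeholder connection details; dry-run first so nothing is actually moved.
    PinotTableRebalancer rebalancer = new PinotTableRebalancer(
        "localhost:2181", "PinotCluster",
        /* dryRun */ true, /* reassignInstances */ true, /* includeConsuming */ false,
        /* bootstrap */ false, /* downtime */ false, /* minReplicasToKeepUpForNoDowntime */ 1,
        /* bestEffort */ false, /* retainInstancesSequence */ true);
    System.out.println(rebalancer.rebalance("myTable_OFFLINE"));
  }
}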
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
index 6f15c8632b..d7148668d0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
@@ -77,6 +77,10 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C
           + " cannot be achieved, false by default)")
   private boolean _bestEfforts = false;
 
+  @CommandLine.Option(names = {"-retainInstancesSequence"},
+      description = "Whether to retain instance sequence during rebalancing in order to minimize data movement")
+  private boolean _retainInstancesSequence = false;
+
   @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true, description = "Print this message")
   private boolean _help = false;
 
@@ -94,7 +98,7 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C
       throws Exception {
     PinotTableRebalancer tableRebalancer =
         new PinotTableRebalancer(_zkAddress, _clusterName, _dryRun, _reassignInstances, _includeConsuming, _bootstrap,
-            _downtime, _minAvailableReplicas, _bestEfforts);
+            _downtime, _minAvailableReplicas, _bestEfforts, _retainInstancesSequence);
     RebalanceResult rebalanceResult = tableRebalancer.rebalance(_tableNameWithType);
     LOGGER
         .info("Got rebalance result: {} for table: {}", JsonUtils.objectToString(rebalanceResult), _tableNameWithType);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org