You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/03/30 17:54:48 UTC

[pinot] 01/01: Add retainInstancesSequence feature to table rebalance to minimize data movement between instances

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch retain-instance-sequence
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 564cb9a9669b7d7390a003eb3dcc0b848689b3ec
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Mar 30 10:54:14 2022 -0700

    Add retainInstancesSequence feature to table rebalance to minimize data movement between instances
---
 .../common/assignment/InstancePartitions.java      |  77 ++++-
 .../PinotInstanceAssignmentRestletResource.java    |  15 +-
 .../helix/core/PinotHelixResourceManager.java      |   2 +-
 .../instance/InstanceAssignmentDriver.java         |  43 ++-
 .../instance/InstanceTagPoolSelector.java          | 125 ++++++--
 .../core/rebalance/RebalanceConfigConstants.java   |   4 +
 .../helix/core/rebalance/TableRebalancer.java      |  30 +-
 .../instance/InstanceAssignmentTest.java           | 318 +++++++++++++++++++--
 8 files changed, 542 insertions(+), 72 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
index c511077..811de20 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -58,14 +60,18 @@ import org.apache.pinot.spi.utils.JsonUtils;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class InstancePartitions {
   private static final char PARTITION_REPLICA_GROUP_SEPARATOR = '_';
+  private static final String PARTITIONS_KEY = "partitions";
+  private static final String INSTANCE_SEPARATOR = "/";
 
   private final String _instancePartitionsName;
-  private final Map<String, List<String>> _partitionToInstancesMap;
+  private final Map<String, List<String>> _partitionWithReplicaGroupToInstancesMap;
+  private final Map<Integer, List<String>> _partitionToInstancesMap;
   private int _numPartitions;
   private int _numReplicaGroups;
 
   public InstancePartitions(String instancePartitionsName) {
     _instancePartitionsName = instancePartitionsName;
+    _partitionWithReplicaGroupToInstancesMap = new TreeMap<>();
     _partitionToInstancesMap = new TreeMap<>();
   }
 
@@ -73,10 +79,18 @@ public class InstancePartitions {
   private InstancePartitions(
       @JsonProperty(value = "instancePartitionsName", required = true) String instancePartitionsName,
       @JsonProperty(value = "partitionToInstancesMap", required = true)
-          Map<String, List<String>> partitionToInstancesMap) {
+          Map<String, List<String>> partitionWithReplicaGroupToInstancesMap,
+      @JsonProperty(value = "partitionToInstancesMap", required = true)
+          Map<String, String> partitionsMap) {
     _instancePartitionsName = instancePartitionsName;
-    _partitionToInstancesMap = partitionToInstancesMap;
-    for (String key : partitionToInstancesMap.keySet()) {
+    _partitionWithReplicaGroupToInstancesMap = partitionWithReplicaGroupToInstancesMap;
+    _partitionToInstancesMap = new TreeMap<>();
+    if (partitionsMap != null) {
+      for (Map.Entry<String, String> entry : partitionsMap.entrySet()) {
+        _partitionToInstancesMap.put(Integer.parseInt(entry.getKey()), extractInstances(entry.getValue()));
+      }
+    }
+    for (String key : partitionWithReplicaGroupToInstancesMap.keySet()) {
       int separatorIndex = key.indexOf(PARTITION_REPLICA_GROUP_SEPARATOR);
       int partitionId = Integer.parseInt(key.substring(0, separatorIndex));
       int replicaGroupId = Integer.parseInt(key.substring(separatorIndex + 1));
@@ -91,8 +105,8 @@ public class InstancePartitions {
   }
 
   @JsonProperty
-  public Map<String, List<String>> getPartitionToInstancesMap() {
-    return _partitionToInstancesMap;
+  public Map<String, List<String>> getPartitionWithReplicaGroupToInstancesMap() {
+    return _partitionWithReplicaGroupToInstancesMap;
   }
 
   @JsonIgnore
@@ -105,25 +119,68 @@ public class InstancePartitions {
     return _numReplicaGroups;
   }
 
+  @JsonIgnore
+  public Map<Integer, List<String>> getPartitionToInstancesMap() {
+    return _partitionToInstancesMap;
+  }
+
   public List<String> getInstances(int partitionId, int replicaGroupId) {
-    return _partitionToInstancesMap
+    return _partitionWithReplicaGroupToInstancesMap
         .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId);
   }
 
   public void setInstances(int partitionId, int replicaGroupId, List<String> instances) {
     String key = Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId;
-    _partitionToInstancesMap.put(key, instances);
+    _partitionWithReplicaGroupToInstancesMap.put(key, instances);
     _numPartitions = Integer.max(_numPartitions, partitionId + 1);
     _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1);
   }
 
+  public void setPartitionToInstancesMap(Map<Integer, List<String>> partitionToInstancesMap) {
+    _partitionToInstancesMap.putAll(partitionToInstancesMap);
+  }
+
   public static InstancePartitions fromZNRecord(ZNRecord znRecord) {
-    return new InstancePartitions(znRecord.getId(), znRecord.getListFields());
+    return new InstancePartitions(znRecord.getId(), znRecord.getListFields(), znRecord.getMapField(PARTITIONS_KEY));
+  }
+
+  private static List<String> extractInstances(String instancesRawString) {
+    if (instancesRawString == null || instancesRawString.length() == 0) {
+      return Collections.emptyList();
+    }
+    String[] instancesArray = instancesRawString.split(INSTANCE_SEPARATOR);
+    List<String> instances = new ArrayList<>(instancesArray.length);
+    Collections.addAll(instances, instancesArray);
+    return instances;
+  }
+
+  private String convertInstancesToString(List<String> instances) {
+    if (instances == null || instances.isEmpty()) {
+      return "";
+    }
+    StringBuilder stringBuilder = new StringBuilder();
+    for (String instance : instances) {
+      if (stringBuilder.length() == 0) {
+        stringBuilder.append(instance);
+      } else {
+        stringBuilder.append(INSTANCE_SEPARATOR).append(instance);
+      }
+    }
+    return stringBuilder.toString();
+  }
+
+  private Map<String, String> convertListToStringMap() {
+    Map<String, String> convertedMap = new TreeMap<>();
+    for (Map.Entry<Integer, List<String>> entry : _partitionToInstancesMap.entrySet()) {
+      convertedMap.put(Integer.toString(entry.getKey()), convertInstancesToString(entry.getValue()));
+    }
+    return convertedMap;
   }
 
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = new ZNRecord(_instancePartitionsName);
-    znRecord.setListFields(_partitionToInstancesMap);
+    znRecord.setListFields(_partitionWithReplicaGroupToInstancesMap);
+    znRecord.setMapField(PARTITIONS_KEY, convertListToStringMap());
     return znRecord;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 8d9e6cc..58e677d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -137,7 +137,7 @@ public class PinotInstanceAssignmentRestletResource {
           if (InstanceAssignmentConfigUtils
               .allowInstanceAssignment(offlineTableConfig, InstancePartitionsType.OFFLINE)) {
             instancePartitionsMap.put(InstancePartitionsType.OFFLINE, new InstanceAssignmentDriver(offlineTableConfig)
-                .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs));
+                .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()));
           }
         } catch (IllegalStateException e) {
           throw new ControllerApplicationException(LOGGER, "Caught IllegalStateException", Response.Status.BAD_REQUEST,
@@ -156,15 +156,15 @@ public class PinotInstanceAssignmentRestletResource {
           if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
             if (InstanceAssignmentConfigUtils
                 .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.CONSUMING)) {
-              instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
-                  instanceAssignmentDriver.assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs));
+              instancePartitionsMap.put(InstancePartitionsType.CONSUMING, instanceAssignmentDriver
+                  .assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs, Collections.emptyMap()));
             }
           }
           if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
             if (InstanceAssignmentConfigUtils
                 .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.COMPLETED)) {
-              instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
-                  instanceAssignmentDriver.assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs));
+              instancePartitionsMap.put(InstancePartitionsType.COMPLETED, instanceAssignmentDriver
+                  .assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs, Collections.emptyMap()));
             }
           }
         } catch (IllegalStateException e) {
@@ -295,8 +295,9 @@ public class PinotInstanceAssignmentRestletResource {
     while (iterator.hasNext()) {
       InstancePartitions instancePartitions = iterator.next();
       boolean oldInstanceFound = false;
-      Map<String, List<String>> partitionToInstancesMap = instancePartitions.getPartitionToInstancesMap();
-      for (List<String> instances : partitionToInstancesMap.values()) {
+      Map<String, List<String>> partitionWithReplicaGroupToInstancesMap =
+          instancePartitions.getPartitionWithReplicaGroupToInstancesMap();
+      for (List<String> instances : partitionWithReplicaGroupToInstancesMap.values()) {
         oldInstanceFound |= Collections.replaceAll(instances, oldInstanceId, newInstanceId);
       }
       if (oldInstanceFound) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ecbd3b7..754146a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1648,7 +1648,7 @@ public class PinotHelixResourceManager {
       List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
       for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
         InstancePartitions instancePartitions =
-            instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs);
+            instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, Collections.emptyMap());
         LOGGER.info("Persisting instance partitions: {}", instancePartitions);
         InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 1440fae..4f60d2c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -51,16 +52,31 @@ public class InstanceAssignmentDriver {
     _tableConfig = tableConfig;
   }
 
+  /**
+   * Assign instances to InstancePartitions object.
+   * @param instancePartitionsType type of instance partitions
+   * @param instanceConfigs list of instance configs
+   * @param partitionToInstancesMap existing instances in the sequence that should be respected. An empty map
+   *                                      means there is no preceding sequence to respect.
+   */
   public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
-      List<InstanceConfig> instanceConfigs) {
+      List<InstanceConfig> instanceConfigs, Map<Integer, List<String>> partitionToInstancesMap) {
+    boolean shouldRetainInstanceSequence = !partitionToInstancesMap.isEmpty();
     String tableNameWithType = _tableConfig.getTableName();
-    LOGGER.info("Starting {} instance assignment for table: {}", instancePartitionsType, tableNameWithType);
+    LOGGER.info("Starting {} instance assignment for table: {}. Should retain instance sequence: {}",
+        instancePartitionsType, tableNameWithType, shouldRetainInstanceSequence);
 
     InstanceAssignmentConfig assignmentConfig =
         InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
     InstanceTagPoolSelector tagPoolSelector =
         new InstanceTagPoolSelector(assignmentConfig.getTagPoolConfig(), tableNameWithType);
-    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
+    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap =
+        tagPoolSelector.selectInstances(instanceConfigs, partitionToInstancesMap);
+
+    InstancePartitions instancePartitions = new InstancePartitions(
+        instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
+    instancePartitions
+        .setPartitionToInstancesMap(extractInstanceNamesFromPoolToInstanceConfigsMap(poolToInstanceConfigsMap));
 
     InstanceConstraintConfig constraintConfig = assignmentConfig.getConstraintConfig();
     List<InstanceConstraintApplier> constraintAppliers = new ArrayList<>();
@@ -75,9 +91,26 @@ public class InstanceAssignmentDriver {
 
     InstanceReplicaGroupPartitionSelector replicaPartitionSelector =
         new InstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType);
-    InstancePartitions instancePartitions = new InstancePartitions(
-        instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
     replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
     return instancePartitions;
   }
+
+  private Map<Integer, List<String>> extractInstanceNamesFromPoolToInstanceConfigsMap(
+      Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap) {
+    Map<Integer, List<String>> partitionToInstancesMap = new TreeMap<>();
+    for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+      Integer pool = entry.getKey();
+      List<InstanceConfig> instanceConfigs = entry.getValue();
+      partitionToInstancesMap.put(pool, extractInstanceNamesFromInstanceConfigs(instanceConfigs));
+    }
+    return partitionToInstancesMap;
+  }
+
+  private List<String> extractInstanceNamesFromInstanceConfigs(List<InstanceConfig> instanceConfigs) {
+    List<String> instanceNames = new ArrayList<>(instanceConfigs.size());
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceNames.add(instanceConfig.getInstanceName());
+    }
+    return instanceNames;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5aefd1a..b56c7ed 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -20,7 +20,10 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -48,52 +51,105 @@ public class InstanceTagPoolSelector {
 
   /**
    * Returns a map from pool to instance configs based on the tag and pool config for the given instance configs.
+   * @param instanceConfigs list of latest instance configs from ZK.
+   * @param partitionToInstancesMap existing instances in the sequence that should be respected. An empty map
+   *                                      means there is no preceding sequence; instanceConfigs order is then kept.
    */
-  public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs) {
+  public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs,
+      Map<Integer, List<String>> partitionToInstancesMap) {
     int tableNameHash = Math.abs(_tableNameWithType.hashCode());
     LOGGER.info("Starting instance tag/pool selection for table: {} with hash: {}", _tableNameWithType, tableNameHash);
 
-    // Filter out the instances with the correct tag
+    // Keep only the instances that carry the configured tag.
     String tag = _tagPoolConfig.getTag();
-    List<InstanceConfig> candidateInstanceConfigs = new ArrayList<>();
+    Map<String, InstanceConfig> candidateInstanceConfigsMap = new LinkedHashMap<>();
     for (InstanceConfig instanceConfig : instanceConfigs) {
       if (instanceConfig.getTags().contains(tag)) {
-        candidateInstanceConfigs.add(instanceConfig);
+        candidateInstanceConfigsMap.put(instanceConfig.getInstanceName(), instanceConfig);
       }
     }
-    candidateInstanceConfigs.sort(Comparator.comparing(InstanceConfig::getInstanceName));
-    int numCandidateInstances = candidateInstanceConfigs.size();
+
+    // Find out newly added instances from the latest copies of instance configs.
+    Deque<String> newlyAddedInstances = new LinkedList<>(candidateInstanceConfigsMap.keySet());
+    for (List<String> existingInstancesWithSequence : partitionToInstancesMap.values()) {
+      newlyAddedInstances.removeAll(existingInstancesWithSequence);
+    }
+
+    int numCandidateInstances = candidateInstanceConfigsMap.size();
     Preconditions.checkState(numCandidateInstances > 0, "No enabled instance has the tag: %s", tag);
     LOGGER.info("{} enabled instances have the tag: {} for table: {}", numCandidateInstances, tag, _tableNameWithType);
 
-    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new TreeMap<>();
+    // Maps each pool number to a map from instance name to instance config.
+    Map<Integer, Map<String, InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+    // Maps each pool number to a queue of newly added instance configs,
+    // so that replacement instances can be polled from this queue.
+    Map<Integer, Deque<InstanceConfig>> poolToNewInstanceConfigsMap = new HashMap<>();
+
+    // Extract the pool information from the instance configs.
+    for (Map.Entry<String,InstanceConfig> entry : candidateInstanceConfigsMap.entrySet()) {
+      String instanceName = entry.getKey();
+      InstanceConfig instanceConfig = entry.getValue();
+      Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
+      if (poolMap != null && poolMap.containsKey(tag)) {
+        int pool = Integer.parseInt(poolMap.get(tag));
+        poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new TreeMap<>()).put(instanceName, instanceConfig);
+        if (newlyAddedInstances.contains(instanceName)) {
+          poolToNewInstanceConfigsMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(instanceConfig);
+        }
+      }
+    }
+
+    Map<Integer, List<InstanceConfig>> poolToLatestInstanceConfigsMap = new TreeMap<>();
     if (_tagPoolConfig.isPoolBased()) {
       // Pool based selection
 
-      // Extract the pool information from the instance configs
-      for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
-        Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
-        if (poolMap != null && poolMap.containsKey(tag)) {
-          int pool = Integer.parseInt(poolMap.get(tag));
-          poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+      for (Map.Entry<Integer, List<String>> entry : partitionToInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<String> existingInstanceAssignmentInPool = entry.getValue();
+        List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+        for (String existingInstance: existingInstanceAssignmentInPool) {
+          InstanceConfig instanceConfig = poolToInstanceConfigsMap.get(pool).get(existingInstance);
+          // Add instances to the candidate list while respecting the sequence of the existing instances from ZK.
+          // Missing/removed instances are replaced by newly added instances.
+          // If the instance still exists in ZK, it is added to the candidate list as is.
+          // E.g. if the old instances are [I1, I2, I3, I4] and the new instances are [I1, I3, I4, I5, I6],
+          // the removed instance is I2 and the newly added instances are I5 and I6.
+          // I5 takes the position of I2, and the remaining new instance I6 is appended to the tail.
+          // Thus, the new order is [I1, I5, I3, I4, I6].
+          if (instanceConfig != null) {
+            candidateInstanceConfigsWithSequence.add(instanceConfig);
+          } else {
+            // The currently chosen instance no longer lives in the cluster, so pick a newly added instance.
+            candidateInstanceConfigsWithSequence.add(poolToNewInstanceConfigsMap.get(pool).pollFirst());
+          }
         }
+        poolToLatestInstanceConfigsMap.put(pool, candidateInstanceConfigsWithSequence);
       }
+
+      // The preceding list of instances has been traversed. Add the remaining new instances.
+      for (Map.Entry<Integer, Deque<InstanceConfig>> entry : poolToNewInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        Deque<InstanceConfig> remainingNewInstanceConfigs = entry.getValue();
+        poolToLatestInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>())
+            .addAll(remainingNewInstanceConfigs);
+      }
+
       Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(),
           "No enabled instance has the pool configured for the tag: %s", tag);
       Map<Integer, Integer> poolToNumInstancesMap = new TreeMap<>();
-      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToLatestInstanceConfigsMap.entrySet()) {
         poolToNumInstancesMap.put(entry.getKey(), entry.getValue().size());
       }
       LOGGER.info("Number instances for each pool: {} for table: {}", poolToNumInstancesMap, _tableNameWithType);
 
       // Calculate the pools to select based on the selection config
-      Set<Integer> pools = poolToInstanceConfigsMap.keySet();
+      Set<Integer> pools = poolToLatestInstanceConfigsMap.keySet();
       List<Integer> poolsToSelect = _tagPoolConfig.getPools();
       if (poolsToSelect != null && !poolsToSelect.isEmpty()) {
         Preconditions.checkState(pools.containsAll(poolsToSelect), "Cannot find all instance pools configured: %s",
             poolsToSelect);
       } else {
-        int numPools = poolToInstanceConfigsMap.size();
+        int numPools = poolToLatestInstanceConfigsMap.size();
         int numPoolsToSelect = _tagPoolConfig.getNumPools();
         if (numPoolsToSelect > 0) {
           Preconditions
@@ -106,7 +162,7 @@ public class InstanceTagPoolSelector {
         // Directly return the map if all the pools are selected
         if (numPools == numPoolsToSelect) {
           LOGGER.info("Selecting all {} pools: {} for table: {}", numPools, pools, _tableNameWithType);
-          return poolToInstanceConfigsMap;
+          return poolToLatestInstanceConfigsMap;
         }
 
         // Select pools based on the table name hash to evenly distribute the tables
@@ -123,10 +179,37 @@ public class InstanceTagPoolSelector {
     } else {
       // Non-pool based selection
 
-      LOGGER.info("Selecting {} instances for table: {}", numCandidateInstances, _tableNameWithType);
+      LOGGER.info("Selecting {} instances for table: {}", candidateInstanceConfigsMap.size(), _tableNameWithType);
       // Put all instance configs as pool 0
-      poolToInstanceConfigsMap.put(0, candidateInstanceConfigs);
+
+      for (Map.Entry<Integer, List<String>> entry : partitionToInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<String> existingInstanceAssignmentInPool = entry.getValue();
+        List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+        for (String existingInstance: existingInstanceAssignmentInPool) {
+          InstanceConfig instanceConfig = candidateInstanceConfigsMap.get(existingInstance);
+          // Add instances to the candidate list while respecting the sequence of the existing instances from ZK.
+          // Missing/removed instances are replaced by newly added instances.
+          // If the instance still exists in ZK, it is added to the candidate list as is.
+          // E.g. if the old instances are [I1, I2, I3, I4] and the new instances are [I1, I3, I4, I5, I6],
+          // the removed instance is I2 and the newly added instances are I5 and I6.
+          // I5 takes the position of I2, and the remaining new instance I6 is appended to the tail.
+          // Thus, the new order is [I1, I5, I3, I4, I6].
+          if (instanceConfig != null) {
+            candidateInstanceConfigsWithSequence.add(instanceConfig);
+          } else {
+            // The currently chosen instance no longer lives in the cluster, so pick a newly added instance.
+            candidateInstanceConfigsWithSequence.add(candidateInstanceConfigsMap.get(newlyAddedInstances.pollFirst()));
+          }
+        }
+        poolToLatestInstanceConfigsMap.put(pool, candidateInstanceConfigsWithSequence);
+      }
+      // The preceding list of instances has been traversed. Add the remaining new instances.
+      for (String remainingNewInstance : newlyAddedInstances) {
+        poolToLatestInstanceConfigsMap.computeIfAbsent(0, k -> new ArrayList<>())
+            .add(candidateInstanceConfigsMap.get(remainingNewInstance));
+      }
     }
-    return poolToInstanceConfigsMap;
+    return poolToLatestInstanceConfigsMap;
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
index 70bebab..d08fbbf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
@@ -33,6 +33,10 @@ public class RebalanceConfigConstants {
   public static final String REASSIGN_INSTANCES = "reassignInstances";
   public static final boolean DEFAULT_REASSIGN_INSTANCES = false;
 
+  // Whether to retain the sequence for the existing instances
+  public static final String RETAIN_INSTANCE_SEQUENCE = "retainInstancesSequence";
+  public static final boolean DEFAULT_RETAIN_INSTANCE_SEQUENCE = false;
+
   // Whether to reassign CONSUMING segments
   public static final String INCLUDE_CONSUMING = "includeConsuming";
   public static final boolean DEFAULT_INCLUDE_CONSUMING = false;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index d6701c8..005a1ef 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.rebalance;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -145,6 +146,8 @@ public class TableRebalancer {
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     boolean bestEfforts = rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS,
         RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
+    boolean retainInstanceSequence = rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE,
+        RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE);
     LOGGER.info(
         "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, "
             + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
@@ -194,7 +197,7 @@ public class TableRebalancer {
     // Calculate instance partitions map
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
     try {
-      instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun);
+      instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun, retainInstanceSequence);
     } catch (Exception e) {
       LOGGER.warn(
           "Caught exception while fetching/calculating instance partitions for table: {}, aborting the rebalance",
@@ -323,7 +326,7 @@ public class TableRebalancer {
           currentIdealState = idealState;
           currentAssignment = currentIdealState.getRecord().getMapFields();
           // Re-calculate the instance partitions in case the instance configs changed during the rebalance
-          instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false);
+          instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false, retainInstanceSequence);
           tierToInstancePartitionsMap = getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
           targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
               tierToInstancePartitionsMap, rebalanceConfig);
@@ -381,21 +384,24 @@ public class TableRebalancer {
   }
 
   private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig,
-      boolean reassignInstances, boolean dryRun) {
+      boolean reassignInstances, boolean dryRun, boolean retainInstanceSequence) {
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
     if (tableConfig.getTableType() == TableType.OFFLINE) {
       instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
-          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun));
+          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun,
+              retainInstanceSequence));
     } else {
       instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
-          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun));
+          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun,
+              retainInstanceSequence));
       String tableNameWithType = tableConfig.getTableName();
       if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
         LOGGER.info(
             "COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}",
             tableNameWithType);
         instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
-            getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun));
+            getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun,
+                retainInstanceSequence));
       } else {
         LOGGER.info(
             "COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions "
@@ -413,14 +419,22 @@ public class TableRebalancer {
   }
 
   private InstancePartitions getInstancePartitions(TableConfig tableConfig,
-      InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun) {
+      InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun,
+      boolean retainInstanceSequence) {
     String tableNameWithType = tableConfig.getTableName();
     if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
       if (reassignInstances) {
+        Map<Integer, List<String>> partitionsToInstancesMap = Collections.emptyMap();
+        if (retainInstanceSequence) {
+          InstancePartitions existingInstancePartitions = InstancePartitionsUtils
+              .fetchOrComputeInstancePartitions(_helixManager, tableConfig, instancePartitionsType);
+          partitionsToInstancesMap = existingInstancePartitions.getPartitionToInstancesMap();
+        }
         LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType);
         InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
         InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType,
-            _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true));
+            _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true),
+            partitionsToInstancesMap);
         if (!dryRun) {
           LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
           InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index f1fc049..0bbee49 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -21,7 +21,9 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -71,7 +73,8 @@ public class InstanceAssignmentTest {
     }
 
     // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 2 instances
-    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE,
+        instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     // Instances of index 4 to 7 are not assigned because of the hash-based rotation
@@ -95,7 +98,8 @@ public class InstanceAssignmentTest {
 
     // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
     // instances should be assigned to 2 partitions, each with 2 instances
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver
+        .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
     // Instance of index 7 is not assigned because of the hash-based rotation
@@ -123,6 +127,91 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
     assertEquals(instancePartitions.getInstances(1, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // ===== Test against the cases when partitionToInstancesMap isn't empty. =====
+    // Put the existing partition to instances map as the parameter to the InstanceAssignmentDriver
+    // instead of passing an empty map.
+    // The returned instance partition should be the same as the last computed one.
+    Map<Integer, List<String>> existingPartitionToInstancesMap = new HashMap<>();
+    List<String> instances = Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1,
+        SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4,
+        SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+        SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9);
+    existingPartitionToInstancesMap.put(0, instances);
+
+    // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
+    // instances should be assigned to 2 partitions, each with 2 instances
+    instancePartitions = driver
+        .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Instance of index 7 is not assigned because of the hash-based rotation
+    // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+    // [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7]
+    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    // r0: [i8, i1, i4]
+    //      p0, p0, p1
+    //      p1
+    // r1: [i9, i2, i5]
+    //      p0, p0, p1
+    //      p1
+    // r2: [i0, i3, i6]
+    //      p0, p0, p1
+    //      p1
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(1, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(1, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(1, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Remove two instances (i2, i6) and add two new instances (i10, i11).
+    instanceConfigs.remove(6);
+    instanceConfigs.remove(2);
+    for (int i = numInstances; i < numInstances + 2; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+    // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
+    // instances should be assigned to 2 partitions, each with 2 instances
+    instancePartitions = driver
+        .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Instance of index 7 is not assigned because of the hash-based rotation
+    // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+    // [i8, i9, i0, i1, i10, i3, i4, i5, i11, i7]
+    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    // r0: [i8, i1, i4]
+    //      p0, p0, p1
+    //      p1
+    // r1: [i9, i10, i5]
+    //      p0, p0, p1
+    //      p1
+    // r2: [i0, i3, i11]
+    //      p0, p0, p1
+    //      p1
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(1, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(1, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(1, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 11));
   }
 
   @Test
@@ -155,7 +244,8 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
     // replica-group 1
-    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
+        Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -182,7 +272,8 @@ public class InstanceAssignmentTest {
     // Pool 0 and 2 will be selected in the pool selection
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to
     // replica-group 1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -200,7 +291,154 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
     // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
     // replica-group 1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 14));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+
+    // Select pool 0 and 1 in pool selection
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Assign instances from 2 pools to 3 replica-groups
+    numReplicaGroups = numPools;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    //  r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // pool 0: [i3, i4, i0, i1, i2]
+    //          r0  r2  r0  r2
+    // pool 1: [i8, i9, i5, i6, i7]
+    //          r1  r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
+
+    // ===== Test against the cases when partitionToInstancesMap isn't empty. =====
+    // Reset the number of replica groups to 2 and pools to 2.
+    numReplicaGroups = 2;
+    numPools = 2;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0);
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+    // Reset the instance configs to have only two pools.
+    instanceConfigs.clear();
+    numInstances = 10;
+    for (int i = 0; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = i / numInstancesPerPool;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Put the existing partition to instances map as the parameter to the InstanceAssignmentDriver.
+    Map<Integer, List<String>> existingPartitionToInstancesMap = new HashMap<>();
+    existingPartitionToInstancesMap.put(0, Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    existingPartitionToInstancesMap.put(1, Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // When all pools are used, the instance partitions should match the ones computed without
+    // the existing partition-to-instances map.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1.
+    // [pool0, pool1]
+    //  r0     r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Add the third pool with same number of instances
+    numPools = 3;
+    numInstances = numPools * numInstancesPerPool;
+    for (int i = numInstances - numInstancesPerPool; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = numPools - 1;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Passing the existingPartitionToInstancesMap shouldn't change the instance assignment.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Pool 0 and 2 will be selected in the pool selection
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to
+    // replica-group 1.
+    // [pool0, pool1, pool2]
+    //   r0             r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 14));
+
+    // Select all 3 pools in pool selection
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Passing the existingPartitionToInstancesMap shouldn't change the instance assignment.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
+    // replica-group 1. Pool 1 is left unused because only 2 replica-groups are configured.
+    // [pool0, pool1, pool2]
+    //   r1             r0
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -215,10 +453,12 @@ public class InstanceAssignmentTest {
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
 
+    // Passing the existingPartitionToInstancesMap shouldn't change the instance assignment.
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
     // replica-group 1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -244,7 +484,8 @@ public class InstanceAssignmentTest {
     //          r0  r2  r0  r2
     // pool 1: [i8, i9, i5, i6, i7]
     //          r1  r1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),
@@ -253,6 +494,40 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
     assertEquals(instancePartitions.getInstances(0, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
+
+    instanceConfigs.remove(12);
+    instanceConfigs.remove(9);
+    instanceConfigs.remove(3);
+    int poolCount = 0;
+    for (int i = numInstances; i < numInstances + 3; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = poolCount++;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    //  r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // pool 0: [i15, i4, i0, i1, i2]
+    //          r0   r2  r0  r2
+    // pool 1: [i8, i16, i5, i6, i7]
+    //          r1  r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 15));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 16, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
   }
 
   @Test
@@ -270,7 +545,7 @@ public class InstanceAssignmentTest {
     // No instance assignment config
     assertFalse(InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, InstancePartitionsType.OFFLINE));
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Instance assignment is not allowed for the given table config");
@@ -284,7 +559,7 @@ public class InstanceAssignmentTest {
 
     // No instance with correct tag
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "No enabled instance has the tag: tenant_OFFLINE");
@@ -295,7 +570,8 @@ public class InstanceAssignmentTest {
     }
 
     // All instances should be assigned as replica-group 0 partition 0
-    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     List<String> expectedInstances = new ArrayList<>(numInstances);
@@ -311,7 +587,7 @@ public class InstanceAssignmentTest {
 
     // No instance has correct pool configured
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "No enabled instance has the pool configured for the tag: tenant_OFFLINE");
@@ -328,7 +604,8 @@ public class InstanceAssignmentTest {
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned as replica-group 0 partition 0
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     expectedInstances.clear();
@@ -343,7 +620,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many pools
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough instance pools (2 in the cluster, asked for 3)");
@@ -355,7 +632,7 @@ public class InstanceAssignmentTest {
 
     // Ask for pool that does not exist
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Cannot find all instance pools configured: [0, 2]");
@@ -368,7 +645,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
@@ -381,7 +658,7 @@ public class InstanceAssignmentTest {
 
     // Number of replica-groups must be positive
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Number of replica-groups must be positive");
@@ -393,7 +670,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many replica-groups
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
@@ -406,7 +683,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
@@ -418,7 +695,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances per partition
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
@@ -434,7 +711,8 @@ public class InstanceAssignmentTest {
     //         r0  r2  r0  r2
     // pool1: [i8, i9, i5, i6, i7]
     //         r1  r1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
+        Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), 3);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org