You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/03/30 17:54:47 UTC

[pinot] branch retain-instance-sequence created (now 564cb9a)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch retain-instance-sequence
in repository https://gitbox.apache.org/repos/asf/pinot.git.


      at 564cb9a  Add retainInstancesSequence feature to table rebalance to minimize data movement between instances

This branch includes the following new commits:

     new 564cb9a  Add retainInstancesSequence feature to table rebalance to minimize data movement between instances

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Add retainInstancesSequence feature to table rebalance to minimize data movement between instances

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch retain-instance-sequence
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 564cb9a9669b7d7390a003eb3dcc0b848689b3ec
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Mar 30 10:54:14 2022 -0700

    Add retainInstancesSequence feature to table rebalance to minimize data movement between instances
---
 .../common/assignment/InstancePartitions.java      |  77 ++++-
 .../PinotInstanceAssignmentRestletResource.java    |  15 +-
 .../helix/core/PinotHelixResourceManager.java      |   2 +-
 .../instance/InstanceAssignmentDriver.java         |  43 ++-
 .../instance/InstanceTagPoolSelector.java          | 125 ++++++--
 .../core/rebalance/RebalanceConfigConstants.java   |   4 +
 .../helix/core/rebalance/TableRebalancer.java      |  30 +-
 .../instance/InstanceAssignmentTest.java           | 318 +++++++++++++++++++--
 8 files changed, 542 insertions(+), 72 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
index c511077..811de20 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -58,14 +60,18 @@ import org.apache.pinot.spi.utils.JsonUtils;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class InstancePartitions {
   private static final char PARTITION_REPLICA_GROUP_SEPARATOR = '_';
+  private static final String PARTITIONS_KEY = "partitions";
+  private static final String INSTANCE_SEPARATOR = "/";
 
   private final String _instancePartitionsName;
-  private final Map<String, List<String>> _partitionToInstancesMap;
+  private final Map<String, List<String>> _partitionWithReplicaGroupToInstancesMap;
+  private final Map<Integer, List<String>> _partitionToInstancesMap;
   private int _numPartitions;
   private int _numReplicaGroups;
 
   public InstancePartitions(String instancePartitionsName) {
     _instancePartitionsName = instancePartitionsName;
+    _partitionWithReplicaGroupToInstancesMap = new TreeMap<>();
     _partitionToInstancesMap = new TreeMap<>();
   }
 
@@ -73,10 +79,18 @@ public class InstancePartitions {
   private InstancePartitions(
       @JsonProperty(value = "instancePartitionsName", required = true) String instancePartitionsName,
       @JsonProperty(value = "partitionToInstancesMap", required = true)
-          Map<String, List<String>> partitionToInstancesMap) {
+          Map<String, List<String>> partitionWithReplicaGroupToInstancesMap,
+      @JsonProperty(value = "partitionToInstancesMap", required = true)
+          Map<String, String> partitionsMap) {
     _instancePartitionsName = instancePartitionsName;
-    _partitionToInstancesMap = partitionToInstancesMap;
-    for (String key : partitionToInstancesMap.keySet()) {
+    _partitionWithReplicaGroupToInstancesMap = partitionWithReplicaGroupToInstancesMap;
+    _partitionToInstancesMap = new TreeMap<>();
+    if (partitionsMap != null) {
+      for (Map.Entry<String, String> entry : partitionsMap.entrySet()) {
+        _partitionToInstancesMap.put(Integer.parseInt(entry.getKey()), extractInstances(entry.getValue()));
+      }
+    }
+    for (String key : partitionWithReplicaGroupToInstancesMap.keySet()) {
       int separatorIndex = key.indexOf(PARTITION_REPLICA_GROUP_SEPARATOR);
       int partitionId = Integer.parseInt(key.substring(0, separatorIndex));
       int replicaGroupId = Integer.parseInt(key.substring(separatorIndex + 1));
@@ -91,8 +105,8 @@ public class InstancePartitions {
   }
 
   @JsonProperty
-  public Map<String, List<String>> getPartitionToInstancesMap() {
-    return _partitionToInstancesMap;
+  public Map<String, List<String>> getPartitionWithReplicaGroupToInstancesMap() {
+    return _partitionWithReplicaGroupToInstancesMap;
   }
 
   @JsonIgnore
@@ -105,25 +119,68 @@ public class InstancePartitions {
     return _numReplicaGroups;
   }
 
+  @JsonIgnore
+  public Map<Integer, List<String>> getPartitionToInstancesMap() {
+    return _partitionToInstancesMap;
+  }
+
   public List<String> getInstances(int partitionId, int replicaGroupId) {
-    return _partitionToInstancesMap
+    return _partitionWithReplicaGroupToInstancesMap
         .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId);
   }
 
   public void setInstances(int partitionId, int replicaGroupId, List<String> instances) {
     String key = Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId;
-    _partitionToInstancesMap.put(key, instances);
+    _partitionWithReplicaGroupToInstancesMap.put(key, instances);
     _numPartitions = Integer.max(_numPartitions, partitionId + 1);
     _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1);
   }
 
+  public void setPartitionToInstancesMap(Map<Integer, List<String>> partitionToInstancesMap) {
+    _partitionToInstancesMap.putAll(partitionToInstancesMap);
+  }
+
   public static InstancePartitions fromZNRecord(ZNRecord znRecord) {
-    return new InstancePartitions(znRecord.getId(), znRecord.getListFields());
+    return new InstancePartitions(znRecord.getId(), znRecord.getListFields(), znRecord.getMapField(PARTITIONS_KEY));
+  }
+
+  private static List<String> extractInstances(String instancesRawString) {
+    if (instancesRawString == null || instancesRawString.length() == 0) {
+      return Collections.emptyList();
+    }
+    String[] instancesArray = instancesRawString.split(INSTANCE_SEPARATOR);
+    List<String> instances = new ArrayList<>(instancesArray.length);
+    Collections.addAll(instances, instancesArray);
+    return instances;
+  }
+
+  private String convertInstancesToString(List<String> instances) {
+    if (instances == null || instances.isEmpty()) {
+      return "";
+    }
+    StringBuilder stringBuilder = new StringBuilder();
+    for (String instance : instances) {
+      if (stringBuilder.length() == 0) {
+        stringBuilder.append(instance);
+      } else {
+        stringBuilder.append(INSTANCE_SEPARATOR).append(instance);
+      }
+    }
+    return stringBuilder.toString();
+  }
+
+  private Map<String, String> convertListToStringMap() {
+    Map<String, String> convertedMap = new TreeMap<>();
+    for (Map.Entry<Integer, List<String>> entry : _partitionToInstancesMap.entrySet()) {
+      convertedMap.put(Integer.toString(entry.getKey()), convertInstancesToString(entry.getValue()));
+    }
+    return convertedMap;
   }
 
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = new ZNRecord(_instancePartitionsName);
-    znRecord.setListFields(_partitionToInstancesMap);
+    znRecord.setListFields(_partitionWithReplicaGroupToInstancesMap);
+    znRecord.setMapField(PARTITIONS_KEY, convertListToStringMap());
     return znRecord;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 8d9e6cc..58e677d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -137,7 +137,7 @@ public class PinotInstanceAssignmentRestletResource {
           if (InstanceAssignmentConfigUtils
               .allowInstanceAssignment(offlineTableConfig, InstancePartitionsType.OFFLINE)) {
             instancePartitionsMap.put(InstancePartitionsType.OFFLINE, new InstanceAssignmentDriver(offlineTableConfig)
-                .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs));
+                .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()));
           }
         } catch (IllegalStateException e) {
           throw new ControllerApplicationException(LOGGER, "Caught IllegalStateException", Response.Status.BAD_REQUEST,
@@ -156,15 +156,15 @@ public class PinotInstanceAssignmentRestletResource {
           if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
             if (InstanceAssignmentConfigUtils
                 .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.CONSUMING)) {
-              instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
-                  instanceAssignmentDriver.assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs));
+              instancePartitionsMap.put(InstancePartitionsType.CONSUMING, instanceAssignmentDriver
+                  .assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs, Collections.emptyMap()));
             }
           }
           if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
             if (InstanceAssignmentConfigUtils
                 .allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.COMPLETED)) {
-              instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
-                  instanceAssignmentDriver.assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs));
+              instancePartitionsMap.put(InstancePartitionsType.COMPLETED, instanceAssignmentDriver
+                  .assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs, Collections.emptyMap()));
             }
           }
         } catch (IllegalStateException e) {
@@ -295,8 +295,9 @@ public class PinotInstanceAssignmentRestletResource {
     while (iterator.hasNext()) {
       InstancePartitions instancePartitions = iterator.next();
       boolean oldInstanceFound = false;
-      Map<String, List<String>> partitionToInstancesMap = instancePartitions.getPartitionToInstancesMap();
-      for (List<String> instances : partitionToInstancesMap.values()) {
+      Map<String, List<String>> partitionWithReplicaGroupToInstancesMap =
+          instancePartitions.getPartitionWithReplicaGroupToInstancesMap();
+      for (List<String> instances : partitionWithReplicaGroupToInstancesMap.values()) {
         oldInstanceFound |= Collections.replaceAll(instances, oldInstanceId, newInstanceId);
       }
       if (oldInstanceFound) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ecbd3b7..754146a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1648,7 +1648,7 @@ public class PinotHelixResourceManager {
       List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
       for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
         InstancePartitions instancePartitions =
-            instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs);
+            instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, Collections.emptyMap());
         LOGGER.info("Persisting instance partitions: {}", instancePartitions);
         InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 1440fae..4f60d2c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -51,16 +52,31 @@ public class InstanceAssignmentDriver {
     _tableConfig = tableConfig;
   }
 
+  /**
+   * Assign instances to InstancePartitions object.
+   * @param instancePartitionsType type of instance partitions
+   * @param instanceConfigs list of instance configs
+   * @param partitionToInstancesMap existing instances whose sequence should be respected. An empty map
+   *                                      means no preceding sequence to respect and the instances would be sorted.
+   */
   public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
-      List<InstanceConfig> instanceConfigs) {
+      List<InstanceConfig> instanceConfigs, Map<Integer, List<String>> partitionToInstancesMap) {
+    boolean shouldRetainInstanceSequence = !partitionToInstancesMap.isEmpty();
     String tableNameWithType = _tableConfig.getTableName();
-    LOGGER.info("Starting {} instance assignment for table: {}", instancePartitionsType, tableNameWithType);
+    LOGGER.info("Starting {} instance assignment for table: {}. Should retain instance sequence: {}",
+        instancePartitionsType, tableNameWithType, shouldRetainInstanceSequence);
 
     InstanceAssignmentConfig assignmentConfig =
         InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
     InstanceTagPoolSelector tagPoolSelector =
         new InstanceTagPoolSelector(assignmentConfig.getTagPoolConfig(), tableNameWithType);
-    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
+    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap =
+        tagPoolSelector.selectInstances(instanceConfigs, partitionToInstancesMap);
+
+    InstancePartitions instancePartitions = new InstancePartitions(
+        instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
+    instancePartitions
+        .setPartitionToInstancesMap(extractInstanceNamesFromPoolToInstanceConfigsMap(poolToInstanceConfigsMap));
 
     InstanceConstraintConfig constraintConfig = assignmentConfig.getConstraintConfig();
     List<InstanceConstraintApplier> constraintAppliers = new ArrayList<>();
@@ -75,9 +91,26 @@ public class InstanceAssignmentDriver {
 
     InstanceReplicaGroupPartitionSelector replicaPartitionSelector =
         new InstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType);
-    InstancePartitions instancePartitions = new InstancePartitions(
-        instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
     replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
     return instancePartitions;
   }
+
+  private Map<Integer, List<String>> extractInstanceNamesFromPoolToInstanceConfigsMap(
+      Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap) {
+    Map<Integer, List<String>> partitionToInstancesMap = new TreeMap<>();
+    for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+      Integer pool = entry.getKey();
+      List<InstanceConfig> instanceConfigs = entry.getValue();
+      partitionToInstancesMap.put(pool, extractInstanceNamesFromInstanceConfigs(instanceConfigs));
+    }
+    return partitionToInstancesMap;
+  }
+
+  private List<String> extractInstanceNamesFromInstanceConfigs(List<InstanceConfig> instanceConfigs) {
+    List<String> instanceNames = new ArrayList<>(instanceConfigs.size());
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceNames.add(instanceConfig.getInstanceName());
+    }
+    return instanceNames;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5aefd1a..b56c7ed 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -20,7 +20,10 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -48,52 +51,105 @@ public class InstanceTagPoolSelector {
 
   /**
    * Returns a map from pool to instance configs based on the tag and pool config for the given instance configs.
+   * @param instanceConfigs list of latest instance configs from ZK.
+   * @param partitionToInstancesMap existing instances whose sequence should be respected. An empty map
+   *                                      means no preceding sequence to respect and the instances would be sorted.
    */
-  public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs) {
+  public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs,
+      Map<Integer, List<String>> partitionToInstancesMap) {
     int tableNameHash = Math.abs(_tableNameWithType.hashCode());
     LOGGER.info("Starting instance tag/pool selection for table: {} with hash: {}", _tableNameWithType, tableNameHash);
 
-    // Filter out the instances with the correct tag
+    // Filter out the instances with the correct tag.
     String tag = _tagPoolConfig.getTag();
-    List<InstanceConfig> candidateInstanceConfigs = new ArrayList<>();
+    Map<String, InstanceConfig> candidateInstanceConfigsMap = new LinkedHashMap<>();
     for (InstanceConfig instanceConfig : instanceConfigs) {
       if (instanceConfig.getTags().contains(tag)) {
-        candidateInstanceConfigs.add(instanceConfig);
+        candidateInstanceConfigsMap.put(instanceConfig.getInstanceName(), instanceConfig);
       }
     }
-    candidateInstanceConfigs.sort(Comparator.comparing(InstanceConfig::getInstanceName));
-    int numCandidateInstances = candidateInstanceConfigs.size();
+
+    // Find out newly added instances from the latest copies of instance configs.
+    Deque<String> newlyAddedInstances = new LinkedList<>(candidateInstanceConfigsMap.keySet());
+    for (List<String> existingInstancesWithSequence : partitionToInstancesMap.values()) {
+      newlyAddedInstances.removeAll(existingInstancesWithSequence);
+    }
+
+    int numCandidateInstances = candidateInstanceConfigsMap.size();
     Preconditions.checkState(numCandidateInstances > 0, "No enabled instance has the tag: %s", tag);
     LOGGER.info("{} enabled instances have the tag: {} for table: {}", numCandidateInstances, tag, _tableNameWithType);
 
-    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new TreeMap<>();
+    // Each pool number maps to a map whose key is the instance name and whose value is the instance config.
+    Map<Integer, Map<String, InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+    // Each pool number maps to a queue of newly added instance configs,
+    // so that new instances can be fetched from this queue.
+    Map<Integer, Deque<InstanceConfig>> poolToNewInstanceConfigsMap = new HashMap<>();
+
+    // Extract the pool information from the instance configs.
+    for (Map.Entry<String,InstanceConfig> entry : candidateInstanceConfigsMap.entrySet()) {
+      String instanceName = entry.getKey();
+      InstanceConfig instanceConfig = entry.getValue();
+      Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
+      if (poolMap != null && poolMap.containsKey(tag)) {
+        int pool = Integer.parseInt(poolMap.get(tag));
+        poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new TreeMap<>()).put(instanceName, instanceConfig);
+        if (newlyAddedInstances.contains(instanceName)) {
+          poolToNewInstanceConfigsMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(instanceConfig);
+        }
+      }
+    }
+
+    Map<Integer, List<InstanceConfig>> poolToLatestInstanceConfigsMap = new TreeMap<>();
     if (_tagPoolConfig.isPoolBased()) {
       // Pool based selection
 
-      // Extract the pool information from the instance configs
-      for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
-        Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
-        if (poolMap != null && poolMap.containsKey(tag)) {
-          int pool = Integer.parseInt(poolMap.get(tag));
-          poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+      for (Map.Entry<Integer, List<String>> entry : partitionToInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<String> existingInstanceAssignmentInPool = entry.getValue();
+        List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+        for (String existingInstance: existingInstanceAssignmentInPool) {
+          InstanceConfig instanceConfig = poolToInstanceConfigsMap.get(pool).get(existingInstance);
+          // Add instances to the candidate list and respect the sequence of the existing instances from the ZK.
+          // The missing/removed instances will be replaced by the newly added instances.
+          // If the instance still exists in ZK, then add it to the candidate list.
+          // E.g. if the old instances are [I1, I2, I3, I4] and the new instances are [I1, I3, I4, I5, I6],
+          // the removed instance is I2 and the newly added instances are I5 and I6.
+          // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail.
+          // Thus, the new order would be [I1, I5, I3, I4, I6].
+          if (instanceConfig != null) {
+            candidateInstanceConfigsWithSequence.add(instanceConfig);
+          } else {
+            // The currently chosen instance no longer exists in the cluster, so pick a new instance.
+            candidateInstanceConfigsWithSequence.add(poolToNewInstanceConfigsMap.get(pool).pollFirst());
+          }
         }
+        poolToLatestInstanceConfigsMap.put(pool, candidateInstanceConfigsWithSequence);
       }
+
+      // The preceding list of instances has been traversed. Add the remaining new instances.
+      for (Map.Entry<Integer, Deque<InstanceConfig>> entry : poolToNewInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        Deque<InstanceConfig> remainingNewInstanceConfigs = entry.getValue();
+        poolToLatestInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>())
+            .addAll(remainingNewInstanceConfigs);
+      }
+
       Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(),
           "No enabled instance has the pool configured for the tag: %s", tag);
       Map<Integer, Integer> poolToNumInstancesMap = new TreeMap<>();
-      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToLatestInstanceConfigsMap.entrySet()) {
         poolToNumInstancesMap.put(entry.getKey(), entry.getValue().size());
       }
       LOGGER.info("Number instances for each pool: {} for table: {}", poolToNumInstancesMap, _tableNameWithType);
 
       // Calculate the pools to select based on the selection config
-      Set<Integer> pools = poolToInstanceConfigsMap.keySet();
+      Set<Integer> pools = poolToLatestInstanceConfigsMap.keySet();
       List<Integer> poolsToSelect = _tagPoolConfig.getPools();
       if (poolsToSelect != null && !poolsToSelect.isEmpty()) {
         Preconditions.checkState(pools.containsAll(poolsToSelect), "Cannot find all instance pools configured: %s",
             poolsToSelect);
       } else {
-        int numPools = poolToInstanceConfigsMap.size();
+        int numPools = poolToLatestInstanceConfigsMap.size();
         int numPoolsToSelect = _tagPoolConfig.getNumPools();
         if (numPoolsToSelect > 0) {
           Preconditions
@@ -106,7 +162,7 @@ public class InstanceTagPoolSelector {
         // Directly return the map if all the pools are selected
         if (numPools == numPoolsToSelect) {
           LOGGER.info("Selecting all {} pools: {} for table: {}", numPools, pools, _tableNameWithType);
-          return poolToInstanceConfigsMap;
+          return poolToLatestInstanceConfigsMap;
         }
 
         // Select pools based on the table name hash to evenly distribute the tables
@@ -123,10 +179,37 @@ public class InstanceTagPoolSelector {
     } else {
       // Non-pool based selection
 
-      LOGGER.info("Selecting {} instances for table: {}", numCandidateInstances, _tableNameWithType);
+      LOGGER.info("Selecting {} instances for table: {}", candidateInstanceConfigsMap.size(), _tableNameWithType);
       // Put all instance configs as pool 0
-      poolToInstanceConfigsMap.put(0, candidateInstanceConfigs);
+
+      for (Map.Entry<Integer, List<String>> entry : partitionToInstancesMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<String> existingInstanceAssignmentInPool = entry.getValue();
+        List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+        for (String existingInstance: existingInstanceAssignmentInPool) {
+          InstanceConfig instanceConfig = candidateInstanceConfigsMap.get(existingInstance);
+          // Add instances to the candidate list and respect the sequence of the existing instances from the ZK.
+          // The missing/removed instances will be replaced by the newly added instances.
+          // If the instance still exists in ZK, then add it to the candidate list.
+          // E.g. if the old instances are [I1, I2, I3, I4] and the new instances are [I1, I3, I4, I5, I6],
+          // the removed instance is I2 and the newly added instances are I5 and I6.
+          // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail.
+          // Thus, the new order would be [I1, I5, I3, I4, I6].
+          if (instanceConfig != null) {
+            candidateInstanceConfigsWithSequence.add(instanceConfig);
+          } else {
+            // The currently chosen instance no longer exists in the cluster, so pick a new instance.
+            candidateInstanceConfigsWithSequence.add(candidateInstanceConfigsMap.get(newlyAddedInstances.pollFirst()));
+          }
+        }
+        poolToLatestInstanceConfigsMap.put(pool, candidateInstanceConfigsWithSequence);
+      }
+      // The preceding list of instances has been traversed. Add the remaining new instances.
+      for (String remainingNewInstance : newlyAddedInstances) {
+        poolToLatestInstanceConfigsMap.computeIfAbsent(0, k -> new ArrayList<>())
+            .add(candidateInstanceConfigsMap.get(remainingNewInstance));
+      }
     }
-    return poolToInstanceConfigsMap;
+    return poolToLatestInstanceConfigsMap;
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
index 70bebab..d08fbbf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
@@ -33,6 +33,10 @@ public class RebalanceConfigConstants {
   public static final String REASSIGN_INSTANCES = "reassignInstances";
   public static final boolean DEFAULT_REASSIGN_INSTANCES = false;
 
+  // Whether to retain the sequence for the existing instances
+  public static final String RETAIN_INSTANCE_SEQUENCE = "retainInstancesSequence";
+  public static final boolean DEFAULT_RETAIN_INSTANCE_SEQUENCE = false;
+
   // Whether to reassign CONSUMING segments
   public static final String INCLUDE_CONSUMING = "includeConsuming";
   public static final boolean DEFAULT_INCLUDE_CONSUMING = false;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index d6701c8..005a1ef 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.rebalance;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -145,6 +146,8 @@ public class TableRebalancer {
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     boolean bestEfforts = rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS,
         RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
+    boolean retainInstanceSequence = rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE,
+        RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE);
     LOGGER.info(
         "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, "
             + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
@@ -194,7 +197,7 @@ public class TableRebalancer {
     // Calculate instance partitions map
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
     try {
-      instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun);
+      instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun, retainInstanceSequence);
     } catch (Exception e) {
       LOGGER.warn(
           "Caught exception while fetching/calculating instance partitions for table: {}, aborting the rebalance",
@@ -323,7 +326,7 @@ public class TableRebalancer {
           currentIdealState = idealState;
           currentAssignment = currentIdealState.getRecord().getMapFields();
           // Re-calculate the instance partitions in case the instance configs changed during the rebalance
-          instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false);
+          instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false, retainInstanceSequence);
           tierToInstancePartitionsMap = getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
           targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
               tierToInstancePartitionsMap, rebalanceConfig);
@@ -381,21 +384,24 @@ public class TableRebalancer {
   }
 
   private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig,
-      boolean reassignInstances, boolean dryRun) {
+      boolean reassignInstances, boolean dryRun, boolean retainInstanceSequence) {
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
     if (tableConfig.getTableType() == TableType.OFFLINE) {
       instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
-          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun));
+          getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun,
+              retainInstanceSequence));
     } else {
       instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
-          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun));
+          getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun,
+              retainInstanceSequence));
       String tableNameWithType = tableConfig.getTableName();
       if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
         LOGGER.info(
             "COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}",
             tableNameWithType);
         instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
-            getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun));
+            getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun,
+                retainInstanceSequence));
       } else {
         LOGGER.info(
             "COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions "
@@ -413,14 +419,22 @@ public class TableRebalancer {
   }
 
   private InstancePartitions getInstancePartitions(TableConfig tableConfig,
-      InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun) {
+      InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun,
+      boolean retainInstanceSequence) {
     String tableNameWithType = tableConfig.getTableName();
     if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
       if (reassignInstances) {
+        Map<Integer, List<String>> partitionsToInstancesMap = Collections.emptyMap();
+        if (retainInstanceSequence) {
+          InstancePartitions existingInstancePartitions = InstancePartitionsUtils
+              .fetchOrComputeInstancePartitions(_helixManager, tableConfig, instancePartitionsType);
+          partitionsToInstancesMap = existingInstancePartitions.getPartitionToInstancesMap();
+        }
         LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType);
         InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
         InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType,
-            _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true));
+            _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true),
+            partitionsToInstancesMap);
         if (!dryRun) {
           LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
           InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index f1fc049..0bbee49 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -21,7 +21,9 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -71,7 +73,8 @@ public class InstanceAssignmentTest {
     }
 
     // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 2 instances
-    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE,
+        instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     // Instances of index 4 to 7 are not assigned because of the hash-based rotation
@@ -95,7 +98,8 @@ public class InstanceAssignmentTest {
 
     // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
     // instances should be assigned to 2 partitions, each with 2 instances
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver
+        .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
     // Instance of index 7 is not assigned because of the hash-based rotation
@@ -123,6 +127,91 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
     assertEquals(instancePartitions.getInstances(1, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // ===== Test against the cases when partitionToInstancesMap isn't empty. =====
+    // Put the existing partition to instances map as the parameter to the InstanceAssignmentDriver
+    // instead of passing an empty map.
+    // The returned instance partition should be the same as the last computed one.
+    Map<Integer, List<String>> existingPartitionToInstancesMap = new HashMap<>();
+    List<String> instances = Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1,
+        SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4,
+        SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+        SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9);
+    existingPartitionToInstancesMap.put(0, instances);
+
+    // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
+    // instances should be assigned to 2 partitions, each with 2 instances
+    instancePartitions = driver
+        .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Instance of index 7 is not assigned because of the hash-based rotation
+    // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+    // [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7]
+    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    // r0: [i8, i1, i4]
+    //      p0, p0, p1
+    //      p1
+    // r1: [i9, i2, i5]
+    //      p0, p0, p1
+    //      p1
+    // r2: [i0, i3, i6]
+    //      p0, p0, p1
+    //      p1
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(1, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(1, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(1, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Remove two instances (i2, i6) and add two new instances (i10, i11).
+    instanceConfigs.remove(6);
+    instanceConfigs.remove(2);
+    for (int i = numInstances; i < numInstances + 2; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfigs.add(instanceConfig);
+    }
+    // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
+    // instances should be assigned to 2 partitions, each with 2 instances
+    instancePartitions = driver
+        .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+    assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+    // Instance of index 7 is not assigned because of the hash-based rotation
+    // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+    // [i8, i9, i0, i1, i10, i3, i4, i5, i11, i7]
+    //  r0, r1, r2, r0, r1, r2, r0, r1, r2
+    // r0: [i8, i1, i4]
+    //      p0, p0, p1
+    //      p1
+    // r1: [i9, i10, i5]
+    //      p0, p0, p1
+    //      p1
+    // r2: [i0, i3, i11]
+    //      p0, p0, p1
+    //      p1
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(1, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(1, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(1, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 11));
   }
 
   @Test
@@ -155,7 +244,8 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
     // replica-group 1
-    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
+        Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -182,7 +272,8 @@ public class InstanceAssignmentTest {
     // Pool 0 and 2 will be selected in the pool selection
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to
     // replica-group 1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -200,7 +291,154 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
     // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
     // replica-group 1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 14));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+
+    // Select pool 0 and 1 in pool selection
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Assign instances from 2 pools to 3 replica-groups
+    numReplicaGroups = numPools;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    //  r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // pool 0: [i3, i4, i0, i1, i2]
+    //          r0  r2  r0  r2
+    // pool 1: [i8, i9, i5, i6, i7]
+    //          r1  r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
+
+    // ===== Test against the cases when partitionToInstancesMap isn't empty. =====
+    // Reset the number of replica groups to 2 and pools to 2.
+    numReplicaGroups = 2;
+    numPools = 2;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0);
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+    // Reset the instance configs to have only two pools.
+    instanceConfigs.clear();
+    numInstances = 10;
+    for (int i = 0; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = i / numInstancesPerPool;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Put the existing partition to instances map as the parameter to the InstanceAssignmentDriver.
+    Map<Integer, List<String>> existingPartitionToInstancesMap = new HashMap<>();
+    existingPartitionToInstancesMap.put(0, Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    existingPartitionToInstancesMap.put(1, Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Use all pools, the instance partition should be the same as the one without using
+    // the existing partition to instances map.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1.
+    // [pool0, pool1]
+    //  r0     r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+            SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+    // Add the third pool with same number of instances
+    numPools = 3;
+    numInstances = numPools * numInstancesPerPool;
+    for (int i = numInstances - numInstancesPerPool; i < numInstances; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = numPools - 1;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Putting the existingPartitionToInstancesMap shouldn't change the instance assignment.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Pool 0 and 2 will be selected in the pool selection
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to
+    // replica-group 1.
+    // [pool0, pool1, pool2]
+    //   r0             r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+            SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+    assertEquals(instancePartitions.getInstances(0, 1), Arrays
+        .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 12,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 14));
+
+    // Select all 3 pools in pool selection
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Putting the existingPartitionToInstancesMap shouldn't change the instance assignment.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
+    // replica-group 1.
+    // [pool0, pool1, pool2]
+    //   r1   (unused)  r0
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -215,10 +453,12 @@ public class InstanceAssignmentTest {
     tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
         new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
 
+    // Putting the existingPartitionToInstancesMap shouldn't change the instance assignment.
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
     // replica-group 1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -244,7 +484,8 @@ public class InstanceAssignmentTest {
     //          r0  r2  r0  r2
     // pool 1: [i8, i9, i5, i6, i7]
     //          r1  r1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
     assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),
@@ -253,6 +494,40 @@ public class InstanceAssignmentTest {
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
     assertEquals(instancePartitions.getInstances(0, 2),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
+
+    instanceConfigs.remove(12);
+    instanceConfigs.remove(9);
+    instanceConfigs.remove(3);
+    int poolCount = 0;
+    for (int i = numInstances; i < numInstances + 3; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = poolCount++;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // [pool0, pool1]
+    //  r0     r1
+    //  r2
+    // Each replica-group should have 2 instances assigned
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // pool 0: [i15, i4, i0, i1, i2]
+    //          r0  r2  r0  r2
+    // pool 1: [i8, i16, i5, i6, i7]
+    //          r1  r1
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 15));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 16, SERVER_INSTANCE_ID_PREFIX + 8));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
   }
 
   @Test
@@ -270,7 +545,7 @@ public class InstanceAssignmentTest {
     // No instance assignment config
     assertFalse(InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, InstancePartitionsType.OFFLINE));
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Instance assignment is not allowed for the given table config");
@@ -284,7 +559,7 @@ public class InstanceAssignmentTest {
 
     // No instance with correct tag
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "No enabled instance has the tag: tenant_OFFLINE");
@@ -295,7 +570,8 @@ public class InstanceAssignmentTest {
     }
 
     // All instances should be assigned as replica-group 0 partition 0
-    InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     List<String> expectedInstances = new ArrayList<>(numInstances);
@@ -311,7 +587,7 @@ public class InstanceAssignmentTest {
 
     // No instance has correct pool configured
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "No enabled instance has the pool configured for the tag: tenant_OFFLINE");
@@ -328,7 +604,8 @@ public class InstanceAssignmentTest {
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // All instances in pool 0 should be assigned as replica-group 0 partition 0
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     expectedInstances.clear();
@@ -343,7 +620,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many pools
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough instance pools (2 in the cluster, asked for 3)");
@@ -355,7 +632,7 @@ public class InstanceAssignmentTest {
 
     // Ask for pool that does not exist
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Cannot find all instance pools configured: [0, 2]");
@@ -368,7 +645,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
@@ -381,7 +658,7 @@ public class InstanceAssignmentTest {
 
     // Number of replica-groups must be positive
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Number of replica-groups must be positive");
@@ -393,7 +670,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many replica-groups
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
@@ -406,7 +683,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
@@ -418,7 +695,7 @@ public class InstanceAssignmentTest {
 
     // Ask for too many instances per partition
     try {
-      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+      driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
@@ -434,7 +711,8 @@ public class InstanceAssignmentTest {
     //         r0  r2  r0  r2
     // pool1: [i8, i9, i5, i6, i7]
     //         r1  r1
-    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+    instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
+        Collections.emptyMap());
     assertEquals(instancePartitions.getNumReplicaGroups(), 3);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org