You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2023/11/05 19:05:06 UTC

(pinot) branch maintain-pool-selection-for-minimizeDataMovement updated (7cc4f32a0a -> 1a8c7a1433)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git


 discard 7cc4f32a0a Enhance the minimizeDataMovement to keep the existing pool assignment
     new 1a8c7a1433 Enhance the minimizeDataMovement to keep the existing pool assignment

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (7cc4f32a0a)
            \
             N -- N -- N   refs/heads/maintain-pool-selection-for-minimizeDataMovement (1a8c7a1433)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../helix/core/assignment/instance/InstanceAssignmentDriver.java     | 4 ++--
 .../assignment/instance/InstanceReplicaGroupPartitionSelector.java   | 5 ++---
 .../helix/core/assignment/instance/InstanceTagPoolSelector.java      | 5 ++---
 3 files changed, 6 insertions(+), 8 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


(pinot) 01/01: Enhance the minimizeDataMovement to keep the existing pool assignment

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 1a8c7a14330c835f2e232febe0ebf5c3dacd6472
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Sun Nov 5 10:55:11 2023 -0800

    Enhance the minimizeDataMovement to keep the existing pool assignment
---
 .../instance/InstanceAssignmentDriver.java         |  12 ++-
 .../InstanceReplicaGroupPartitionSelector.java     |  71 ++++++++++++--
 .../instance/InstanceTagPoolSelector.java          |  55 +++++++++--
 .../instance/InstanceAssignmentTest.java           | 103 +++++++++++++++++++++
 4 files changed, 220 insertions(+), 21 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 6d869b86c1..ffde3fce1e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,8 +65,8 @@ public class InstanceAssignmentDriver {
   }
 
   public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
-      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable
-      InstancePartitions preConfiguredInstancePartitions) {
+      List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+      @Nullable InstancePartitions preConfiguredInstancePartitions) {
     String tableNameWithType = _tableConfig.getTableName();
     InstanceAssignmentConfig assignmentConfig =
         InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
@@ -88,8 +89,11 @@ public class InstanceAssignmentDriver {
     String tableNameWithType = _tableConfig.getTableName();
     LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);
 
+    InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig =
+        instanceAssignmentConfig.getReplicaGroupPartitionConfig();
     InstanceTagPoolSelector tagPoolSelector =
-        new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType);
+        new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType,
+            instanceReplicaGroupPartitionConfig, existingInstancePartitions);
     Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
 
     InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
@@ -105,7 +109,7 @@ public class InstanceAssignmentDriver {
 
     InstancePartitionSelector instancePartitionSelector =
         InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
-            instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
+            instanceReplicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions,
             preConfiguredInstancePartitions);
     InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
     instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index de1e681d17..e0008eab80 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -21,19 +21,23 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.utils.Pairs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,16 +77,65 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
+        for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
+          boolean foundExistingReplicaGroup = false;
+          for (int partitionId = 0; partitionId < existingNumPartitions && !foundExistingReplicaGroup; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            for (String existingInstance : existingInstances) {
+              Integer existingPool = instanceToPoolMap.get(existingInstance);
+              if (existingPool != null && pools.contains(existingPool)) {
+                poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId);
+                replicaGroupIdToPoolMap.put(replicaGroupId, existingPool);
+                foundExistingReplicaGroup = true;
+                break;
+              }
+            }
+          }
+        }
+        // Use a min heap to track the least frequently picked pool among all the pools
+        PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
+        for (int pool : pools) {
+          int numExistingReplicaGroups =
+              poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0;
+          minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool));
+        }
+        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+          if (replicaGroupIdToPoolMap.containsKey(replicaId)) {
+            continue;
+          }
+          // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection.
+          Pairs.IntPair pair = minHeap.remove();
+          int pool = pair.getRight();
+          pair.setLeft(pair.getLeft() + 1);
+          minHeap.add(pair);
+          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+          replicaGroupIdToPoolMap.put(replicaId, pool);
+        }
+      } else {
+        // Current default way to assign pool to replica groups.
+        for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+          // Pick one pool for each replica-group based on the table name hash
+          int pool = pools.get((tableNameHash + replicaId) % numPools);
+          poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+          replicaGroupIdToPoolMap.put(replicaId, pool);
+        }
       }
       LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
           _tableNameWithType);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5aefd1ad69..d620509ac6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -21,12 +21,16 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,9 +45,17 @@ public class InstanceTagPoolSelector {
   private final InstanceTagPoolConfig _tagPoolConfig;
   private final String _tableNameWithType;
 
-  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType) {
+  private final InstanceReplicaGroupPartitionConfig _instanceReplicaGroupPartitionConfig;
+
+  private final InstancePartitions _existingInstancePartitions;
+
+  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
+      InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig,
+      @Nullable InstancePartitions existingInstancePartitions) {
     _tagPoolConfig = tagPoolConfig;
     _tableNameWithType = tableNameWithType;
+    _instanceReplicaGroupPartitionConfig = instanceReplicaGroupPartitionConfig;
+    _existingInstancePartitions = existingInstancePartitions;
   }
 
   /**
@@ -70,12 +82,14 @@ public class InstanceTagPoolSelector {
     if (_tagPoolConfig.isPoolBased()) {
       // Pool based selection
 
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
       // Extract the pool information from the instance configs
       for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
         Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
         if (poolMap != null && poolMap.containsKey(tag)) {
           int pool = Integer.parseInt(poolMap.get(tag));
           poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+          instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
         }
       }
       Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(),
@@ -96,9 +110,8 @@ public class InstanceTagPoolSelector {
         int numPools = poolToInstanceConfigsMap.size();
         int numPoolsToSelect = _tagPoolConfig.getNumPools();
         if (numPoolsToSelect > 0) {
-          Preconditions
-              .checkState(numPoolsToSelect <= numPools, "Not enough instance pools (%s in the cluster, asked for %s)",
-                  numPools, numPoolsToSelect);
+          Preconditions.checkState(numPoolsToSelect <= numPools,
+              "Not enough instance pools (%s in the cluster, asked for %s)", numPools, numPoolsToSelect);
         } else {
           numPoolsToSelect = numPools;
         }
@@ -109,11 +122,37 @@ public class InstanceTagPoolSelector {
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_instanceReplicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+          // Keep each pool the table already uses; record it once even when many existing instances map to it.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null && pools.contains(existingPool) && !poolsToSelect.contains(existingPool)) {
+                  poolsToSelect.add(existingPool);
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pool: {} for table: {}", poolsToSelect, _tableNameWithType);
+          // Fill the remaining slots in table-name-hash order, skipping pools that are already selected.
+          List<Integer> poolsInCluster = new ArrayList<>(pools);
+          for (int i = 0; i < numPools && poolsToSelect.size() < numPoolsToSelect; i++) {
+            int pool = poolsInCluster.get((tableNameHash + i) % numPools);
+            if (!poolsToSelect.contains(pool)) {
+              poolsToSelect.add(pool);
+            }
+          }
+        } else {
+          // Select pools based on the table name hash to evenly distribute the tables
+          List<Integer> poolsInCluster = new ArrayList<>(pools);
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+          }
         }
       }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index b25a529e10..0ef7da33dc 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -1693,6 +1693,109 @@ public class InstanceAssignmentTest {
     assertEquals(instancePartitions.getInstances(0, 1),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
             SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // The below is the test suite for testing out minimizeDataMovement with pool configs
+    // Add the third pool with same number of instances but keep number of pools the same (i.e. 2)
+    numPools = 3;
+    numInstances = numPools * numInstancesPerPool;
+    for (int i = numInstances + 4; i < numInstances + 9; i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      int pool = numPools - 1;
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2, but since minimizeDataMovement is enabled,
+    // same pools would be re-used.
+    // [pool0, pool1]
+    //  r0     r1
+    // Thus, the instance partition assignment remains the same as the previous one.
+    //     pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //     pool 1: [  i7, i9, i11, i13,  i6 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Set tag pool config to 3.
+    tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Putting the existingPoolToInstancesMap shouldn't change the instance assignment,
+    // as there are only 2 replica groups needed.
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // But since Pool 0 and Pool 1 are already being used for the table and the number of replica groups remains at 2,
+    // the 3rd pool (Pool 2) won't be picked up.
+    // Thus, the instance partition assignment remains the same as the existing one.
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1
+    // Now in poolToInstancesMap:
+    //     pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //     pool 1: [  i7, i9, i11, i13,  i6 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+    // Set replica group from 2 to 3
+    numReplicaGroups = 3;
+    replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
+    tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+        new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+    // Get the latest existingInstancePartitions from last computation.
+    existingInstancePartitions = instancePartitions;
+
+    // Now that 1 more replica group is needed, Pool 2 will be chosen for the 3rd replica group
+    // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+    // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+    // [pool0, pool1, pool2]
+    //  r0     r1     r2
+    // Each replica-group should have 5 instances assigned (all the instances in its pool)
+    // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+    // Latest instances from ZK:
+    //   pool 0: [ i3, i4, i0, i1, i2 ]
+    //   pool 1: [ i8, i9, i5, i6, i7 ]
+    //   pool 2: [ i22,i23,i19,i20,i21]
+    // Thus, the new assignment will become:
+    //   pool 0: [ i12, i4,  i0,  i1, i10 ]
+    //   pool 1: [  i7, i9, i11, i13,  i6 ]
+    //   pool 2: [ i22, i23, i19, i20,i21 ]
+    instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+    assertEquals(instancePartitions.getInstances(0, 0),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+            SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+    assertEquals(instancePartitions.getInstances(0, 1),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+            SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+    assertEquals(instancePartitions.getInstances(0, 2),
+        Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 19,
+            SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 21));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org