You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "jackjlli (via GitHub)" <gi...@apache.org> on 2023/11/05 19:08:20 UTC

[PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

jackjlli opened a new pull request, #11953:
URL: https://github.com/apache/pinot/pull/11953

   This PR maintains the existing pool selection for the minimizeDataMovement instance partition assignment strategy.
   
   This scenario would be useful for the case when a new pool is added and Pinot admin would like to keep the current replica group assignment to the existing pool as well as leveraging the new pool for the new replica group.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1419760987


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +121,46 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new TreeSet<>();
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null) {
+                  if (existingPools.add(existingPool)) {
+                    poolsToSelect.add(existingPool);
+                  }
+                  foundExistingPoolForReplicaGroup = true;
+                  break;
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
+          // Pick a pool from remainingPools that isn't used before.
+          List<Integer> remainingPools = new ArrayList<>(pools);
+          remainingPools.retainAll(existingPools);
+          // Skip selecting the existing pool.
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            if (existingPools.contains(i)) {

Review Comment:
   Updated the logic to select from the remaining pools.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1419770234


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);

Review Comment:
   Let me put the analysis for RG reduction and increment respectively here.
   
   For the RG reduction scenario, it's always the RG(s) with higher numbers that got removed. Let's say numRG got reduced from 2 to 1 (i.e. one out of two replica groups got reduced), it's always the RG1 instead of RG0 that would be removed.  In this case, when calculating the new number of instances per RG, we should always use the updated number of RGs in a pool (which comes from `poolToReplicaGroupIdsMap`) to calculate that (detailed logic can be seen in Line 164 of this class).
   
   For the RG increment scenario, it's always the RG(s) with higher numbers that got added. In this case, the pool(s) with least number of previous usage would be chosen, which are either 1) the pools would never be used at all (i.e. from a unused pool), or 2) the least frequent chosen pool would be picked up (i.e. from an existing pool). The order is maintained by a min heap, so that the unused pool would always be chosen before choosing any existing pools.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1423299335


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);

Review Comment:
   What I meant is that we didn't really pick the optimal pool to minimize the data movement. Currently we pick a pool when any existing server shows up in the pool, even if there are more servers shared with another pool. This algorithm works well when servers are not moved from one pool to another, but might not return best pool otherwise.
   Another algorithm which always give the best pool is to track the shared server count within all pools, and pick the pool with most shared servers.
   
   More importantly, the current algorithm could cause wrong assignment in the following scenario:
   Existing instance partitions: RG0: [s0, s1], RG1: [s2, s3]
   Pools: P0: [s0, s1, s2], P1: [s3, s4, s5]
   Current algorithm will assign both PRs to P0, but P0 doesn't have enough servers to hold both RGs
   To make it work the same way as default assignment, we need to track the maximum RGs assigned to a pool, and not assign more RGs to a pool when it already reaches the max.
   
   I'd suggest adding a test to catch such scenario



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1419760685


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +121,46 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new TreeSet<>();
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null) {
+                  if (existingPools.add(existingPool)) {
+                    poolsToSelect.add(existingPool);
+                  }
+                  foundExistingPoolForReplicaGroup = true;
+                  break;
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
+          // Pick a pool from remainingPools that isn't used before.
+          List<Integer> remainingPools = new ArrayList<>(pools);
+          remainingPools.retainAll(existingPools);

Review Comment:
   Good catch! Updated it to `removeAll()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1391841066


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java:
##########
@@ -29,12 +29,14 @@ abstract class InstancePartitionSelector {
   protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
   protected final String _tableNameWithType;
   protected final InstancePartitions _existingInstancePartitions;
+  protected final boolean _minimizeDataMovement;
 
   public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
-      String tableNameWithType, InstancePartitions existingInstancePartitions) {
+      String tableNameWithType, InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
     _replicaGroupPartitionConfig = replicaGroupPartitionConfig;
     _tableNameWithType = tableNameWithType;
     _existingInstancePartitions = existingInstancePartitions;
+    _minimizeDataMovement = minimizeDataMovement;

Review Comment:
   ```suggestion
       // For backward compatibility, enable minimize data movement when it is enabled in top level or instance partition selector level
       _minimizeDataMovement = minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1396731814


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);

Review Comment:
   The reason of checking the common replica groups is that it's not always the case that numReplicaGroups gets incremented. If the number of replica groups is reduced, we don't actually care what was used for the stale RG which is no longer needed. That's why the common one is used here.
   
   The scenarios you mentioned can be covered by using the min heap in Line 112, which is to gather the pool number as well as the number of times to be chosen. The one with the least frequent usage would always be chosen to assign to a RG. Keep in mind that 1 RG can only have 1 pool (RG -> pool), while 1 pool may have more than 1 RG (pool -> [RG]).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1410042951


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +121,46 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new TreeSet<>();
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null) {
+                  if (existingPools.add(existingPool)) {
+                    poolsToSelect.add(existingPool);
+                  }
+                  foundExistingPoolForReplicaGroup = true;
+                  break;
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
+          // Pick a pool from remainingPools that isn't used before.
+          List<Integer> remainingPools = new ArrayList<>(pools);
+          remainingPools.retainAll(existingPools);

Review Comment:
   I don't follow this part. Should it be `removeAll()`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);

Review Comment:
   Say we used to have 2 RGs, and now we reduce it to 1 RG, we should still check all existing RGs and pick the pool with the most common instances so that we can keep minimize movement.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +121,46 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new TreeSet<>();
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null) {
+                  if (existingPools.add(existingPool)) {
+                    poolsToSelect.add(existingPool);
+                  }
+                  foundExistingPoolForReplicaGroup = true;
+                  break;
+                }
+              }
+            }
+          }
+          LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
+          // Pick a pool from remainingPools that isn't used before.
+          List<Integer> remainingPools = new ArrayList<>(pools);
+          remainingPools.retainAll(existingPools);
+          // Skip selecting the existing pool.
+          for (int i = 0; i < numPoolsToSelect; i++) {
+            if (existingPools.contains(i)) {

Review Comment:
   This doesn't seem correct. Why are we looking up index within the pool set?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on PR #11953:
URL: https://github.com/apache/pinot/pull/11953#issuecomment-1801074257

   Discussed with @Jackie-Jiang offline. The `_minimizeDataMovement` config is put to the `InstanceAssignmentConfig` level so that it applies to both pool selection and instance selection. The `_minimizeDataMovement` config inside `InstanceReplicaGroupPartitionConfig` class is marked as `Deprecated` and will be removed in the next official release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on PR #11953:
URL: https://github.com/apache/pinot/pull/11953#issuecomment-1800431309

   > We should not access `InstanceReplicaGroupPartitionConfig` in `InstanceTagPoolSelector`. We can consider moving `minimizeDataMovement` flag into `InstanceAssignmentConfig`
   
   That's good point! How about we just pass the boolean `_minimizeDataMovement` parameter down to `InstanceTagPoolSelector` class, because that's the only parameter I need in that class?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1396722849


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +122,45 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null & pools.contains(existingPool)) {
+                  poolsToSelect.add(existingPool);

Review Comment:
   Adjusted the logic to only adding the candidate pool to  `poolsToSelect ` if adding it to the existingPools set succeeded.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "Ferix9288 (via GitHub)" <gi...@apache.org>.
Ferix9288 commented on PR #11953:
URL: https://github.com/apache/pinot/pull/11953#issuecomment-1861257035

   Hello~ I've been lurking for the past months or so haha. Any chance this can be merged in soon as our company really dislikes our current workaround for this problem? Thanks so much again @jackjlli  and @Jackie-Jiang  πŸ™ 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11953:
URL: https://github.com/apache/pinot/pull/11953#issuecomment-1793827579

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11953?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11953](https://app.codecov.io/gh/apache/pinot/pull/11953?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (50e0c45) into [master](https://app.codecov.io/gh/apache/pinot/commit/baea4a20dfa0fb56e545078c7fa11e01a478e681?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (baea4a2) will **decrease** coverage by `15.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #11953       +/-   ##
   =============================================
   - Coverage     61.45%   46.45%   -15.00%     
   + Complexity     1145      938      -207     
   =============================================
     Files          2385     1787      -598     
     Lines        129065    93505    -35560     
     Branches      19955    15103     -4852     
   =============================================
   - Hits          79317    43439    -35878     
   - Misses        44023    47013     +2990     
   + Partials       5725     3053     -2672     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Ξ” | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.45% <ΓΈ> (-14.85%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.45% <ΓΈ> (-14.84%)` | :arrow_down: |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.45% <ΓΈ> (-15.00%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.45% <ΓΈ> (-15.00%)` | :arrow_down: |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.45% <ΓΈ> (-0.22%)` | :arrow_down: |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/11953/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   [see 976 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11953/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   :mega: Codecov offers a browser extension for seamless coverage viewing on GitHub. Try it in [Chrome](https://chrome.google.com/webstore/detail/codecov/gedikamndpbemklijjkncpnolildpbgo) or [Firefox](https://addons.mozilla.org/en-US/firefox/addon/codecov/) today!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1438492453


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);

Review Comment:
   This is good scenario! I've adjusted the logic to consider the scenario when instances got moved across pools, which breaks the assumption that 1 RG can only have 1 pool (1 RG -> 1 pool) for the **existing** instance partition mapping. 
   A unit test called `testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools` is also added to capture this scenario in this PR. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1442339107


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java:
##########
@@ -79,13 +83,26 @@ public boolean equals(Object obj) {
   }
 
   public static class AscendingIntPairComparator implements Comparator<IntPair> {
+    private boolean _ascending;
+
+    public AscendingIntPairComparator(boolean ascending) {

Review Comment:
   recommend renaming this class since it is no longer a strictly "Ascending" comparator. The boolean you added allows both ascending and descending comparisons.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -71,18 +74,114 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
       Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+      Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
+
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Collect the stats between the existing pools, existing replica groups, and existing instances.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            for (String existingInstance : existingInstances) {
+              Integer existingPool = instanceToPoolMap.get(existingInstance);
+              if (existingPool != null) {
+                existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(existingInstance);
+                existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(replicaGroupId);
+                existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+                    .add(existingInstance);
+              }
+            }
+          }
+        }
+
+        // Use a max heap to track the number of servers used for the given pools,
+        // so that pool with max number of existing instances will be considered first.
+        PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+        for (int pool : pools) {
+          maxHeap.add(
+              new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
+                  pool));
+        }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+        // Get the maximum number of replica groups per pool.
+        int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();

Review Comment:
   The comment here is confusing. Should this use the ceil() of the division? What if the `numReplicaGroups` isn't a multiple of number of pools? e.g. 3 replica groups across 2 pools? This will set the max to 1 instead of 2.
   
   Or is this intentionally the floor? In which case can you update the comment and variable name to reflect that this should be minimum number of RGs/pool?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java:
##########
@@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
       "Name of the column used for partition, if not provided table level replica group will be used")
   private final String _partitionColumn;
 
+  // TODO: remove this config in the next official release
+  @Deprecated

Review Comment:
   just a question: we'll have to update all table configs on our end to remove this once it is removed, right? Will we see failures for existing tables if this is deleted in the next release but we still have table configs setting this in the `InstanceReplicaGroupPartitionConfig`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -71,18 +74,114 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
       Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+      Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
+      Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
+
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Collect the stats between the existing pools, existing replica groups, and existing instances.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+          for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+            List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+            for (String existingInstance : existingInstances) {
+              Integer existingPool = instanceToPoolMap.get(existingInstance);
+              if (existingPool != null) {
+                existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(existingInstance);
+                existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
+                    .add(replicaGroupId);
+                existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
+                    .add(existingInstance);
+              }
+            }
+          }
+        }
+
+        // Use a max heap to track the number of servers used for the given pools,
+        // so that pool with max number of existing instances will be considered first.
+        PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
+        for (int pool : pools) {
+          maxHeap.add(
+              new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
+                  pool));
+        }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+        // Get the maximum number of replica groups per pool.
+        int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
+        // Given a pool number, assign replica group which has the max number of existing instances.
+        // Repeat this process until the max number of replica groups per pool is reached.
+        while (!maxHeap.isEmpty()) {
+          Pairs.IntPair pair = maxHeap.remove();
+          int poolNumber = pair.getRight();
+          for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {

Review Comment:
   Just wondering if there is a code simplification opportunity here. Instead of running this outer loop, can you just extract out the relevant group ids from `existingReplicaGroupIdToExistingInstancesMap`, sort by size ascending and assign the top `maxNumberOfReplicaGroupPerPool` number of target groups if larger than 0?
   
   Also I guess if you do want to keep this for loop you can move it to be after the following, right?
   ```
    Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
               if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
                 continue;
               }
   ```
   I don't see how the above will change for each run



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +123,46 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null) {
+                  if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
+                    existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
+                  }

Review Comment:
   you don't need this. You're already doing a `computeIfAbsent` on the next line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11953:
URL: https://github.com/apache/pinot/pull/11953#discussion_r1395100930


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +122,45 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);

Review Comment:
   (minor) This set is very small
   ```suggestion
             Set<Integer> existingPools = new TreeSet<>();
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -73,16 +76,65 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
       Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
       Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
       Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
-        // Pick one pool for each replica-group based on the table name hash
-        int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
-        replicaGroupIdToPoolMap.put(replicaId, pool);
+      Map<String, Integer> instanceToPoolMap = new HashMap<>();
+      for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+        Integer pool = entry.getKey();
+        List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+        Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+        for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+          String instanceName = instanceConfig.getInstanceName();
+          candidateInstances.add(instanceName);
+          instanceToPoolMap.put(instanceName, pool);
+        }
+      }
 
-        Set<String> candidateInstances =
-            poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
-        List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
-        instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+      if (_minimizeDataMovement && _existingInstancePartitions != null) {
+        // Keep the same pool for the replica group if it's already been used for the table.
+        int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+        int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+        int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);

Review Comment:
   Why are we only checking the common replica groups? We should try to match the pool to existing replica groups, then assign the remaining pools.
   There are several scenarios to cover:
   - Same pool and replica groups
   - pools < replica groups (or single pool)
   - pools > replica groups



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +122,45 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null & pools.contains(existingPool)) {

Review Comment:
   (minor) The second check should be redundant because both maps are extracted from the same mapping
   ```suggestion
                   if (existingPool != null) {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -109,11 +122,45 @@ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> i
           return poolToInstanceConfigsMap;
         }
 
-        // Select pools based on the table name hash to evenly distribute the tables
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
-        List<Integer> poolsInCluster = new ArrayList<>(pools);
-        for (int i = 0; i < numPoolsToSelect; i++) {
-          poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+        if (_minimizeDataMovement && _existingInstancePartitions != null) {
+          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          // Keep the same pool if it's already been used for the table.
+          int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+          int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+          for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+            boolean foundExistingPoolForReplicaGroup = false;
+            for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+                partitionId++) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              for (String existingInstance : existingInstances) {
+                Integer existingPool = instanceToPoolMap.get(existingInstance);
+                if (existingPool != null & pools.contains(existingPool)) {
+                  poolsToSelect.add(existingPool);

Review Comment:
   This can potentially add the same pool multiple times. We should probably first gather all existing pools, then decide the pools to select



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java:
##########
@@ -41,9 +45,17 @@ public class InstanceTagPoolSelector {
   private final InstanceTagPoolConfig _tagPoolConfig;
   private final String _tableNameWithType;
 
-  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType) {
+  private final boolean _minimizeDataMovement;
+
+  private final InstancePartitions _existingInstancePartitions;
+
+  public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
+      boolean minimizeDataMovement,
+      @Nullable InstancePartitions existingInstancePartitions) {

Review Comment:
   (minor) Remove the unnecessary whitespace and reformat



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on PR #11953:
URL: https://github.com/apache/pinot/pull/11953#issuecomment-1811858733

   Thanks @Jackie-Jiang! The PR is ready to be reviewed now. It'd be great if you can help review it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "jackjlli (via GitHub)" <gi...@apache.org>.
jackjlli commented on PR #11953:
URL: https://github.com/apache/pinot/pull/11953#issuecomment-1872474544

   Hey @Ferix9288 , sorry for the late reply! I've updated the logic for this PR and hopefully it can be merged to meet your need soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Maintain pool selection for the minimizeDataMovement instance partition assignment strategy [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #11953:
URL: https://github.com/apache/pinot/pull/11953


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org