You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 07:17:16 UTC

(pinot) 03/05: Address PR comments

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 2a934ef00dcccb74a60312309300fcdbd013dd92
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Thu Nov 16 21:30:54 2023 -0800

    Address PR comments
---
 .../instance/InstanceTagPoolSelector.java          | 24 ++++++++++------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 755e7aa713..2062a75209 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -22,11 +22,11 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -50,8 +50,7 @@ public class InstanceTagPoolSelector {
   private final InstancePartitions _existingInstancePartitions;
 
   public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
-      boolean minimizeDataMovement,
-      @Nullable InstancePartitions existingInstancePartitions) {
+      boolean minimizeDataMovement, @Nullable InstancePartitions existingInstancePartitions) {
     _tagPoolConfig = tagPoolConfig;
     _tableNameWithType = tableNameWithType;
     _minimizeDataMovement = minimizeDataMovement;
@@ -124,7 +123,7 @@ public class InstanceTagPoolSelector {
 
         poolsToSelect = new ArrayList<>(numPoolsToSelect);
         if (_minimizeDataMovement && _existingInstancePartitions != null) {
-          Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+          Set<Integer> existingPools = new TreeSet<>();
           // Keep the same pool if it's already been used for the table.
           int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
           int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
@@ -135,9 +134,10 @@ public class InstanceTagPoolSelector {
               List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
               for (String existingInstance : existingInstances) {
                 Integer existingPool = instanceToPoolMap.get(existingInstance);
-                if (existingPool != null & pools.contains(existingPool)) {
-                  poolsToSelect.add(existingPool);
-                  existingPools.add(existingPool);
+                if (existingPool != null) {
+                  if (existingPools.add(existingPool)) {
+                    poolsToSelect.add(existingPool);
+                  }
                   foundExistingPoolForReplicaGroup = true;
                   break;
                 }
@@ -147,12 +147,10 @@ public class InstanceTagPoolSelector {
           LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
           // Pick a pool from remainingPools that isn't used before.
           List<Integer> remainingPools = new ArrayList<>(pools);
-          remainingPools.retainAll(existingPools);
-          // Skip selecting the existing pool.
-          for (int i = 0; i < numPoolsToSelect; i++) {
-            if (existingPools.contains(i)) {
-              continue;
-            }
+          remainingPools.removeAll(existingPools);
+          // Select from the remaining pools.
+          int remainingNumPoolsToSelect = numPoolsToSelect - poolsToSelect.size();
+          for (int i = 0; i < remainingNumPoolsToSelect; i++) {
             poolsToSelect.add(remainingPools.remove(i % remainingPools.size()));
           }
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org