You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2023/11/05 18:55:34 UTC
(pinot) 01/01: Enhance the minimizeDataMovement to keep the existing pool assignment
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit f372544ca2292c629763dd56dc54bd165ea555f5
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Sun Nov 5 10:55:11 2023 -0800
Enhance the minimizeDataMovement to keep the existing pool assignment
---
.../instance/InstanceAssignmentDriver.java | 8 +-
.../InstanceReplicaGroupPartitionSelector.java | 70 +++++++++++--
.../instance/InstanceTagPoolSelector.java | 50 +++++++++-
.../instance/InstanceAssignmentTest.java | 108 +++++++++++++++++++++
4 files changed, 221 insertions(+), 15 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 6d869b86c1..6b833a436a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,8 +89,11 @@ public class InstanceAssignmentDriver {
String tableNameWithType = _tableConfig.getTableName();
LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);
+ InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig =
+ instanceAssignmentConfig.getReplicaGroupPartitionConfig();
InstanceTagPoolSelector tagPoolSelector =
- new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType);
+ new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType,
+ instanceReplicaGroupPartitionConfig, existingInstancePartitions);
Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
@@ -105,7 +109,7 @@ public class InstanceAssignmentDriver {
InstancePartitionSelector instancePartitionSelector =
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
- instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
+ instanceReplicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions,
preConfiguredInstancePartitions);
InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index de1e681d17..2e639409bc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -21,19 +21,23 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Deque;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.utils.Pairs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,16 +77,66 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
- for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
- // Pick one pool for each replica-group based on the table name hash
- int pool = pools.get((tableNameHash + replicaId) % numPools);
- poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
- replicaGroupIdToPoolMap.put(replicaId, pool);
-
+ Map<String, Integer> instanceToPoolMap = new HashMap<>();
+ for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+ Integer pool = entry.getKey();
+ List<InstanceConfig> instanceConfigsInPool = entry.getValue();
Set<String> candidateInstances =
poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
- List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
- instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+ for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+ String instanceName = instanceConfig.getInstanceName();
+ candidateInstances.add(instanceName);
+ instanceToPoolMap.put(instanceName, pool);
+ }
+ }
+
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+ // Keep the same pool for the replica group if it's already been used for the table.
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+ int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
+ for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
+ boolean foundExistingReplicaGroup = false;
+ for (int partitionId = 0; partitionId < existingNumPartitions && !foundExistingReplicaGroup; partitionId++) {
+ List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ for (String existingInstance : existingInstances) {
+ Integer existingPool = instanceToPoolMap.get(existingInstance);
+ if (existingPool != null && pools.contains(existingPool)) {
+ poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId);
+ replicaGroupIdToPoolMap.put(replicaGroupId, existingPool);
+ foundExistingReplicaGroup = true;
+ break;
+ }
+ }
+ }
+ }
+ // Use a min heap to track the least frequently picked pool among all the pools
+ PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
+ for (int pool : pools) {
+ int numExistingReplicaGroups =
+ poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0;
+ minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool));
+ }
+ for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+ if (replicaGroupIdToPoolMap.containsKey(replicaId)) {
+ continue;
+ }
+ // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection.
+ Pairs.IntPair pair = minHeap.remove();
+ int pool = pair.getRight();
+ pair.setLeft(pair.getLeft() + 1);
+ minHeap.add(pair);
+ poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+ replicaGroupIdToPoolMap.put(replicaId, pool);
+ }
+ } else {
+ // Current default way to assign pool to replica groups.
+ for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+ // Pick one pool for each replica-group based on the table name hash
+ int pool = pools.get((tableNameHash + replicaId) % numPools);
+ poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+ replicaGroupIdToPoolMap.put(replicaId, pool);
+ }
}
LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
_tableNameWithType);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5aefd1ad69..f029915158 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -21,12 +21,16 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,9 +45,17 @@ public class InstanceTagPoolSelector {
private final InstanceTagPoolConfig _tagPoolConfig;
private final String _tableNameWithType;
- public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType) {
+ private final InstanceReplicaGroupPartitionConfig _instanceReplicaGroupPartitionConfig;
+
+ private final InstancePartitions _existingInstancePartitions;
+
+ public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
+ InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig,
+ @Nullable InstancePartitions existingInstancePartitions) {
_tagPoolConfig = tagPoolConfig;
_tableNameWithType = tableNameWithType;
+ _instanceReplicaGroupPartitionConfig = instanceReplicaGroupPartitionConfig;
+ _existingInstancePartitions = existingInstancePartitions;
}
/**
@@ -70,12 +82,14 @@ public class InstanceTagPoolSelector {
if (_tagPoolConfig.isPoolBased()) {
// Pool based selection
+ Map<String, Integer> instanceToPoolMap = new HashMap<>();
// Extract the pool information from the instance configs
for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
if (poolMap != null && poolMap.containsKey(tag)) {
int pool = Integer.parseInt(poolMap.get(tag));
poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+ instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
}
}
Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(),
@@ -109,11 +123,37 @@ public class InstanceTagPoolSelector {
return poolToInstanceConfigsMap;
}
- // Select pools based on the table name hash to evenly distribute the tables
poolsToSelect = new ArrayList<>(numPoolsToSelect);
- List<Integer> poolsInCluster = new ArrayList<>(pools);
- for (int i = 0; i < numPoolsToSelect; i++) {
- poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+ if (_instanceReplicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+ // Keep the same pool if it's already been used for the table.
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+ for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+ for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+ List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ for (String existingInstance : existingInstances) {
+ Integer existingPool = instanceToPoolMap.get(existingInstance);
+ // Use && for short-circuit evaluation, de-dup so the same pool isn't added
+ // once per existing instance/partition, and stop once enough pools are kept.
+ if (existingPool != null && pools.contains(existingPool)
+ && !poolsToSelect.contains(existingPool) && poolsToSelect.size() < numPoolsToSelect) {
+ poolsToSelect.add(existingPool);
+ }
+ }
+ }
+ }
+ LOGGER.info("Keep the same pool: {} for table: {}", poolsToSelect, _tableNameWithType);
+ // Fill any remaining slots with pools chosen by the table name hash, skipping
+ // pools already kept. Note: checking poolsToSelect.contains(i) would compare the
+ // loop index (not the candidate pool id) against selected pool ids, which can
+ // skip the wrong pools and select fewer than numPoolsToSelect.
+ List<Integer> poolsInCluster = new ArrayList<>(pools);
+ for (int i = 0; i < numPools && poolsToSelect.size() < numPoolsToSelect; i++) {
+ int candidatePool = poolsInCluster.get((tableNameHash + i) % numPools);
+ if (!poolsToSelect.contains(candidatePool)) {
+ poolsToSelect.add(candidatePool);
+ }
+ }
+ } else {
+ // Select pools based on the table name hash to evenly distribute the tables
+ List<Integer> poolsInCluster = new ArrayList<>(pools);
+ for (int i = 0; i < numPoolsToSelect; i++) {
+ poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+ }
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index b25a529e10..56473b88ab 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -1693,6 +1693,114 @@ public class InstanceAssignmentTest {
assertEquals(instancePartitions.getInstances(0, 1),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+ // The below is the test suite for testing out minimizeDataMovement with pool configs
+ // Add a third pool with the same number of instances, but keep the tag pool config selecting 2 pools for now.
+ numPools = 3;
+ numInstances = numPools * numInstancesPerPool;
+ for (int i = numInstances + 4; i < numInstances + 9; i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ int pool = numPools - 1;
+ instanceConfig.getRecord()
+ .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Get the latest existingInstancePartitions from last computation.
+ existingInstancePartitions = instancePartitions;
+
+ // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2, but since minimizeDataMovement is enabled,
+ // same pools would be re-used.
+ // [pool0, pool1]
+ // r0 r1
+ // Thus, the instance partition assignment remains the same as the previous one.
+ // pool 0: [ i12, i4, i0, i1, i10 ]
+ // pool 1: [ i7, i9, i11, i13, i6 ]
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+ // Set tag pool config to 3.
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+ // Get the latest existingInstancePartitions from last computation.
+ existingInstancePartitions = instancePartitions;
+
+ // Reusing the existing instance partitions shouldn't change the instance assignment.
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+ // But since Pool 0 and Pool 1 is already being used for the table, the numReplica remains at 2,
+ // so the 3rd pool (Pool 2) won't be picked up.
+ // Thus, the instance partition assignment remains the same as the existing one.
+ // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+ // replica-group 1
+ // Now in poolToInstancesMap:
+ // pool 0: [ i12, i4, i0, i1, i10 ]
+ // pool 1: [ i7, i9, i11, i13, i6 ]
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+ // Set replica group from 2 to 3
+ numReplicaGroups = 3;
+ replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+ // Get the latest existingInstancePartitions from last computation.
+ existingInstancePartitions = instancePartitions;
+
+ // Now that one more replica group is needed, Pool 2 will be chosen for the 3rd replica group
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+ // [pool0, pool1, pool2]
+ // r0 r1 r2
+ // Each replica-group should have 2 instances assigned
+ // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+ // Latest instances from ZK:
+ // pool 0: [ i3, i4, i0, i1, i2 ]
+ // pool 1: [ i8, i9, i5, i6, i7 ]
+ // pool 2: [ i22,i23,i19,i20,i21]
+ // Thus, the new assignment will become:
+ // pool 0: [ i12, i4, i0, i1, i10 ]
+ // pool 1: [ i7, i9, i11, i13, i6 ]
+ // pool 2: [ i22, i23, i19, i20,i21 ]
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 19,
+ SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 21));
+ }
+
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org