You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 07:17:14 UTC
(pinot) 01/05: Enhance the minimizeDataMovement to keep the existing pool assignment
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 9550545be3ea35b2874d79a62d396806c0f0f23f
Author: jlli_LinkedIn <jl...@linkedin.com>
AuthorDate: Sun Nov 5 10:55:11 2023 -0800
Enhance the minimizeDataMovement to keep the existing pool assignment
---
.../assignment/InstanceAssignmentConfigUtils.java | 2 +-
.../common/utils/config/TableConfigSerDeTest.java | 2 +-
.../instance/FDAwareInstancePartitionSelector.java | 6 +-
.../instance/InstanceAssignmentDriver.java | 10 +-
.../instance/InstancePartitionSelector.java | 4 +-
.../instance/InstancePartitionSelectorFactory.java | 18 +-
.../InstanceReplicaGroupPartitionSelector.java | 78 +++++++--
.../instance/InstanceTagPoolSelector.java | 63 ++++++-
.../MirrorServerSetInstancePartitionSelector.java | 4 +-
...anceAssignmentRestletResourceStatelessTest.java | 6 +-
.../instance/InstanceAssignmentTest.java | 193 ++++++++++++++++-----
.../InstanceReplicaGroupPartitionSelectorTest.java | 2 +-
.../TableRebalancerClusterStatelessTest.java | 4 +-
.../table/assignment/InstanceAssignmentConfig.java | 16 +-
.../InstanceReplicaGroupPartitionConfig.java | 2 +
15 files changed, 311 insertions(+), 99 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index ebf38d308f..13cc270954 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -122,7 +122,7 @@ public class InstanceAssignmentConfigUtils {
replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement, null);
}
- return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig);
+ return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, minimizeDataMovement);
}
public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index ed9d605af0..74f857a102 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -212,7 +212,7 @@ public class TableConfigSerDeTest {
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null),
new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
- new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null));
+ new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null), null, false);
TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
index 294971615a..de96d4da4d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
@@ -50,8 +50,8 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
private static final Logger LOGGER = LoggerFactory.getLogger(FDAwareInstancePartitionSelector.class);
public FDAwareInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
- String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) {
- super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
+ String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+ super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
}
/**
@@ -152,7 +152,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
* initialize the new replicaGroupBasedAssignmentState for assignment,
* place existing instances in their corresponding positions
*/
- if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+ if (_minimizeDataMovement && _existingInstancePartitions != null) {
int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
int numExistingPartitions = _existingInstancePartitions.getNumPartitions();
/*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 6d869b86c1..09866c1ed7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -64,8 +64,8 @@ public class InstanceAssignmentDriver {
}
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
- List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable
- InstancePartitions preConfiguredInstancePartitions) {
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
+ @Nullable InstancePartitions preConfiguredInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
@@ -88,8 +88,10 @@ public class InstanceAssignmentDriver {
String tableNameWithType = _tableConfig.getTableName();
LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);
+ boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement();
InstanceTagPoolSelector tagPoolSelector =
- new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType);
+ new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType,
+ minimizeDataMovement, existingInstancePartitions);
Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
@@ -106,7 +108,7 @@ public class InstanceAssignmentDriver {
InstancePartitionSelector instancePartitionSelector =
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
- preConfiguredInstancePartitions);
+ preConfiguredInstancePartitions, minimizeDataMovement);
InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
return instancePartitions;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
index 396b869924..5f92db2426 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
@@ -29,12 +29,14 @@ abstract class InstancePartitionSelector {
protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
protected final String _tableNameWithType;
protected final InstancePartitions _existingInstancePartitions;
+ protected final boolean _minimizeDataMovement;
public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
- String tableNameWithType, InstancePartitions existingInstancePartitions) {
+ String tableNameWithType, InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
_replicaGroupPartitionConfig = replicaGroupPartitionConfig;
_tableNameWithType = tableNameWithType;
_existingInstancePartitions = existingInstancePartitions;
+ _minimizeDataMovement = minimizeDataMovement;
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
index 256aa89b02..8a343b1598 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.assignment.instance;
import java.util.Arrays;
+import javax.annotation.Nullable;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -31,25 +32,18 @@ public class InstancePartitionSelectorFactory {
public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
- InstancePartitions existingInstancePartitions) {
- return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType,
- existingInstancePartitions, null);
- }
-
- public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
- InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
- InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions
- ) {
+ InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions,
+ boolean minimizeDataMovement) {
switch (partitionSelector) {
case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
return new FDAwareInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
- existingInstancePartitions);
+ existingInstancePartitions, minimizeDataMovement);
case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
- existingInstancePartitions);
+ existingInstancePartitions, minimizeDataMovement);
case MIRROR_SERVER_SET_PARTITION_SELECTOR:
return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
- existingInstancePartitions, preConfiguredInstancePartitions);
+ existingInstancePartitions, preConfiguredInstancePartitions, minimizeDataMovement);
default:
throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from"
+ Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index de1e681d17..79e95db7a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -22,18 +22,21 @@ import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.utils.Pairs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +49,8 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class);
public InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
- String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) {
- super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
+ String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+ super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
}
/**
@@ -73,16 +76,65 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
- for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
- // Pick one pool for each replica-group based on the table name hash
- int pool = pools.get((tableNameHash + replicaId) % numPools);
- poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
- replicaGroupIdToPoolMap.put(replicaId, pool);
+ Map<String, Integer> instanceToPoolMap = new HashMap<>();
+ for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+ Integer pool = entry.getKey();
+ List<InstanceConfig> instanceConfigsInPool = entry.getValue();
+ Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
+ for (InstanceConfig instanceConfig : instanceConfigsInPool) {
+ String instanceName = instanceConfig.getInstanceName();
+ candidateInstances.add(instanceName);
+ instanceToPoolMap.put(instanceName, pool);
+ }
+ }
- Set<String> candidateInstances =
- poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
- List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
- instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
+ if (_minimizeDataMovement && _existingInstancePartitions != null) {
+ // Keep the same pool for the replica group if it's already been used for the table.
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+ int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
+ for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
+ boolean foundExistingReplicaGroup = false;
+ for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingReplicaGroup; partitionId++) {
+ List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ for (String existingInstance : existingInstances) {
+ Integer existingPool = instanceToPoolMap.get(existingInstance);
+ if (existingPool != null & pools.contains(existingPool)) {
+ poolToReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new ArrayList<>()).add(replicaGroupId);
+ replicaGroupIdToPoolMap.put(replicaGroupId, existingPool);
+ foundExistingReplicaGroup = true;
+ break;
+ }
+ }
+ }
+ }
+ // Use a min heap to track the least frequently picked pool among all the pools
+ PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
+ for (int pool : pools) {
+ int numExistingReplicaGroups =
+ poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0;
+ minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool));
+ }
+ for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+ if (replicaGroupIdToPoolMap.containsKey(replicaId)) {
+ continue;
+ }
+ // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection.
+ Pairs.IntPair pair = minHeap.remove();
+ int pool = pair.getRight();
+ pair.setLeft(pair.getLeft() + 1);
+ minHeap.add(pair);
+ poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+ replicaGroupIdToPoolMap.put(replicaId, pool);
+ }
+ } else {
+ // Current default way to assign pool to replica groups.
+ for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+ // Pick one pool for each replica-group based on the table name hash
+ int pool = pools.get((tableNameHash + replicaId) % numPools);
+ poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
+ replicaGroupIdToPoolMap.put(replicaId, pool);
+ }
}
LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
_tableNameWithType);
@@ -132,7 +184,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
numPartitions, numInstancesPerPartition, _tableNameWithType);
- if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+ if (_minimizeDataMovement && _existingInstancePartitions != null) {
// Minimize data movement.
int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
@@ -257,7 +309,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
}
List<String> instancesToSelect;
- if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
+ if (_minimizeDataMovement && _existingInstancePartitions != null) {
// Minimize data movement.
List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
LinkedHashSet<String> candidateInstances = new LinkedHashSet<>();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5aefd1ad69..755e7aa713 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -21,11 +21,15 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.slf4j.Logger;
@@ -41,9 +45,17 @@ public class InstanceTagPoolSelector {
private final InstanceTagPoolConfig _tagPoolConfig;
private final String _tableNameWithType;
- public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType) {
+ private final boolean _minimizeDataMovement;
+
+ private final InstancePartitions _existingInstancePartitions;
+
+ public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
+ boolean minimizeDataMovement,
+ @Nullable InstancePartitions existingInstancePartitions) {
_tagPoolConfig = tagPoolConfig;
_tableNameWithType = tableNameWithType;
+ _minimizeDataMovement = minimizeDataMovement;
+ _existingInstancePartitions = existingInstancePartitions;
}
/**
@@ -70,12 +82,14 @@ public class InstanceTagPoolSelector {
if (_tagPoolConfig.isPoolBased()) {
// Pool based selection
+ Map<String, Integer> instanceToPoolMap = new HashMap<>();
// Extract the pool information from the instance configs
for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
if (poolMap != null && poolMap.containsKey(tag)) {
int pool = Integer.parseInt(poolMap.get(tag));
poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+ instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
}
}
Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(),
@@ -96,9 +110,8 @@ public class InstanceTagPoolSelector {
int numPools = poolToInstanceConfigsMap.size();
int numPoolsToSelect = _tagPoolConfig.getNumPools();
if (numPoolsToSelect > 0) {
- Preconditions
- .checkState(numPoolsToSelect <= numPools, "Not enough instance pools (%s in the cluster, asked for %s)",
- numPools, numPoolsToSelect);
+ Preconditions.checkState(numPoolsToSelect <= numPools,
+ "Not enough instance pools (%s in the cluster, asked for %s)", numPools, numPoolsToSelect);
} else {
numPoolsToSelect = numPools;
}
@@ -109,11 +122,45 @@ public class InstanceTagPoolSelector {
return poolToInstanceConfigsMap;
}
- // Select pools based on the table name hash to evenly distribute the tables
poolsToSelect = new ArrayList<>(numPoolsToSelect);
- List<Integer> poolsInCluster = new ArrayList<>(pools);
- for (int i = 0; i < numPoolsToSelect; i++) {
- poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+ if (_minimizeDataMovement && _existingInstancePartitions != null) {
+ Set<Integer> existingPools = new HashSet<>(numPoolsToSelect);
+ // Keep the same pool if it's already been used for the table.
+ int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+ for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
+ boolean foundExistingPoolForReplicaGroup = false;
+ for (int partitionId = 0; partitionId < existingNumPartitions & !foundExistingPoolForReplicaGroup;
+ partitionId++) {
+ List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ for (String existingInstance : existingInstances) {
+ Integer existingPool = instanceToPoolMap.get(existingInstance);
+ if (existingPool != null & pools.contains(existingPool)) {
+ poolsToSelect.add(existingPool);
+ existingPools.add(existingPool);
+ foundExistingPoolForReplicaGroup = true;
+ break;
+ }
+ }
+ }
+ }
+ LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType);
+ // Pick a pool from remainingPools that isn't used before.
+ List<Integer> remainingPools = new ArrayList<>(pools);
+ remainingPools.retainAll(existingPools);
+ // Skip selecting the existing pool.
+ for (int i = 0; i < numPoolsToSelect; i++) {
+ if (existingPools.contains(i)) {
+ continue;
+ }
+ poolsToSelect.add(remainingPools.remove(i % remainingPools.size()));
+ }
+ } else {
+ // Select pools based on the table name hash to evenly distribute the tables
+ List<Integer> poolsInCluster = new ArrayList<>(pools);
+ for (int i = 0; i < numPoolsToSelect; i++) {
+ poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+ }
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
index 6b4086615a..f273866eeb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
@@ -76,8 +76,8 @@ public class MirrorServerSetInstancePartitionSelector extends InstancePartitionS
public MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions,
- InstancePartitions preConfiguredInstancePartitions) {
- super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
+ InstancePartitions preConfiguredInstancePartitions, boolean minimizeDataMovement) {
+ super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
_preConfiguredInstancePartitions = preConfiguredInstancePartitions;
_numTargetInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
_numTargetReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index dedc79384e..9feb8844c8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -118,7 +118,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
// Add OFFLINE instance assignment config to the offline table config
InstanceAssignmentConfig offlineInstanceAssignmentConfig = new InstanceAssignmentConfig(
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
- new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
+ new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false);
offlineTableConfig.setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig));
_helixResourceManager.setExistingTableConfig(offlineTableConfig);
@@ -136,7 +136,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
// Add CONSUMING instance assignment config to the real-time table config
InstanceAssignmentConfig consumingInstanceAssignmentConfig = new InstanceAssignmentConfig(
new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
- new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
+ new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false);
realtimeTableConfig.setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(), consumingInstanceAssignmentConfig));
_helixResourceManager.setExistingTableConfig(realtimeTableConfig);
@@ -164,7 +164,7 @@ public class PinotInstanceAssignmentRestletResourceStatelessTest extends Control
null)));
InstanceAssignmentConfig tierInstanceAssignmentConfig = new InstanceAssignmentConfig(
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), false, 0, null), null,
- new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null));
+ new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null), null, false);
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new HashMap<>();
instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(), offlineInstanceAssignmentConfig);
instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index b25a529e10..a6220c00a2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -374,7 +374,7 @@ public class InstanceAssignmentTest {
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured"))
.build();
InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
@@ -480,7 +480,7 @@ public class InstanceAssignmentTest {
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
InstancePartitions preConfigured = new InstancePartitions("preConfigured");
@@ -561,7 +561,7 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
driver = new InstanceAssignmentDriver(tableConfig);
preConfigured = new InstancePartitions("preConfigured");
@@ -664,7 +664,7 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
driver = new InstanceAssignmentDriver(tableConfig);
preConfigured = new InstancePartitions("preConfigured");
@@ -756,7 +756,7 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
driver = new InstanceAssignmentDriver(tableConfig);
preConfigured = new InstancePartitions("preConfigured");
@@ -851,7 +851,7 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
driver = new InstanceAssignmentDriver(tableConfig);
preConfigured = new InstancePartitions("preConfigured");
@@ -956,7 +956,7 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
driver = new InstanceAssignmentDriver(tableConfig);
preConfigured = new InstancePartitions("preConfigured");
@@ -1063,7 +1063,7 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
driver = new InstanceAssignmentDriver(tableConfig);
preConfigured = new InstancePartitions("preConfigured");
@@ -1156,7 +1156,7 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1230,7 +1230,7 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString(), false)))
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1311,7 +1311,7 @@ public class InstanceAssignmentTest {
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig))).build();
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false))).build();
InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -1364,7 +1364,7 @@ public class InstanceAssignmentTest {
// Select all 3 pools in pool selection
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false)));
// Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
// All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
@@ -1386,7 +1386,7 @@ public class InstanceAssignmentTest {
// Select pool 0 and 1 in pool selection
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false)));
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
// All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
@@ -1408,7 +1408,7 @@ public class InstanceAssignmentTest {
numReplicaGroups = numPools;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, false)));
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
// [pool0, pool1]
@@ -1438,7 +1438,7 @@ public class InstanceAssignmentTest {
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
// Reset the instance configs to have only two pools.
instanceConfigs.clear();
numInstances = 10;
@@ -1487,7 +1487,7 @@ public class InstanceAssignmentTest {
// Select pool 0 and 1 in pool selection
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
// Get the latest existingInstancePartitions from last computation.
existingInstancePartitions = instancePartitions;
@@ -1514,7 +1514,7 @@ public class InstanceAssignmentTest {
numReplicaGroups = 3;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
// Get the latest existingInstancePartitions from last computation.
existingInstancePartitions = instancePartitions;
@@ -1593,7 +1593,7 @@ public class InstanceAssignmentTest {
numReplicaGroups = 2;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
// Get the latest existingInstancePartitions from last computation.
existingInstancePartitions = instancePartitions;
@@ -1693,6 +1693,109 @@ public class InstanceAssignmentTest {
assertEquals(instancePartitions.getInstances(0, 1),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+ // The below is the test suite for testing out minimizeDataMovement with pool configs
+ // Add a third pool with the same number of instances, but keep the number of pools selected by the tag pool config the same (i.e. 2)
+ numPools = 3;
+ numInstances = numPools * numInstancesPerPool;
+ for (int i = numInstances + 4; i < numInstances + 9; i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ int pool = numPools - 1;
+ instanceConfig.getRecord()
+ .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Get the latest existingInstancePartitions from last computation.
+ existingInstancePartitions = instancePartitions;
+
+ // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2, but since minimizeDataMovement is enabled,
+ // same pools would be re-used.
+ // [pool0, pool1]
+ // r0 r1
+ // Thus, the instance partition assignment remains the same as the previous one.
+ // pool 0: [ i12, i4, i0, i1, i10 ]
+ // pool 1: [ i7, i9, i11, i13, i6 ]
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+ // Set tag pool config to 3.
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
+
+ // Get the latest existingInstancePartitions from last computation.
+ existingInstancePartitions = instancePartitions;
+
+ // Putting the existingPoolToInstancesMap shouldn't change the instance assignment,
+ // as there are only 2 replica groups needed.
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+ // But since Pool 0 and Pool 1 are already being used for the table and the number of replica groups remains at 2,
+ // so the 3rd pool (Pool 2) won't be picked up.
+ // Thus, the instance partition assignment remains the same as the existing one.
+ // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+ // replica-group 1
+ // Now in poolToInstancesMap:
+ // pool 0: [ i12, i4, i0, i1, i10 ]
+ // pool 1: [ i7, i9, i11, i13, i6 ]
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+
+ // Set replica group from 2 to 3
+ numReplicaGroups = 3;
+ replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0, true, null);
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, null, true)));
+
+ // Get the latest existingInstancePartitions from last computation.
+ existingInstancePartitions = instancePartitions;
+
+ // Now that 1 more replica group is needed, Pool 2 will be chosen for the 3rd replica group
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+ // [pool0, pool1, pool2]
+ // r0 r1 r2
+ // Each replica-group should have 5 instances assigned
+ // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+ // Latest instances from ZK:
+ // pool 0: [ i3, i4, i0, i1, i2 ]
+ // pool 1: [ i8, i9, i5, i6, i7 ]
+ // pool 2: [ i22, i23, i19, i20, i21 ]
+ // Thus, the new assignment will become:
+ // pool 0: [ i12, i4, i0, i1, i10 ]
+ // pool 1: [ i7, i9, i11, i13, i6 ]
+ // pool 2: [ i22, i23, i19, i20, i21 ]
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 6));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 19,
+ SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 21));
}
@Test
@@ -1720,7 +1823,7 @@ public class InstanceAssignmentTest {
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// No instance with correct tag
try {
@@ -1750,7 +1853,7 @@ public class InstanceAssignmentTest {
// Enable pool
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// No instance has correct pool configured
try {
@@ -1784,7 +1887,7 @@ public class InstanceAssignmentTest {
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// Ask for too many pools
try {
@@ -1796,7 +1899,7 @@ public class InstanceAssignmentTest {
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 2));
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// Ask for pool that does not exist
try {
@@ -1810,7 +1913,7 @@ public class InstanceAssignmentTest {
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false, null
);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// Ask for too many instances
try {
@@ -1824,7 +1927,7 @@ public class InstanceAssignmentTest {
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false, null
);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// Number of replica-groups must be positive
try {
@@ -1836,7 +1939,7 @@ public class InstanceAssignmentTest {
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// Ask for too many replica-groups
try {
@@ -1849,7 +1952,7 @@ public class InstanceAssignmentTest {
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// Ask for too many instances
try {
@@ -1861,7 +1964,7 @@ public class InstanceAssignmentTest {
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// Ask for too many instances per partition
try {
@@ -1874,7 +1977,7 @@ public class InstanceAssignmentTest {
replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
// Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
// pool0: [i3, i4, i0, i1, i2]
@@ -1914,7 +2017,8 @@ public class InstanceAssignmentTest {
try {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build();
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, "ILLEGAL_SELECTOR", false)))
+ .build();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
"No enum constant org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig.PartitionSelector"
@@ -1943,7 +2047,8 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+ .build();
driver = new InstanceAssignmentDriver(tableConfig);
try {
instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -1976,7 +2081,8 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+ .build();
driver = new InstanceAssignmentDriver(tableConfig);
try {
instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -2017,7 +2123,8 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+ .build();
driver = new InstanceAssignmentDriver(tableConfig);
try {
instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null);
@@ -2055,7 +2162,8 @@ public class InstanceAssignmentTest {
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
+ .build();
InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
InstancePartitions instancePartitions =
@@ -2127,7 +2235,8 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
+ .build();
driver = new InstanceAssignmentDriver(tableConfig);
// existingInstancePartitions = instancePartitions
instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, instancePartitions);
@@ -2208,7 +2317,7 @@ public class InstanceAssignmentTest {
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup))
.setSegmentPartitionConfig(segmentPartitionConfig).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -2282,7 +2391,7 @@ public class InstanceAssignmentTest {
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -2338,7 +2447,7 @@ public class InstanceAssignmentTest {
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -2405,7 +2514,7 @@ public class InstanceAssignmentTest {
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -2471,7 +2580,7 @@ public class InstanceAssignmentTest {
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -2542,7 +2651,7 @@ public class InstanceAssignmentTest {
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -2593,7 +2702,7 @@ public class InstanceAssignmentTest {
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), false)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -2657,7 +2766,7 @@ public class InstanceAssignmentTest {
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME + TABLE_NAME_ZERO_HASH_COMPLEMENT)
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, instanceConstraintConfig, replicaPartitionConfig,
- InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
+ InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(), true)))
.build();
driver = new InstanceAssignmentDriver(tableConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index 889206437f..2fdef27796 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -64,7 +64,7 @@ public class InstanceReplicaGroupPartitionSelectorTest {
new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
InstanceReplicaGroupPartitionSelector selector =
- new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing);
+ new InstanceReplicaGroupPartitionSelector(config, "tableNameBlah", existing, true);
String[] serverNames = {"rg0-0", "rg0-1", "rg1-0", "rg1-1"};
String[] poolNumbers = {"0", "0", "1", "1"};
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 5d679c0380..1df7109ef2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -195,7 +195,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
_helixResourceManager.updateTableConfig(tableConfig);
// No need to reassign instances because instances should be automatically assigned when updating the table config
@@ -481,7 +481,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0, 0, false, null);
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME,
- new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig)));
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false)));
_helixResourceManager.updateTableConfig(tableConfig);
rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
index 391ba4812d..ad4b22ecaf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
@@ -41,13 +41,17 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
"Configuration for the instance replica-group and partition of the instance assignment (mandatory)")
private final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
+ @JsonPropertyDescription("Configuration to minimize data movement for pool and instance assignment")
+ private final boolean _minimizeDataMovement;
+
@JsonCreator
public InstanceAssignmentConfig(
@JsonProperty(value = "tagPoolConfig", required = true) InstanceTagPoolConfig tagPoolConfig,
@JsonProperty("constraintConfig") @Nullable InstanceConstraintConfig constraintConfig,
@JsonProperty(value = "replicaGroupPartitionConfig", required = true)
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
- @JsonProperty("partitionSelector") @Nullable String partitionSelector) {
+ @JsonProperty("partitionSelector") @Nullable String partitionSelector,
+ @JsonProperty("minimizeDataMovement") boolean minimizeDataMovement) {
Preconditions.checkArgument(tagPoolConfig != null, "'tagPoolConfig' must be configured");
Preconditions
.checkArgument(replicaGroupPartitionConfig != null, "'replicaGroupPartitionConfig' must be configured");
@@ -57,11 +61,7 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
_partitionSelector =
partitionSelector == null ? PartitionSelector.INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR
: PartitionSelector.valueOf(partitionSelector);
- }
-
- public InstanceAssignmentConfig(InstanceTagPoolConfig tagPoolConfig, InstanceConstraintConfig constraintConfig,
- InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig) {
- this(tagPoolConfig, constraintConfig, replicaGroupPartitionConfig, null);
+ _minimizeDataMovement = minimizeDataMovement;
}
public PartitionSelector getPartitionSelector() {
@@ -81,6 +81,10 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
return _replicaGroupPartitionConfig;
}
+ public boolean isMinimizeDataMovement() {
+ return _minimizeDataMovement;
+ }
+
public enum PartitionSelector {
FD_AWARE_INSTANCE_PARTITION_SELECTOR, INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR,
MIRROR_SERVER_SET_PARTITION_SELECTOR
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
index adc72e8f1c..1bc40cba21 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceReplicaGroupPartitionConfig.java
@@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
"Name of the column used for partition, if not provided table level replica group will be used")
private final String _partitionColumn;
+ // TODO: remove this config in the next official release
+ @Deprecated
private final boolean _minimizeDataMovement;
@JsonCreator
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org