You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 07:17:18 UTC
(pinot) 05/05: Enhance algorithm
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit a286bd868f434c72e32351890566d684ec14a378
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Wed Jan 31 23:15:41 2024 -0800
Enhance algorithm
---
.../instance/FDAwareInstancePartitionSelector.java | 7 +-
.../instance/InstancePartitionSelector.java | 10 +-
.../InstanceReplicaGroupPartitionSelector.java | 768 +++++++++++----------
.../instance/InstanceTagPoolSelector.java | 56 +-
.../instance/InstanceAssignmentTest.java | 43 +-
.../InstanceReplicaGroupPartitionSelectorTest.java | 141 ++--
.../java/org/apache/pinot/spi/utils/Pairs.java | 23 +-
7 files changed, 562 insertions(+), 486 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
index de96d4da4d..89d64272e3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
@@ -109,10 +109,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
return new ImmutablePair<>(numReplicaGroups, numInstancesPerReplicaGroup);
}
- /**
- * Selects instances based on the replica-group/partition config, and stores the result into the given instance
- * partitions.
- */
+ @Override
public void selectInstances(Map<Integer, List<InstanceConfig>> faultDomainToInstanceConfigsMap,
InstancePartitions instancePartitions) {
@@ -152,7 +149,7 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
* initialize the new replicaGroupBasedAssignmentState for assignment,
* place existing instances in their corresponding positions
*/
- if (_minimizeDataMovement && _existingInstancePartitions != null) {
+ if (_minimizeDataMovement) {
int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
int numExistingPartitions = _existingInstancePartitions.getNumPartitions();
/*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
index 335070b003..b80ad8bba9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelector.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -32,13 +33,14 @@ abstract class InstancePartitionSelector {
protected final boolean _minimizeDataMovement;
public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
- String tableNameWithType, InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
+ String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
_replicaGroupPartitionConfig = replicaGroupPartitionConfig;
_tableNameWithType = tableNameWithType;
_existingInstancePartitions = existingInstancePartitions;
- // For backward compatibility, enable minimize data movement when it is enabled in top level or instance
- // partition selector level.
- _minimizeDataMovement = minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement();
+ // For backward compatibility, enable minimize data movement when it is enabled in top level or instance partition
+ // selector level
+ _minimizeDataMovement = (minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement())
+ && existingInstancePartitions != null;
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 505006f1d3..8da6dbe2f6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -19,24 +19,22 @@
package org.apache.pinot.controller.helix.core.assignment.instance;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
-import org.apache.pinot.spi.utils.Pairs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,10 +51,7 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
}
- /**
- * Selects instances based on the replica-group/partition config, and stores the result into the given instance
- * partitions.
- */
+ @Override
public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
InstancePartitions instancePartitions) {
int numPools = poolToInstanceConfigsMap.size();
@@ -65,393 +60,448 @@ public class InstanceReplicaGroupPartitionSelector extends InstancePartitionSele
int tableNameHash = Math.abs(_tableNameWithType.hashCode());
List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet());
pools.sort(null);
- LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}",
- _tableNameWithType, tableNameHash, pools);
+ LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}, "
+ + "minimize data movement: {}", _tableNameWithType, tableNameHash, pools, _minimizeDataMovement);
if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
- // Replica-group based selection
-
- int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
- Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
- Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
- Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
- Map<Integer, Set<Integer>> existingPoolToExistingReplicaGroupIdsMap = new TreeMap<>();
- Map<Integer, Set<String>> existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
- Map<Integer, Integer> replicaGroupIdToPoolMap = new TreeMap<>();
- Map<Integer, Set<String>> poolToCandidateInstancesMap = new TreeMap<>();
- Map<String, Integer> instanceToPoolMap = new HashMap<>();
- for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
- Integer pool = entry.getKey();
- List<InstanceConfig> instanceConfigsInPool = entry.getValue();
- Set<String> candidateInstances = poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new LinkedHashSet<>());
- for (InstanceConfig instanceConfig : instanceConfigsInPool) {
- String instanceName = instanceConfig.getInstanceName();
- candidateInstances.add(instanceName);
- instanceToPoolMap.put(instanceName, pool);
- }
+ if (_minimizeDataMovement) {
+ replicaGroupBasedMinimumMovement(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash);
+ } else {
+ replicaGroupBasedSimple(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash);
}
+ } else {
+ nonReplicaGroupBased(poolToInstanceConfigsMap, instancePartitions, pools, tableNameHash);
+ }
+ }
- if (_minimizeDataMovement && _existingInstancePartitions != null) {
- // Collect the stats between the existing pools, existing replica groups, and existing instances.
- int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
- int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
- for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
- for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
- List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
- for (String existingInstance : existingInstances) {
- Integer existingPool = instanceToPoolMap.get(existingInstance);
- if (existingPool != null) {
- existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
- .add(existingInstance);
- existingPoolToExistingReplicaGroupIdsMap.computeIfAbsent(existingPool, k -> new HashSet<>())
- .add(replicaGroupId);
- existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
- .add(existingInstance);
- }
- }
- }
- }
-
- // Use a max heap to track the number of servers used for the given pools,
- // so that pool with max number of existing instances will be considered first.
- PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
- for (int pool : pools) {
- maxHeap.add(
- new Pairs.IntPair(existingPoolsToExistingInstancesMap.computeIfAbsent(pool, k -> new HashSet<>()).size(),
- pool));
- }
+ private void nonReplicaGroupBased(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+ InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
+ // Pick one pool based on the table name hash
+ int pool = pools.get(Math.abs(tableNameHash % pools.size()));
+ LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType);
+ List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool);
+ int numInstances = instanceConfigs.size();
- // Get the maximum number of replica groups per pool.
- int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
- // Given a pool number, assign replica group which has the max number of existing instances.
- // Repeat this process until the max number of replica groups per pool is reached.
- while (!maxHeap.isEmpty()) {
- Pairs.IntPair pair = maxHeap.remove();
- int poolNumber = pair.getRight();
- for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {
- Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
- if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
- continue;
- }
- int targetReplicaGroupId = -1;
- int maxNumInstances = 0;
- for (int existingReplicaGroupId : existingReplicaGroups) {
- int numExistingInstances =
- existingReplicaGroupIdToExistingInstancesMap.getOrDefault(existingReplicaGroupId, new HashSet<>())
- .size();
- if (numExistingInstances > maxNumInstances) {
- maxNumInstances = numExistingInstances;
- targetReplicaGroupId = existingReplicaGroupId;
- }
- }
- // If target existing replica group cannot be found, it means it should be chosen from a new replica group.
- if (targetReplicaGroupId > -1) {
- poolToReplicaGroupIdsMap.computeIfAbsent(poolNumber, k -> new ArrayList<>()).add(targetReplicaGroupId);
- replicaGroupIdToPoolMap.put(targetReplicaGroupId, poolNumber);
- // Clear the stats so that the same replica group won't be picked up again in later iteration.
- existingReplicaGroupIdToExistingInstancesMap.get(targetReplicaGroupId).clear();
- }
- }
- }
+ // Assign all instances if not configured
+ int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances();
+ if (numInstancesToSelect > 0) {
+ Preconditions.checkState(numInstancesToSelect <= numInstances,
+ "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstances,
+ numInstancesToSelect);
+ } else {
+ numInstancesToSelect = numInstances;
+ }
- // If there is any new replica group added, choose pool which is least frequently picked up.
- // Use a min heap to track the least frequently picked pool among all the pools.
- PriorityQueue<Pairs.IntPair> minHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator());
- for (int pool : pools) {
- int numExistingReplicaGroups =
- poolToReplicaGroupIdsMap.get(pool) != null ? poolToReplicaGroupIdsMap.get(pool).size() : 0;
- minHeap.add(new Pairs.IntPair(numExistingReplicaGroups, pool));
- }
- for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
- if (replicaGroupIdToPoolMap.containsKey(replicaId)) {
- continue;
- }
- // Increment the frequency for a given pool and put it back to the min heap to rotate the pool selection.
- Pairs.IntPair pair = minHeap.remove();
- int pool = pair.getRight();
- pair.setLeft(pair.getLeft() + 1);
- minHeap.add(pair);
- poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
- replicaGroupIdToPoolMap.put(replicaId, pool);
- }
- } else {
- // Current default way to assign pool to replica groups.
- for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
- // Pick one pool for each replica-group based on the table name hash
- int pool = pools.get((tableNameHash + replicaId) % numPools);
- poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId);
- replicaGroupIdToPoolMap.put(replicaId, pool);
- }
- }
- LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
- _tableNameWithType);
-
- int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
- if (numInstancesPerReplicaGroup > 0) {
- // Check if we have enough instances if number of instances per replica-group is configured
- for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
- int pool = entry.getKey();
- int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
- int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size();
- Preconditions.checkState(numInstancesToSelect <= numInstancesInPool,
- "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool,
- numInstancesToSelect);
- }
- } else {
- // Use as many instances as possible if number of instances per replica-group is not configured
- numInstancesPerReplicaGroup = Integer.MAX_VALUE;
- for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
- int pool = entry.getKey();
- int numReplicaGroupsInPool = entry.getValue().size();
- int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
- Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool,
- "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool,
- numReplicaGroupsInPool, numInstancesInPool);
- numInstancesPerReplicaGroup =
- Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool);
- }
+ List<String> instancesToSelect;
+ if (_minimizeDataMovement) {
+ List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
+ LinkedHashSet<String> candidateInstances = Sets.newLinkedHashSetWithExpectedSize(instanceConfigs.size());
+ instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName()));
+ instancesToSelect =
+ selectInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances);
+ LOGGER.info("Selecting instances: {} for table: {}, existing instances: {}", instancesToSelect,
+ _tableNameWithType, existingInstances);
+ } else {
+ instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
}
- LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
- _tableNameWithType);
+ LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType);
+ }
+ // Set the instances as partition 0 replica 0
+ instancePartitions.setInstances(0, 0, instancesToSelect);
+ }
+
+ /**
+ * Selects the instances with minimum movement.
+ * For each instance in the existing instances, if it is still alive, keep it in the same position. Then fill the
+ * vacant positions with the remaining candidate instances.
+ * NOTE: This method will modify the candidate instances.
+ */
+ private static List<String> selectInstancesWithMinimumMovement(int numInstancesToSelect,
+ LinkedHashSet<String> candidateInstances, List<String> existingInstances) {
+ // Initialize the list with empty positions to fill
+ List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(null);
+ }
- // Assign instances within a replica-group to one partition if not configured
- int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
- if (numPartitions <= 0) {
- numPartitions = 1;
+ // Keep the existing instances that are still alive
+ int numInstancesToCheck = Math.min(numInstancesToSelect, existingInstances.size());
+ for (int i = 0; i < numInstancesToCheck; i++) {
+ String existingInstance = existingInstances.get(i);
+ if (candidateInstances.remove(existingInstance)) {
+ instancesToSelect.set(i, existingInstance);
}
- // Assign all instances within a replica-group to each partition if not configured
- int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition();
- if (numInstancesPerPartition > 0) {
- Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup,
- "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group:"
- + " %s", numInstancesPerPartition, numInstancesPerReplicaGroup);
- } else {
- numInstancesPerPartition = numInstancesPerReplicaGroup;
+ }
+
+ // Fill the vacant positions with the remaining candidate instances
+ Iterator<String> iterator = candidateInstances.iterator();
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ if (instancesToSelect.get(i) == null) {
+ instancesToSelect.set(i, iterator.next());
}
- LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
- numPartitions, numInstancesPerPartition, _tableNameWithType);
-
- if (_minimizeDataMovement && _existingInstancePartitions != null) {
- // Minimize data movement.
- int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
- int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
- int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
-
- existingReplicaGroupIdToExistingInstancesMap = new TreeMap<>();
- // Step 1: find out the replica groups and their existing instances,
- // so that these instances can be filtered out and won't be chosen for the other replica group.
- for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
- Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
- if (pool == null) {
- // Skip the replica group if it's no longer needed.
- continue;
- }
+ }
- for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
- List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
- existingReplicaGroupIdToExistingInstancesMap.computeIfAbsent(replicaGroupId, k -> new HashSet<>())
- .addAll(existingInstances);
- }
- }
+ return instancesToSelect;
+ }
- for (int replicaGroupId = 0; replicaGroupId < numCommonReplicaGroups; replicaGroupId++) {
- Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
- // Step 2: filter out instances that belong to other replica groups which should not be the candidate.
- LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
- for (int otherReplicaGroupId = 0;
- otherReplicaGroupId < existingNumReplicaGroups && otherReplicaGroupId < numReplicaGroups;
- otherReplicaGroupId++) {
- if (replicaGroupId != otherReplicaGroupId) {
- candidateInstances.removeAll(existingReplicaGroupIdToExistingInstancesMap.get(otherReplicaGroupId));
- }
- }
- LinkedHashSet<String> chosenCandidateInstances = new LinkedHashSet<>();
- for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
- List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
- // Step 3: figure out the missing instances and the new instances to fill their vacant positions.
- List<String> instancesToSelect =
- getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances);
- chosenCandidateInstances.addAll(instancesToSelect);
- instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect);
- }
- // Remove instances that are already been chosen.
- poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
- }
+ private void replicaGroupBasedSimple(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+ InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
+ int numPools = pools.size();
+ int numReplicaGroups = getNumReplicaGroups();
- // If the new number of replica groups is greater than the existing number of replica groups.
- for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId < numReplicaGroups; replicaGroupId++) {
- int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
- LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
-
- Set<String> chosenCandidateInstances = new HashSet<>();
- for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
- List<String> existingInstances = Collections.emptyList();
- List<String> instancesToSelect =
- getInstancesWithMinimumMovement(numInstancesPerPartition, candidateInstances, existingInstances);
- chosenCandidateInstances.addAll(instancesToSelect);
- instancePartitions.setInstances(partitionId, replicaGroupId, instancesToSelect);
- }
- // Remove instances that are already been chosen.
- poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
- }
- } else {
- // Pick instances based on the sorted list of instance names.
- String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup];
- for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
- List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey());
- List<Integer> replicaGroupIdsInPool = entry.getValue();
-
- // Use round-robin to assign instances to each replica-group so that they get instances with similar picking
- // priority
- // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group)
- // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9]
- // r0 r1 r0 r1 r0 r1
- int instanceIdInPool = 0;
- for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup;
- instanceIdInReplicaGroup++) {
- for (int replicaGroupId : replicaGroupIdsInPool) {
- replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] =
- instanceConfigsInPool.get(instanceIdInPool++).getInstanceName();
- }
- }
+ // Pick one pool for each replica-group based on the table name hash
+ Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+ int startIndex = Math.abs(tableNameHash % numPools);
+ for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+ int pool = pools.get((startIndex + replicaGroupId) % numPools);
+ poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaGroupId);
+ }
+ LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
+ _tableNameWithType);
+
+ int numInstancesPerReplicaGroup =
+ getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap);
+ LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
+ _tableNameWithType);
+ int numPartitions = getNumPartitions();
+ int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup);
+ LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
+ numPartitions, numInstancesPerPartition, _tableNameWithType);
+
+ // Pick instances based on the sorted list of instance names
+ String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup];
+ for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+ List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey());
+ List<Integer> replicaGroupIdsInPool = entry.getValue();
+
+ // Use round-robin to assign instances to each replica-group so that they get instances with similar picking
+ // priority
+ // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group)
+ // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9]
+ // r0 r1 r0 r1 r0 r1
+ int instanceIdInPool = 0;
+ for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup;
+ instanceIdInReplicaGroup++) {
+ for (int replicaGroupId : replicaGroupIdsInPool) {
+ replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] =
+ instanceConfigsInPool.get(instanceIdInPool++).getInstanceName();
}
+ }
+ }
- // Assign consecutive instances within a replica-group to each partition.
- // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
- // [i0, i1, i2, i3, i4]
- // p0 p0 p0 p1 p1
- // p1 p2 p2 p2
- for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
- int instanceIdInReplicaGroup = 0;
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
- List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition);
- for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition;
- instanceIdInPartition++) {
- instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
- instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
- }
- LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}",
- instancesInPartition, replicaGroupId, partitionId, _tableNameWithType);
- instancePartitions.setInstances(partitionId, replicaGroupId, instancesInPartition);
- }
+ // Assign consecutive instances within a replica-group to each partition
+ // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition)
+ // [i0, i1, i2, i3, i4]
+ // p0 p0 p0 p1 p1
+ // p1 p2 p2 p2
+ for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+ String[] instancesInReplicaGroup = replicaGroupIdToInstancesMap[replicaGroupId];
+ int instanceIdInReplicaGroup = 0;
+ for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+ List<String> instances = new ArrayList<>(numInstancesPerPartition);
+ for (int i = 0; i < numInstancesPerPartition; i++) {
+ instances.add(instancesInReplicaGroup[instanceIdInReplicaGroup]);
+ instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
}
+ LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}", instances,
+ replicaGroupId, partitionId, _tableNameWithType);
+ instancePartitions.setInstances(partitionId, replicaGroupId, instances);
}
- } else {
- // Non-replica-group based selection
-
- // Pick one pool based on the table name hash
- int pool = pools.get(tableNameHash % numPools);
- LOGGER.info("Selecting pool: {} for table: {}", pool, _tableNameWithType);
- List<InstanceConfig> instanceConfigs = poolToInstanceConfigsMap.get(pool);
- int numInstanceConfigs = instanceConfigs.size();
-
- // Assign all instances if not configured
- int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances();
- if (numInstancesToSelect > 0) {
- Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs,
- "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs,
+ }
+ }
+
+ private int getNumReplicaGroups() {
+ int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups();
+ Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive");
+ return numReplicaGroups;
+ }
+
+ private int getNumInstancesPerReplicaGroup(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+ Map<Integer, List<Integer>> poolToReplicaGroupIdsMap) {
+ int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ if (numInstancesPerReplicaGroup > 0) {
+ // Check if we have enough instances if number of instances per replica-group is configured
+ for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+ int pool = entry.getKey();
+ int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+ int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size();
+ Preconditions.checkState(numInstancesToSelect <= numInstancesInPool,
+ "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool,
numInstancesToSelect);
- } else {
- numInstancesToSelect = numInstanceConfigs;
}
-
- List<String> instancesToSelect;
- if (_minimizeDataMovement && _existingInstancePartitions != null) {
- // Minimize data movement.
- List<String> existingInstances = _existingInstancePartitions.getInstances(0, 0);
- LinkedHashSet<String> candidateInstances = new LinkedHashSet<>();
- instanceConfigs.forEach(k -> candidateInstances.add(k.getInstanceName()));
- instancesToSelect =
- getInstancesWithMinimumMovement(numInstancesToSelect, candidateInstances, existingInstances);
- } else {
- // Select instances sequentially.
- instancesToSelect = new ArrayList<>(numInstancesToSelect);
- for (int i = 0; i < numInstancesToSelect; i++) {
- instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
- }
+ } else {
+ // Use as many instances as possible if number of instances per replica-group is not configured
+ numInstancesPerReplicaGroup = Integer.MAX_VALUE;
+ for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+ int pool = entry.getKey();
+ int numReplicaGroupsInPool = entry.getValue().size();
+ int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
+ Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool,
+ "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool,
+ numReplicaGroupsInPool, numInstancesInPool);
+ numInstancesPerReplicaGroup =
+ Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool);
}
- LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect, _tableNameWithType);
- // Set the instances as partition 0 replica 0
- instancePartitions.setInstances(0, 0, instancesToSelect);
}
+ return numInstancesPerReplicaGroup;
}
- /**
- * Select instances with minimum movement.
- * This algorithm can solve the following scenarios:
- * * swap an instance
- * * add/remove replica groups
- * * increase/decrease number of instances per replica group
- * TODO: handle the scenarios that selected pools are changed.
- * TODO: improve the algorithm by doing the following steps:
- * 1. assign the existing instances for all partitions;
- * 2. assign the vacant positions based on the partitions already assigned to each instance.
- * @param numInstancesToSelect number of instances to select
- * @param candidateInstances candidate instances to be selected
- * @param existingInstances list of existing instances
- */
- private static List<String> getInstancesWithMinimumMovement(int numInstancesToSelect,
- LinkedHashSet<String> candidateInstances, List<String> existingInstances) {
- // Initialize the list with empty positions to fill.
- List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
- for (int i = 0; i < numInstancesToSelect; i++) {
- instancesToSelect.add(null);
+  private int getNumPartitions() {
+    // If not configured (<= 0), treat the whole replica-group as a single partition
+    int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
+    if (numPartitions <= 0) {
+      numPartitions = 1;
     }
-    Deque<String> newlyAddedInstances = new LinkedList<>();
+    return numPartitions;
+  }
+
+  private int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
+    // If not configured (<= 0), every partition uses all instances within the replica-group
+    int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition();
+    if (numInstancesPerPartition > 0) {
+      Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup,
+          "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group: %s",
+          numInstancesPerPartition, numInstancesPerReplicaGroup);
+    } else {
+      numInstancesPerPartition = numInstancesPerReplicaGroup;
+    }
+    return numInstancesPerPartition;
+  }
+
+  private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
+    int numPools = pools.size();
+    int numReplicaGroups = getNumReplicaGroups();
-    // Find out the existing instances that are still alive.
-    Set<String> existingInstancesStillAlive = new HashSet<>();
-    for (String existingInstance : existingInstances) {
-      if (candidateInstances.contains(existingInstance)) {
-        existingInstancesStillAlive.add(existingInstance);
+    Map<String, Integer> instanceToPoolMap = new HashMap<>();
+    for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+      int pool = entry.getKey();
+      for (InstanceConfig instanceConfig : entry.getValue()) {
+        instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
       }
     }
-    // Find out the newly added instances.
-    for (String candidateInstance : candidateInstances) {
-      if (!existingInstancesStillAlive.contains(candidateInstance)) {
-        newlyAddedInstances.add(candidateInstance);
+    // Calculate the mapping from pool to replica-groups assigned to the pool
+    List<Set<String>> replicaGroupIdToExistingInstancesMap = new ArrayList<>(numReplicaGroups);
+    Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+    int maxReplicaGroupsPerPool = (numReplicaGroups + numPools - 1) / numPools;
+    int startIndex = Math.abs(tableNameHash % numPools);
+
+    int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
+    int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
+    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+      // For each replica-group, gather number of existing instances within each pool
+      Set<String> existingInstanceSet = new HashSet<>();
+      replicaGroupIdToExistingInstancesMap.add(existingInstanceSet);
+      Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
+      if (replicaGroupId < existingNumReplicaGroups) {
+        for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+          List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+          existingInstanceSet.addAll(existingInstances);
+          for (String existingInstance : existingInstances) {
+            Integer existingPool = instanceToPoolMap.get(existingInstance);
+            if (existingPool != null) {
+              poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum);
+            }
+          }
+        }
+      }
+      // Sort the pools based on the number of existing instances in the pool in descending order, then use the table
+      // name hash to break ties
+      // Triple stores (pool, numExistingInstances, poolIndex) for sorting
+      List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools);
+      for (int i = 0; i < numPools; i++) {
+        int pool = pools.get((startIndex + replicaGroupId + i) % numPools);
+        triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i));
+      }
+      triples.sort((o1, o2) -> {
+        int result = Integer.compare(o2.getMiddle(), o1.getMiddle());
+        return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
+      });
+      for (Triple<Integer, Integer, Integer> triple : triples) {
+        int pool = triple.getLeft();
+        List<Integer> replicaGroupIds = poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>());
+        if (replicaGroupIds.size() < maxReplicaGroupsPerPool) {
+          replicaGroupIds.add(replicaGroupId);
+          break;
+        }
       }
     }
+    LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
+        _tableNameWithType);
-    int numExistingInstances = existingInstances.size();
-    for (int i = 0; i < numInstancesToSelect; i++) {
-      String existingInstance = i < numExistingInstances ? existingInstances.get(i) : null;
-      String selectedInstance;
-      if (existingInstance != null && candidateInstances.contains(existingInstance)) {
-        selectedInstance = existingInstance;
-        existingInstancesStillAlive.remove(selectedInstance);
-      } else {
-        selectedInstance = newlyAddedInstances.poll();
+    int numInstancesPerReplicaGroup =
+        getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap);
+    LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
+        _tableNameWithType);
+    int numPartitions = getNumPartitions();
+    int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup);
+    LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
+        numPartitions, numInstancesPerPartition, _tableNameWithType);
+
+    List<List<String>> replicaGroupIdToInstancesMap = new ArrayList<>(numReplicaGroups);
+    for (int i = 0; i < numReplicaGroups; i++) {
+      replicaGroupIdToInstancesMap.add(new ArrayList<>(numInstancesPerReplicaGroup));
+    }
+    for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
+      // For each pool, keep the existing instances that are still alive within each replica-group
+      int pool = entry.getKey();
+      List<Integer> replicaGroupIds = entry.getValue();
+      List<String> newInstances = new ArrayList<>();
+      for (InstanceConfig instanceConfig : poolToInstanceConfigsMap.get(pool)) {
+        String instanceName = instanceConfig.getInstanceName();
+        boolean isExistingInstance = false;
+        for (int replicaGroupId : replicaGroupIds) {
+          List<String> instances = replicaGroupIdToInstancesMap.get(replicaGroupId);
+          if (instances.size() == numInstancesPerReplicaGroup) {
+            continue;
+          }
+          if (replicaGroupIdToExistingInstancesMap.get(replicaGroupId).contains(instanceName)) {
+            instances.add(instanceName);
+            isExistingInstance = true;
+            break;
+          }
+        }
+        if (!isExistingInstance) {
+          newInstances.add(instanceName);
+        }
       }
-      instancesToSelect.set(i, selectedInstance);
-      // If it's an existing alive instance, or it's for a new replica group, add the new instance to the tail,
-      // so that it won't be firstly chosen for the next partition.
-      // For newly added instances to fill the existing replica group, the sequence cannot change;
-      // otherwise there is no guarantee that same vacant position will be filled with the same new instance.
-      // The 'selectedInstance' object can still be null if there is no new instances from the candidate list.
-      if (selectedInstance != null && (i < numExistingInstances || existingInstances.isEmpty())) {
-        candidateInstances.remove(selectedInstance);
-        candidateInstances.add(selectedInstance);
+      // Fill the vacant positions with the new instances. First fill the replica groups with the least instances, then
+      // use round-robin to assign instances to each replica-group so that they get instances with similar picking
+      // priority.
+      int numInstancesToFill = numInstancesPerReplicaGroup * replicaGroupIds.size();
+      for (int replicaGroupId : replicaGroupIds) {
+        numInstancesToFill -= replicaGroupIdToInstancesMap.get(replicaGroupId).size();
+      }
+      for (int i = 0; i < numInstancesToFill; i++) {
+        int leastNumInstances = Integer.MAX_VALUE;
+        int replicaGroupIdWithLeastInstances = -1;
+        for (int replicaGroupId : replicaGroupIds) {
+          int numInstances = replicaGroupIdToInstancesMap.get(replicaGroupId).size();
+          if (numInstances < leastNumInstances) {
+            leastNumInstances = numInstances;
+            replicaGroupIdWithLeastInstances = replicaGroupId;
+          }
+        }
+        replicaGroupIdToInstancesMap.get(replicaGroupIdWithLeastInstances).add(newInstances.get(i));
       }
     }
-    // If there are still some vacant positions in the instance list,
-    // try to fill with instances which are either left over or newly added.
-    for (int i = 0; i < instancesToSelect.size(); i++) {
-      if (instancesToSelect.get(i) == null) {
-        if (!existingInstancesStillAlive.isEmpty()) {
-          Iterator<String> iterator = existingInstancesStillAlive.iterator();
-          String existingInstanceLeftOver = iterator.next();
-          instancesToSelect.set(i, existingInstanceLeftOver);
-          iterator.remove();
-        } else if (!newlyAddedInstances.isEmpty()) {
-          // pick a new instance to fill its vacant position.
-          String newInstance = newlyAddedInstances.pollFirst();
-          instancesToSelect.set(i, newInstance);
+    if (numPartitions == 1) {
+      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+        List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
+        if (replicaGroupId < existingNumReplicaGroups) {
+          List<String> existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId);
+          LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup);
+          List<String> instances =
+              selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances);
+          LOGGER.info(
+              "Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, existing instances: {}",
+              instances, replicaGroupId, _tableNameWithType, existingInstances);
+          instancePartitions.setInstances(0, replicaGroupId, instances);
+        } else {
+          LOGGER.info("Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, "
+              + "there is no existing instances", instancesInReplicaGroup, replicaGroupId, _tableNameWithType);
+          instancePartitions.setInstances(0, replicaGroupId, instancesInReplicaGroup);
+        }
+      }
+    } else {
+      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+        List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
+        if (replicaGroupId < existingNumReplicaGroups) {
+          // Cap partitions per instance at ceil(total partition slots / instances in the replica-group)
+          int maxNumPartitionsPerInstance =
+              (numPartitions * numInstancesPerPartition - 1) / numInstancesPerReplicaGroup + 1;
+          Map<String, Integer> instanceToNumPartitionsMap =
+              Maps.newHashMapWithExpectedSize(numInstancesPerReplicaGroup);
+          instancesInReplicaGroup.forEach(instance -> instanceToNumPartitionsMap.put(instance, 0));
+
+          List<List<String>> partitionIdToInstancesMap = new ArrayList<>(numPartitions);
+          List<Set<String>> partitionIdToInstanceSetMap = new ArrayList<>(numPartitions);
+          List<List<String>> partitionIdToExistingInstancesMap = new ArrayList<>(existingNumPartitions);
+          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            // Initialize the list with empty positions to fill
+            List<String> instances = new ArrayList<>(numInstancesPerPartition);
+            for (int i = 0; i < numInstancesPerPartition; i++) {
+              instances.add(null);
+            }
+            partitionIdToInstancesMap.add(instances);
+            Set<String> instanceSet = Sets.newHashSetWithExpectedSize(numInstancesPerPartition);
+            partitionIdToInstanceSetMap.add(instanceSet);
+
+            // Keep the existing instances that are still alive
+            if (partitionId < existingNumPartitions) {
+              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+              partitionIdToExistingInstancesMap.add(existingInstances);
+              int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size());
+              for (int i = 0; i < numInstancesToCheck; i++) {
+                String existingInstance = existingInstances.get(i);
+                Integer numPartitionsOnInstance = instanceToNumPartitionsMap.get(existingInstance);
+                if (numPartitionsOnInstance != null && numPartitionsOnInstance < maxNumPartitionsPerInstance) {
+                  instances.set(i, existingInstance);
+                  instanceSet.add(existingInstance);
+                  instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1);
+                }
+              }
+            }
+          }
+
+          // Fill the vacant positions with the instances that serve the fewest partitions
+          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            List<String> instances = partitionIdToInstancesMap.get(partitionId);
+            Set<String> instanceSet = partitionIdToInstanceSetMap.get(partitionId);
+            int numInstancesToFill = numInstancesPerPartition - instanceSet.size();
+            if (numInstancesToFill > 0) {
+              // Triple stores (instance, numPartitionsOnInstance, instanceIndex) for sorting
+              List<Triple<String, Integer, Integer>> triples = new ArrayList<>(numInstancesPerReplicaGroup);
+              for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
+                String instance = instancesInReplicaGroup.get(i);
+                if (!instanceSet.contains(instance)) {
+                  triples.add(Triple.of(instance, instanceToNumPartitionsMap.get(instance), i));
+                }
+              }
+              triples.sort((o1, o2) -> {
+                int result = Integer.compare(o1.getMiddle(), o2.getMiddle());
+                return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
+              });
+              int instanceIdToFill = 0;
+              for (int i = 0; i < numInstancesPerPartition; i++) {
+                if (instances.get(i) == null) {
+                  String instance = triples.get(instanceIdToFill++).getLeft();
+                  instances.set(i, instance);
+                  instanceToNumPartitionsMap.put(instance, instanceToNumPartitionsMap.get(instance) + 1);
+                }
+              }
+            }
+
+            if (partitionId < existingNumPartitions) {
+              LOGGER.info(
+                  "Selecting instances: {} for replica-group: {}, partition: {} for table: {}, existing instances: {}",
+                  instances, replicaGroupId, partitionId, _tableNameWithType,
+                  partitionIdToExistingInstancesMap.get(partitionId));
+            } else {
+              LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
+                  + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
+            }
+            instancePartitions.setInstances(partitionId, replicaGroupId, instances);
+          }
+        } else {
+          // Assign consecutive instances within a replica-group to each partition
+          int instanceIdInReplicaGroup = 0;
+          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+            List<String> instances = new ArrayList<>(numInstancesPerPartition);
+            for (int i = 0; i < numInstancesPerPartition; i++) {
+              instances.add(instancesInReplicaGroup.get(instanceIdInReplicaGroup));
+              instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
+            }
+            LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
+                + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
+            instancePartitions.setInstances(partitionId, replicaGroupId, instances);
+          }
         }
       }
     }
-    return instancesToSelect;
   }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 940968432b..28d58bbbcd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -22,18 +22,17 @@ import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
-import org.apache.pinot.spi.utils.Pairs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,16 +45,14 @@ public class InstanceTagPoolSelector {
private final InstanceTagPoolConfig _tagPoolConfig;
private final String _tableNameWithType;
-
private final boolean _minimizeDataMovement;
-
private final InstancePartitions _existingInstancePartitions;
public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType,
boolean minimizeDataMovement, @Nullable InstancePartitions existingInstancePartitions) {
_tagPoolConfig = tagPoolConfig;
_tableNameWithType = tableNameWithType;
- _minimizeDataMovement = minimizeDataMovement;
+ _minimizeDataMovement = minimizeDataMovement && existingInstancePartitions != null;
_existingInstancePartitions = existingInstancePartitions;
}
@@ -104,7 +101,7 @@ public class InstanceTagPoolSelector {
// Calculate the pools to select based on the selection config
Set<Integer> pools = poolToInstanceConfigsMap.keySet();
List<Integer> poolsToSelect = _tagPoolConfig.getPools();
- if (poolsToSelect != null && !poolsToSelect.isEmpty()) {
+ if (!CollectionUtils.isEmpty(poolsToSelect)) {
Preconditions.checkState(pools.containsAll(poolsToSelect), "Cannot find all instance pools configured: %s",
poolsToSelect);
} else {
@@ -123,45 +120,44 @@ public class InstanceTagPoolSelector {
return poolToInstanceConfigsMap;
}
+ // Select pools based on the table name hash to evenly distribute the tables
+ List<Integer> poolsInCluster = new ArrayList<>(pools);
+ int startIndex = Math.abs(tableNameHash % numPools);
poolsToSelect = new ArrayList<>(numPoolsToSelect);
- if (_minimizeDataMovement && _existingInstancePartitions != null) {
- Map<Integer, Set<String>> existingPoolsToExistingInstancesMap = new TreeMap<>();
- // Keep the same pool if it's already been used for the table.
+ if (_minimizeDataMovement) {
+ assert _existingInstancePartitions != null;
+ Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
- for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
- for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+ for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
+ for (int replicaGroupId = 0; replicaGroupId < existingNumReplicaGroups; replicaGroupId++) {
List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
for (String existingInstance : existingInstances) {
Integer existingPool = instanceToPoolMap.get(existingInstance);
if (existingPool != null) {
- if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
- existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
- }
- existingPoolsToExistingInstancesMap.computeIfAbsent(existingPool, k -> new HashSet<>())
- .add(existingInstance);
+ poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum);
}
}
}
}
-
- // Use a max heap to track the number of servers used for all the pools.
- PriorityQueue<Pairs.IntPair> maxHeap = new PriorityQueue<>(pools.size(), Pairs.intPairComparator(false));
- for (int pool : pools) {
- maxHeap.add(new Pairs.IntPair(existingPoolsToExistingInstancesMap.get(pool).size(), pool));
+ // Sort the pools based on the number of existing instances in the pool in descending order, then use the
+ // table name hash to break even
+ // Triple stores (pool, numExistingInstances, poolIndex) for sorting
+ List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools);
+ for (int i = 0; i < numPools; i++) {
+ int pool = poolsInCluster.get((startIndex + i) % numPools);
+ triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i));
}
-
- // Pick the pools from the max heap, so that data movement be minimized.
+ triples.sort((o1, o2) -> {
+ int result = Integer.compare(o2.getMiddle(), o1.getMiddle());
+ return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
+ });
for (int i = 0; i < numPoolsToSelect; i++) {
- Pairs.IntPair pair = maxHeap.remove();
- poolsToSelect.add(pair.getRight());
+ poolsToSelect.add(triples.get(i).getLeft());
}
- LOGGER.info("The selected pools: " + poolsToSelect);
} else {
- // Select pools based on the table name hash to evenly distribute the tables
- List<Integer> poolsInCluster = new ArrayList<>(pools);
for (int i = 0; i < numPoolsToSelect; i++) {
- poolsToSelect.add(poolsInCluster.get((tableNameHash + i) % numPools));
+ poolsToSelect.add(poolsInCluster.get((startIndex + i) % numPools));
}
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index a6220c00a2..113d4e1649 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -50,7 +50,9 @@ import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
public class InstanceAssignmentTest {
@@ -198,8 +200,8 @@ public class InstanceAssignmentTest {
// r0: [i8, i1, i4]
// p0, p0, p1
// p1
- // r1: [i9, i10, i5]
- // p0, p0, p1
+ // r1: [i9, i5, i10]
+ // p0, p1, p0
// p1
// r2: [i0, i3, i11]
// p0, p0, p1
@@ -217,7 +219,7 @@ public class InstanceAssignmentTest {
assertEquals(instancePartitions.getInstances(1, 2),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0));
- // Add 2 more instances to the ZK and increase the number of instances per replica group from 2 to 3.
+ // Add 2 more instances to the ZK and increase the number of instances per partition from 2 to 3.
for (int i = numInstances + 2; i < numInstances + 4; i++) {
InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
instanceConfig.addTag(OFFLINE_TAG);
@@ -233,34 +235,29 @@ public class InstanceAssignmentTest {
// Math.abs("myTable_OFFLINE".hashCode()) % 12 = 2
// [i10, i11, i12, i13, i3, i4, i5, i11, i7, i8, i9, i0, i1]
- // For r0, the candidate instances are [i12, i13, i4, i7, i8, i1].
- // For p0, since the existing assignment is [i8, i1], the next available instance from the candidates is i12.
- // For p1, the existing assignment is [i4, i8], the next available instance is also i12.
- // r0: [i12, i4, i8, i1]
- // For r1, the candidate instances become [i10, i13, i5, i7, i9].
- // For p0, since the existing assignment is [i9, i10], the next available instance is i13 (new instance).
- // For p1, the existing assignment is [i5, i9], the next available one from the candidates is i10, but since
- // i10 is already used in the former partition, it got added to the tail, so the next available one is i13.
- // r1: [i10, i13, i5, i9]
- // For r2, the candidate instances become [i11, i3, i7, i0].
- // For p0, the existing assignment is [i0, i3], the next available instance from the candidates is i11.
- // For p1, the existing assignment is [i11, i0], the next available instance from the candidates is i3, but
- // since i3 is already used in the former partition, it got appended to the tail, so the next available one is i7.
- // r2: [i11, i3, i7, i0]
+ // r0: [i8, i1, i4, i12]
+ // p0, p0, p1, p0
+ // p1, p1
+ // r1: [i9, i5, i10, i13]
+ // p0, p1, p0, p0
+ // p1, p1
+ // r2: [i0, i3, i11, i7]
+ // p0, p0, p1, p0
+ // p1, p1
assertEquals(instancePartitions.getInstances(0, 0),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 12));
assertEquals(instancePartitions.getInstances(1, 0),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 12));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 1));
assertEquals(instancePartitions.getInstances(0, 1),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13));
assertEquals(instancePartitions.getInstances(1, 1),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 13));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 10));
assertEquals(instancePartitions.getInstances(0, 2),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 11));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 7));
assertEquals(instancePartitions.getInstances(1, 2),
- Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 7));
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
- // Reduce the number of instances per replica group from 3 to 2.
+ // Reduce the number of instances per partition from 3 to 2.
numInstancesPerPartition = 2;
tableConfig.getValidationConfig()
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerPartition));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
index fdb6292f26..288b789aee 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelectorTest.java
@@ -31,36 +31,66 @@ import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+
public class InstanceReplicaGroupPartitionSelectorTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ //@formatter:off
private static final String INSTANCE_CONFIG_TEMPLATE =
- "{\n" + " \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
- + " \"simpleFields\": {\n" + " \"HELIX_ENABLED\": \"true\",\n"
- + " \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n"
- + " \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n"
- + " \"HELIX_PORT\": \"8098\",\n" + " \"adminPort\": \"8097\",\n" + " \"grpcPort\": \"8090\",\n"
- + " \"queryMailboxPort\": \"46347\",\n" + " \"queryServerPort\": \"45031\",\n"
- + " \"shutdownInProgress\": \"false\"\n" + " },\n" + " \"mapFields\": {\n"
- + " \"SYSTEM_RESOURCE_INFO\": {\n" + " \"numCores\": \"16\",\n"
- + " \"totalMemoryMB\": \"126976\",\n" + " \"maxHeapSizeMB\": \"65536\"\n" + " },\n"
- + " \"pool\": {\n" + " \"DefaultTenant_OFFLINE\": \"${pool}\",\n"
- + " \"${poolName}\": \"${pool}\",\n" + " \"AllReplicationGroups\": \"1\"\n" + " }\n" + " },\n"
- + " \"listFields\": {\n" + " \"TAG_LIST\": [\n" + " \"DefaultTenant_OFFLINE\",\n"
- + " \"DefaultTenant_REALTIME\",\n" + " \"${poolName}\",\n" + " \"AllReplicationGroups\"\n"
- + " ]\n" + " }\n" + "}";
+ "{\n"
+ + " \"id\": \"Server_pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"simpleFields\": {\n"
+ + " \"HELIX_ENABLED\": \"true\",\n"
+ + " \"HELIX_ENABLED_TIMESTAMP\": \"1688959934305\",\n"
+ + " \"HELIX_HOST\": \"pinot-server-${serverName}.pinot-server-headless.pinot.svc.cluster.local\",\n"
+ + " \"HELIX_PORT\": \"8098\",\n"
+ + " \"adminPort\": \"8097\",\n"
+ + " \"grpcPort\": \"8090\",\n"
+ + " \"queryMailboxPort\": \"46347\",\n"
+ + " \"queryServerPort\": \"45031\",\n"
+ + " \"shutdownInProgress\": \"false\"\n"
+ + " },\n"
+ + " \"mapFields\": {\n"
+ + " \"SYSTEM_RESOURCE_INFO\": {\n"
+ + " \"numCores\": \"16\",\n"
+ + " \"totalMemoryMB\": \"126976\",\n"
+ + " \"maxHeapSizeMB\": \"65536\"\n"
+ + " },\n"
+ + " \"pool\": {\n"
+ + " \"DefaultTenant_OFFLINE\": \"${pool}\",\n"
+ + " \"${poolName}\": \"${pool}\",\n"
+ + " \"AllReplicationGroups\": \"1\"\n"
+ + " }\n"
+ + " },\n"
+ + " \"listFields\": {\n"
+ + " \"TAG_LIST\": [\n"
+ + " \"DefaultTenant_OFFLINE\",\n"
+ + " \"DefaultTenant_REALTIME\",\n"
+ + " \"${poolName}\",\n"
+ + " \"AllReplicationGroups\"\n"
+ + " ]\n"
+ + " }\n"
+ + "}";
+ //@formatter:on
@Test
public void testPoolsWhenOneMorePoolAddedAndOneMoreReplicaGroupsNeeded()
throws JsonProcessingException {
+ //@formatter:off
String existingPartitionsJson =
- " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
- + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n"
- + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
- + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
- + " ]\n" + " }\n" + " }\n";
+ "{\n"
+ + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+ + " \"partitionToInstancesMap\": {\n"
+ + " \"0_0\": [\n"
+ + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ]\n"
+ + " }\n"
+ + "}";
+ //@formatter:on
InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
InstanceReplicaGroupPartitionConfig config =
new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
@@ -94,33 +124,47 @@ public class InstanceReplicaGroupPartitionSelectorTest {
// Now that 1 more pool is added and 1 more RG is needed, a new set called "0_1" is generated,
// and the instances from Pool 1 are assigned to this new replica.
+ //@formatter:off
String expectedInstancePartitions =
- " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
- + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n"
- + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
- + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
- + " ],\n" + " \"0_1\": [\n"
- + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
- + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
- + " ]\n" + " }\n" + " }\n";
+ "{\n"
+ + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+ + " \"partitionToInstancesMap\": {\n"
+ + " \"0_0\": [\n"
+ + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ],\n"
+ + " \"0_1\": [\n"
+ + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ]\n"
+ + " }\n"
+ + "}";
+ //@formatter:on
InstancePartitions expectedPartitions =
OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
- assert assignedPartitions.equals(expectedPartitions);
+ assertEquals(assignedPartitions, expectedPartitions);
}
@Test
public void testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools()
throws JsonProcessingException {
// The "rg0-2" instance used to belong to Pool 1, but now it belongs to Pool 0.
+ //@formatter:off
String existingPartitionsJson =
- " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
- + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n"
- + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
- + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
- + " ],\n" + " \"0_1\": [\n"
- + " \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
- + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
- + " ]\n" + " }\n" + " }\n";
+ "{\n"
+ + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+ + " \"partitionToInstancesMap\": {\n"
+ + " \"0_0\": [\n"
+ + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ],\n"
+ + " \"0_1\": [\n"
+ + " \"Server_pinot-server-rg0-2.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ]\n"
+ + " }\n"
+ + "}";
+ //@formatter:on
InstancePartitions existing = OBJECT_MAPPER.readValue(existingPartitionsJson, InstancePartitions.class);
InstanceReplicaGroupPartitionConfig config =
new InstanceReplicaGroupPartitionConfig(true, 0, 2, 2, 1, 2, true, null);
@@ -150,17 +194,24 @@ public class InstanceReplicaGroupPartitionSelectorTest {
// The "rg0-2" instance is replaced by "rg1-0" (which belongs to Pool 1), as "rg0-2" no longer belongs to Pool 1.
// And "rg1-0" remains the same position as it's always under Pool 1.
+ //@formatter:off
String expectedInstancePartitions =
- " {\n" + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
- + " \"partitionToInstancesMap\": {\n" + " \"0_0\": [\n"
- + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
- + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
- + " ],\n" + " \"0_1\": [\n"
- + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
- + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
- + " ]\n" + " }\n" + " }\n";
+ "{\n"
+ + " \"instancePartitionsName\": \"0f97dac8-4123-47c6-9a4d-b8ce039c5ea5_OFFLINE\",\n"
+ + " \"partitionToInstancesMap\": {\n"
+ + " \"0_0\": [\n"
+ + " \"Server_pinot-server-rg0-0.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg0-1.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ],\n"
+ + " \"0_1\": [\n"
+ + " \"Server_pinot-server-rg1-1.pinot-server-headless.pinot.svc.cluster.local_8098\",\n"
+ + " \"Server_pinot-server-rg1-0.pinot-server-headless.pinot.svc.cluster.local_8098\"\n"
+ + " ]\n"
+ + " }\n"
+ + "}";
+ //@formatter:on
InstancePartitions expectedPartitions =
OBJECT_MAPPER.readValue(expectedInstancePartitions, InstancePartitions.class);
- assert assignedPartitions.equals(expectedPartitions);
+ assertEquals(assignedPartitions, expectedPartitions);
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
index 45645387af..be18d35e50 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/Pairs.java
@@ -30,11 +30,7 @@ public class Pairs {
}
public static Comparator<IntPair> intPairComparator() {
- return new AscendingIntPairComparator(true);
- }
-
- public static Comparator<IntPair> intPairComparator(boolean ascending) {
- return new AscendingIntPairComparator(ascending);
+ return new AscendingIntPairComparator();
}
public static class IntPair {
@@ -83,26 +79,13 @@ public class Pairs {
}
public static class AscendingIntPairComparator implements Comparator<IntPair> {
- private boolean _ascending;
-
- public AscendingIntPairComparator(boolean ascending) {
- _ascending = ascending;
- }
@Override
public int compare(IntPair pair1, IntPair pair2) {
if (pair1._left != pair2._left) {
- if (_ascending) {
- return Integer.compare(pair1._left, pair2._left);
- } else {
- return Integer.compare(pair2._left, pair1._left);
- }
+ return Integer.compare(pair1._left, pair2._left);
} else {
- if (_ascending) {
- return Integer.compare(pair1._right, pair2._right);
- } else {
- return Integer.compare(pair2._right, pair1._right);
- }
+ return Integer.compare(pair1._right, pair2._right);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org