You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this plain-text version.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/03/30 17:54:48 UTC
[pinot] 01/01: Add retainInstancesSequence feature to table rebalance to minimize data movement between instances
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch retain-instance-sequence
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 564cb9a9669b7d7390a003eb3dcc0b848689b3ec
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Mar 30 10:54:14 2022 -0700
Add retainInstancesSequence feature to table rebalance to minimize data movement between instances
---
.../common/assignment/InstancePartitions.java | 77 ++++-
.../PinotInstanceAssignmentRestletResource.java | 15 +-
.../helix/core/PinotHelixResourceManager.java | 2 +-
.../instance/InstanceAssignmentDriver.java | 43 ++-
.../instance/InstanceTagPoolSelector.java | 125 ++++++--
.../core/rebalance/RebalanceConfigConstants.java | 4 +
.../helix/core/rebalance/TableRebalancer.java | 30 +-
.../instance/InstanceAssignmentTest.java | 318 +++++++++++++++++++--
8 files changed, 542 insertions(+), 72 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
index c511077..811de20 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -58,14 +60,18 @@ import org.apache.pinot.spi.utils.JsonUtils;
@JsonIgnoreProperties(ignoreUnknown = true)
public class InstancePartitions {
private static final char PARTITION_REPLICA_GROUP_SEPARATOR = '_';
+ private static final String PARTITIONS_KEY = "partitions";
+ private static final String INSTANCE_SEPARATOR = "/";
private final String _instancePartitionsName;
- private final Map<String, List<String>> _partitionToInstancesMap;
+ private final Map<String, List<String>> _partitionWithReplicaGroupToInstancesMap;
+ private final Map<Integer, List<String>> _partitionToInstancesMap;
private int _numPartitions;
private int _numReplicaGroups;
public InstancePartitions(String instancePartitionsName) {
_instancePartitionsName = instancePartitionsName;
+ _partitionWithReplicaGroupToInstancesMap = new TreeMap<>();
_partitionToInstancesMap = new TreeMap<>();
}
@@ -73,10 +79,18 @@ public class InstancePartitions {
private InstancePartitions(
@JsonProperty(value = "instancePartitionsName", required = true) String instancePartitionsName,
@JsonProperty(value = "partitionToInstancesMap", required = true)
- Map<String, List<String>> partitionToInstancesMap) {
+ Map<String, List<String>> partitionWithReplicaGroupToInstancesMap,
+ @JsonProperty(value = "partitionToInstancesMap", required = true)
+ Map<String, String> partitionsMap) {
_instancePartitionsName = instancePartitionsName;
- _partitionToInstancesMap = partitionToInstancesMap;
- for (String key : partitionToInstancesMap.keySet()) {
+ _partitionWithReplicaGroupToInstancesMap = partitionWithReplicaGroupToInstancesMap;
+ _partitionToInstancesMap = new TreeMap<>();
+ if (partitionsMap != null) {
+ for (Map.Entry<String, String> entry : partitionsMap.entrySet()) {
+ _partitionToInstancesMap.put(Integer.parseInt(entry.getKey()), extractInstances(entry.getValue()));
+ }
+ }
+ for (String key : partitionWithReplicaGroupToInstancesMap.keySet()) {
int separatorIndex = key.indexOf(PARTITION_REPLICA_GROUP_SEPARATOR);
int partitionId = Integer.parseInt(key.substring(0, separatorIndex));
int replicaGroupId = Integer.parseInt(key.substring(separatorIndex + 1));
@@ -91,8 +105,8 @@ public class InstancePartitions {
}
@JsonProperty
- public Map<String, List<String>> getPartitionToInstancesMap() {
- return _partitionToInstancesMap;
+ public Map<String, List<String>> getPartitionWithReplicaGroupToInstancesMap() {
+ return _partitionWithReplicaGroupToInstancesMap;
}
@JsonIgnore
@@ -105,25 +119,68 @@ public class InstancePartitions {
return _numReplicaGroups;
}
+ @JsonIgnore
+ public Map<Integer, List<String>> getPartitionToInstancesMap() {
+ return _partitionToInstancesMap;
+ }
+
public List<String> getInstances(int partitionId, int replicaGroupId) {
- return _partitionToInstancesMap
+ return _partitionWithReplicaGroupToInstancesMap
.get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId);
}
public void setInstances(int partitionId, int replicaGroupId, List<String> instances) {
String key = Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId;
- _partitionToInstancesMap.put(key, instances);
+ _partitionWithReplicaGroupToInstancesMap.put(key, instances);
_numPartitions = Integer.max(_numPartitions, partitionId + 1);
_numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1);
}
+ public void setPartitionToInstancesMap(Map<Integer, List<String>> partitionToInstancesMap) {
+ _partitionToInstancesMap.putAll(partitionToInstancesMap);
+ }
+
public static InstancePartitions fromZNRecord(ZNRecord znRecord) {
- return new InstancePartitions(znRecord.getId(), znRecord.getListFields());
+ return new InstancePartitions(znRecord.getId(), znRecord.getListFields(), znRecord.getMapField(PARTITIONS_KEY));
+ }
+
+ private static List<String> extractInstances(String instancesRawString) {
+ if (instancesRawString == null || instancesRawString.length() == 0) {
+ return Collections.emptyList();
+ }
+ String[] instancesArray = instancesRawString.split(INSTANCE_SEPARATOR);
+ List<String> instances = new ArrayList<>(instancesArray.length);
+ Collections.addAll(instances, instancesArray);
+ return instances;
+ }
+
+ private String convertInstancesToString(List<String> instances) {
+ if (instances == null || instances.isEmpty()) {
+ return "";
+ }
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String instance : instances) {
+ if (stringBuilder.length() == 0) {
+ stringBuilder.append(instance);
+ } else {
+ stringBuilder.append(INSTANCE_SEPARATOR).append(instance);
+ }
+ }
+ return stringBuilder.toString();
+ }
+
+ private Map<String, String> convertListToStringMap() {
+ Map<String, String> convertedMap = new TreeMap<>();
+ for (Map.Entry<Integer, List<String>> entry : _partitionToInstancesMap.entrySet()) {
+ convertedMap.put(Integer.toString(entry.getKey()), convertInstancesToString(entry.getValue()));
+ }
+ return convertedMap;
}
public ZNRecord toZNRecord() {
ZNRecord znRecord = new ZNRecord(_instancePartitionsName);
- znRecord.setListFields(_partitionToInstancesMap);
+ znRecord.setListFields(_partitionWithReplicaGroupToInstancesMap);
+ znRecord.setMapField(PARTITIONS_KEY, convertListToStringMap());
return znRecord;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 8d9e6cc..58e677d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -137,7 +137,7 @@ public class PinotInstanceAssignmentRestletResource {
if (InstanceAssignmentConfigUtils
.allowInstanceAssignment(offlineTableConfig, InstancePartitionsType.OFFLINE)) {
instancePartitionsMap.put(InstancePartitionsType.OFFLINE, new InstanceAssignmentDriver(offlineTableConfig)
- .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs));
+ .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap()));
}
} catch (IllegalStateException e) {
throw new ControllerApplicationException(LOGGER, "Caught IllegalStateException", Response.Status.BAD_REQUEST,
@@ -156,15 +156,15 @@ public class PinotInstanceAssignmentRestletResource {
if (instancePartitionsType == InstancePartitionsType.CONSUMING || instancePartitionsType == null) {
if (InstanceAssignmentConfigUtils
.allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.CONSUMING)) {
- instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
- instanceAssignmentDriver.assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs));
+ instancePartitionsMap.put(InstancePartitionsType.CONSUMING, instanceAssignmentDriver
+ .assignInstances(InstancePartitionsType.CONSUMING, instanceConfigs, Collections.emptyMap()));
}
}
if (instancePartitionsType == InstancePartitionsType.COMPLETED || instancePartitionsType == null) {
if (InstanceAssignmentConfigUtils
.allowInstanceAssignment(realtimeTableConfig, InstancePartitionsType.COMPLETED)) {
- instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
- instanceAssignmentDriver.assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs));
+ instancePartitionsMap.put(InstancePartitionsType.COMPLETED, instanceAssignmentDriver
+ .assignInstances(InstancePartitionsType.COMPLETED, instanceConfigs, Collections.emptyMap()));
}
}
} catch (IllegalStateException e) {
@@ -295,8 +295,9 @@ public class PinotInstanceAssignmentRestletResource {
while (iterator.hasNext()) {
InstancePartitions instancePartitions = iterator.next();
boolean oldInstanceFound = false;
- Map<String, List<String>> partitionToInstancesMap = instancePartitions.getPartitionToInstancesMap();
- for (List<String> instances : partitionToInstancesMap.values()) {
+ Map<String, List<String>> partitionWithReplicaGroupToInstancesMap =
+ instancePartitions.getPartitionWithReplicaGroupToInstancesMap();
+ for (List<String> instances : partitionWithReplicaGroupToInstancesMap.values()) {
oldInstanceFound |= Collections.replaceAll(instances, oldInstanceId, newInstanceId);
}
if (oldInstanceFound) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ecbd3b7..754146a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1648,7 +1648,7 @@ public class PinotHelixResourceManager {
List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
InstancePartitions instancePartitions =
- instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs);
+ instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, Collections.emptyMap());
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 1440fae..4f60d2c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
@@ -51,16 +52,31 @@ public class InstanceAssignmentDriver {
_tableConfig = tableConfig;
}
+ /**
+ * Assign instances to InstancePartitions object.
+ * @param instancePartitionsType type of instance partitions
+ * @param instanceConfigs list of instance configs
+ * @param partitionToInstancesMap existing instance with sequence that should be respected. An empty list
+ * means no preceding sequence to respect and the instances would be sorted.
+ */
public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
- List<InstanceConfig> instanceConfigs) {
+ List<InstanceConfig> instanceConfigs, Map<Integer, List<String>> partitionToInstancesMap) {
+ boolean shouldRetainInstanceSequence = !partitionToInstancesMap.isEmpty();
String tableNameWithType = _tableConfig.getTableName();
- LOGGER.info("Starting {} instance assignment for table: {}", instancePartitionsType, tableNameWithType);
+ LOGGER.info("Starting {} instance assignment for table: {}. Should retain instance sequence: {}",
+ instancePartitionsType, tableNameWithType, shouldRetainInstanceSequence);
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
InstanceTagPoolSelector tagPoolSelector =
new InstanceTagPoolSelector(assignmentConfig.getTagPoolConfig(), tableNameWithType);
- Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);
+ Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap =
+ tagPoolSelector.selectInstances(instanceConfigs, partitionToInstancesMap);
+
+ InstancePartitions instancePartitions = new InstancePartitions(
+ instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
+ instancePartitions
+ .setPartitionToInstancesMap(extractInstanceNamesFromPoolToInstanceConfigsMap(poolToInstanceConfigsMap));
InstanceConstraintConfig constraintConfig = assignmentConfig.getConstraintConfig();
List<InstanceConstraintApplier> constraintAppliers = new ArrayList<>();
@@ -75,9 +91,26 @@ public class InstanceAssignmentDriver {
InstanceReplicaGroupPartitionSelector replicaPartitionSelector =
new InstanceReplicaGroupPartitionSelector(assignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType);
- InstancePartitions instancePartitions = new InstancePartitions(
- instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
return instancePartitions;
}
+
+ private Map<Integer, List<String>> extractInstanceNamesFromPoolToInstanceConfigsMap(
+ Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap) {
+ Map<Integer, List<String>> partitionToInstancesMap = new TreeMap<>();
+ for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+ Integer pool = entry.getKey();
+ List<InstanceConfig> instanceConfigs = entry.getValue();
+ partitionToInstancesMap.put(pool, extractInstanceNamesFromInstanceConfigs(instanceConfigs));
+ }
+ return partitionToInstancesMap;
+ }
+
+ private List<String> extractInstanceNamesFromInstanceConfigs(List<InstanceConfig> instanceConfigs) {
+ List<String> instanceNames = new ArrayList<>(instanceConfigs.size());
+ for (InstanceConfig instanceConfig : instanceConfigs) {
+ instanceNames.add(instanceConfig.getInstanceName());
+ }
+ return instanceNames;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5aefd1a..b56c7ed 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -20,7 +20,10 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -48,52 +51,105 @@ public class InstanceTagPoolSelector {
/**
* Returns a map from pool to instance configs based on the tag and pool config for the given instance configs.
+ * @param instanceConfigs list of latest instance configs from ZK.
+ * @param partitionToInstancesMap existing instance with sequence that should be respected. An empty list
+ * means no preceding sequence to respect and the instances would be sorted.
*/
- public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs) {
+ public Map<Integer, List<InstanceConfig>> selectInstances(List<InstanceConfig> instanceConfigs,
+ Map<Integer, List<String>> partitionToInstancesMap) {
int tableNameHash = Math.abs(_tableNameWithType.hashCode());
LOGGER.info("Starting instance tag/pool selection for table: {} with hash: {}", _tableNameWithType, tableNameHash);
- // Filter out the instances with the correct tag
+ // Filter out the instances with the correct tag.
String tag = _tagPoolConfig.getTag();
- List<InstanceConfig> candidateInstanceConfigs = new ArrayList<>();
+ Map<String, InstanceConfig> candidateInstanceConfigsMap = new LinkedHashMap<>();
for (InstanceConfig instanceConfig : instanceConfigs) {
if (instanceConfig.getTags().contains(tag)) {
- candidateInstanceConfigs.add(instanceConfig);
+ candidateInstanceConfigsMap.put(instanceConfig.getInstanceName(), instanceConfig);
}
}
- candidateInstanceConfigs.sort(Comparator.comparing(InstanceConfig::getInstanceName));
- int numCandidateInstances = candidateInstanceConfigs.size();
+
+ // Find out newly added instances from the latest copies of instance configs.
+ Deque<String> newlyAddedInstances = new LinkedList<>(candidateInstanceConfigsMap.keySet());
+ for (List<String> existingInstancesWithSequence : partitionToInstancesMap.values()) {
+ newlyAddedInstances.removeAll(existingInstancesWithSequence);
+ }
+
+ int numCandidateInstances = candidateInstanceConfigsMap.size();
Preconditions.checkState(numCandidateInstances > 0, "No enabled instance has the tag: %s", tag);
LOGGER.info("{} enabled instances have the tag: {} for table: {}", numCandidateInstances, tag, _tableNameWithType);
- Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = new TreeMap<>();
+ // Each pool number associates with a map that key is the instance name and value is the instance config.
+ Map<Integer, Map<String, InstanceConfig>> poolToInstanceConfigsMap = new HashMap<>();
+ // Each pool number associates with a list of newly added instance configs,
+ // so that new instances can be fetched from this list.
+ Map<Integer, Deque<InstanceConfig>> poolToNewInstanceConfigsMap = new HashMap<>();
+
+ // Extract the pool information from the instance configs.
+ for (Map.Entry<String,InstanceConfig> entry : candidateInstanceConfigsMap.entrySet()) {
+ String instanceName = entry.getKey();
+ InstanceConfig instanceConfig = entry.getValue();
+ Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
+ if (poolMap != null && poolMap.containsKey(tag)) {
+ int pool = Integer.parseInt(poolMap.get(tag));
+ poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new TreeMap<>()).put(instanceName, instanceConfig);
+ if (newlyAddedInstances.contains(instanceName)) {
+ poolToNewInstanceConfigsMap.computeIfAbsent(pool, k -> new LinkedList<>()).add(instanceConfig);
+ }
+ }
+ }
+
+ Map<Integer, List<InstanceConfig>> poolToLatestInstanceConfigsMap = new TreeMap<>();
if (_tagPoolConfig.isPoolBased()) {
// Pool based selection
- // Extract the pool information from the instance configs
- for (InstanceConfig instanceConfig : candidateInstanceConfigs) {
- Map<String, String> poolMap = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
- if (poolMap != null && poolMap.containsKey(tag)) {
- int pool = Integer.parseInt(poolMap.get(tag));
- poolToInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(instanceConfig);
+ for (Map.Entry<Integer, List<String>> entry : partitionToInstancesMap.entrySet()) {
+ Integer pool = entry.getKey();
+ List<String> existingInstanceAssignmentInPool = entry.getValue();
+ List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+ for (String existingInstance: existingInstanceAssignmentInPool) {
+ InstanceConfig instanceConfig = poolToInstanceConfigsMap.get(pool).get(existingInstance);
+ // Add instances to the candidate list and respect the sequence of the existing instances from the ZK.
+ // The missing/removed instances will be replaced by the newly instances.
+ // If the instance still exists from ZK, then add it to the candidate list.
+ // E.g. if the old instances are: [I1, I2, I3, I4] and the new instance are: [I1, I3, I4, I5, I6],
+ // the removed instance is I2 and the newly added instances are I5 and I6.
+ // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail.
+ // Thus, the new order would be [I1, I5, I3, I4, I6].
+ if (instanceConfig != null) {
+ candidateInstanceConfigsWithSequence.add(instanceConfig);
+ } else {
+ // The current chosen instance no longer lives in the cluster any more, thus pick a new instance.
+ candidateInstanceConfigsWithSequence.add(poolToNewInstanceConfigsMap.get(pool).pollFirst());
+ }
}
+ poolToLatestInstanceConfigsMap.put(pool, candidateInstanceConfigsWithSequence);
}
+
+ // The preceding list of instances has been traversed. Add the remaining new instances.
+ for (Map.Entry<Integer, Deque<InstanceConfig>> entry : poolToNewInstanceConfigsMap.entrySet()) {
+ Integer pool = entry.getKey();
+ Deque<InstanceConfig> remainingNewInstanceConfigs = entry.getValue();
+ poolToLatestInstanceConfigsMap.computeIfAbsent(pool, k -> new ArrayList<>())
+ .addAll(remainingNewInstanceConfigs);
+ }
+
Preconditions.checkState(!poolToInstanceConfigsMap.isEmpty(),
"No enabled instance has the pool configured for the tag: %s", tag);
Map<Integer, Integer> poolToNumInstancesMap = new TreeMap<>();
- for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
+ for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToLatestInstanceConfigsMap.entrySet()) {
poolToNumInstancesMap.put(entry.getKey(), entry.getValue().size());
}
LOGGER.info("Number instances for each pool: {} for table: {}", poolToNumInstancesMap, _tableNameWithType);
// Calculate the pools to select based on the selection config
- Set<Integer> pools = poolToInstanceConfigsMap.keySet();
+ Set<Integer> pools = poolToLatestInstanceConfigsMap.keySet();
List<Integer> poolsToSelect = _tagPoolConfig.getPools();
if (poolsToSelect != null && !poolsToSelect.isEmpty()) {
Preconditions.checkState(pools.containsAll(poolsToSelect), "Cannot find all instance pools configured: %s",
poolsToSelect);
} else {
- int numPools = poolToInstanceConfigsMap.size();
+ int numPools = poolToLatestInstanceConfigsMap.size();
int numPoolsToSelect = _tagPoolConfig.getNumPools();
if (numPoolsToSelect > 0) {
Preconditions
@@ -106,7 +162,7 @@ public class InstanceTagPoolSelector {
// Directly return the map if all the pools are selected
if (numPools == numPoolsToSelect) {
LOGGER.info("Selecting all {} pools: {} for table: {}", numPools, pools, _tableNameWithType);
- return poolToInstanceConfigsMap;
+ return poolToLatestInstanceConfigsMap;
}
// Select pools based on the table name hash to evenly distribute the tables
@@ -123,10 +179,37 @@ public class InstanceTagPoolSelector {
} else {
// Non-pool based selection
- LOGGER.info("Selecting {} instances for table: {}", numCandidateInstances, _tableNameWithType);
+ LOGGER.info("Selecting {} instances for table: {}", candidateInstanceConfigsMap.size(), _tableNameWithType);
// Put all instance configs as pool 0
- poolToInstanceConfigsMap.put(0, candidateInstanceConfigs);
+
+ for (Map.Entry<Integer, List<String>> entry : partitionToInstancesMap.entrySet()) {
+ Integer pool = entry.getKey();
+ List<String> existingInstanceAssignmentInPool = entry.getValue();
+ List<InstanceConfig> candidateInstanceConfigsWithSequence = new ArrayList<>();
+ for (String existingInstance: existingInstanceAssignmentInPool) {
+ InstanceConfig instanceConfig = candidateInstanceConfigsMap.get(existingInstance);
+ // Add instances to the candidate list and respect the sequence of the existing instances from the ZK.
+ // The missing/removed instances will be replaced by the newly instances.
+ // If the instance still exists from ZK, then add it to the candidate list.
+ // E.g. if the old instances are: [I1, I2, I3, I4] and the new instance are: [I1, I3, I4, I5, I6],
+ // the removed instance is I2 and the newly added instances are I5 and I6.
+ // The position of I2 would be replaced by I5, the new remaining I6 would be appended to the tail.
+ // Thus, the new order would be [I1, I5, I3, I4, I6].
+ if (instanceConfig != null) {
+ candidateInstanceConfigsWithSequence.add(instanceConfig);
+ } else {
+ // The current chosen instance no longer lives in the cluster any more, thus pick a new instance.
+ candidateInstanceConfigsWithSequence.add(candidateInstanceConfigsMap.get(newlyAddedInstances.pollFirst()));
+ }
+ }
+ poolToLatestInstanceConfigsMap.put(pool, candidateInstanceConfigsWithSequence);
+ }
+ // The preceding list of instances has been traversed. Add the remaining new instances.
+ for (String remainingNewInstance : newlyAddedInstances) {
+ poolToLatestInstanceConfigsMap.computeIfAbsent(0, k -> new ArrayList<>())
+ .add(candidateInstanceConfigsMap.get(remainingNewInstance));
+ }
}
- return poolToInstanceConfigsMap;
+ return poolToLatestInstanceConfigsMap;
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
index 70bebab..d08fbbf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfigConstants.java
@@ -33,6 +33,10 @@ public class RebalanceConfigConstants {
public static final String REASSIGN_INSTANCES = "reassignInstances";
public static final boolean DEFAULT_REASSIGN_INSTANCES = false;
+ // Whether to retain the sequence for the existing instances
+ public static final String RETAIN_INSTANCE_SEQUENCE = "retainInstancesSequence";
+ public static final boolean DEFAULT_RETAIN_INSTANCE_SEQUENCE = false;
+
// Whether to reassign CONSUMING segments
public static final String INCLUDE_CONSUMING = "includeConsuming";
public static final boolean DEFAULT_INCLUDE_CONSUMING = false;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index d6701c8..005a1ef 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.rebalance;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -145,6 +146,8 @@ public class TableRebalancer {
tableConfig.getRoutingConfig().getInstanceSelectorType());
boolean bestEfforts = rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS,
RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
+ boolean retainInstanceSequence = rebalanceConfig.getBoolean(RebalanceConfigConstants.RETAIN_INSTANCE_SEQUENCE,
+ RebalanceConfigConstants.DEFAULT_RETAIN_INSTANCE_SEQUENCE);
LOGGER.info(
"Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, "
+ "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
@@ -194,7 +197,7 @@ public class TableRebalancer {
// Calculate instance partitions map
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
try {
- instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun);
+ instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, dryRun, retainInstanceSequence);
} catch (Exception e) {
LOGGER.warn(
"Caught exception while fetching/calculating instance partitions for table: {}, aborting the rebalance",
@@ -323,7 +326,7 @@ public class TableRebalancer {
currentIdealState = idealState;
currentAssignment = currentIdealState.getRecord().getMapFields();
// Re-calculate the instance partitions in case the instance configs changed during the rebalance
- instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false);
+ instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, false, retainInstanceSequence);
tierToInstancePartitionsMap = getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
@@ -381,21 +384,24 @@ public class TableRebalancer {
}
private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig,
- boolean reassignInstances, boolean dryRun) {
+ boolean reassignInstances, boolean dryRun, boolean retainInstanceSequence) {
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
if (tableConfig.getTableType() == TableType.OFFLINE) {
instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
- getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun));
+ getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, dryRun,
+ retainInstanceSequence));
} else {
instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
- getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun));
+ getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, dryRun,
+ retainInstanceSequence));
String tableNameWithType = tableConfig.getTableName();
if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
LOGGER.info(
"COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}",
tableNameWithType);
instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
- getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun));
+ getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, dryRun,
+ retainInstanceSequence));
} else {
LOGGER.info(
"COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions "
@@ -413,14 +419,22 @@ public class TableRebalancer {
}
private InstancePartitions getInstancePartitions(TableConfig tableConfig,
- InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun) {
+ InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean dryRun,
+ boolean retainInstanceSequence) {
String tableNameWithType = tableConfig.getTableName();
if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
if (reassignInstances) {
+ Map<Integer, List<String>> partitionsToInstancesMap = Collections.emptyMap();
+ if (retainInstanceSequence) {
+ InstancePartitions existingInstancePartitions = InstancePartitionsUtils
+ .fetchOrComputeInstancePartitions(_helixManager, tableConfig, instancePartitionsType);
+ partitionsToInstancesMap = existingInstancePartitions.getPartitionToInstancesMap();
+ }
LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType);
InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType,
- _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true));
+ _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true),
+ partitionsToInstancesMap);
if (!dryRun) {
LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index f1fc049..0bbee49 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -21,7 +21,9 @@ package org.apache.pinot.controller.helix.core.assignment.instance;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
@@ -71,7 +73,8 @@ public class InstanceAssignmentTest {
}
// Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 2 instances
- InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE,
+ instanceConfigs, Collections.emptyMap());
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
assertEquals(instancePartitions.getNumPartitions(), 1);
// Instances of index 4 to 7 are not assigned because of the hash-based rotation
@@ -95,7 +98,8 @@ public class InstanceAssignmentTest {
// Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
// instances should be assigned to 2 partitions, each with 2 instances
- instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ instancePartitions = driver
+ .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
assertEquals(instancePartitions.getNumPartitions(), numPartitions);
// Instance of index 7 is not assigned because of the hash-based rotation
@@ -123,6 +127,91 @@ public class InstanceAssignmentTest {
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
assertEquals(instancePartitions.getInstances(1, 2),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6));
+
+ // ===== Test against the cases when partitionToInstancesMap isn't empty. =====
+ // Put the existing partition to instances map as the parameter to the InstanceAssignmentDriver
+ // instead of passing an empty map.
+ // The returned instance partition should be the same as the last computed one.
+ Map<Integer, List<String>> existingPartitionToInstancesMap = new HashMap<>();
+ List<String> instances = Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9);
+ existingPartitionToInstancesMap.put(0, instances);
+
+ // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
+ // instances should be assigned to 2 partitions, each with 2 instances
+ instancePartitions = driver
+ .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // Instance of index 7 is not assigned because of the hash-based rotation
+ // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+ // [i8, i9, i0, i1, i2, i3, i4, i5, i6, i7]
+ // r0, r1, r2, r0, r1, r2, r0, r1, r2
+ // r0: [i8, i1, i4]
+ // p0, p0, p1
+ // p1
+ // r1: [i9, i2, i5]
+ // p0, p0, p1
+ // p1
+ // r2: [i0, i3, i6]
+ // p0, p0, p1
+ // p1
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(1, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 9));
+ assertEquals(instancePartitions.getInstances(1, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(1, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 6));
+
+ // Remove two instances (i2, i6) and add two new instances (i10, i11).
+ instanceConfigs.remove(6);
+ instanceConfigs.remove(2);
+ for (int i = numInstances; i < numInstances + 2; i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfigs.add(instanceConfig);
+ }
+ // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
+ // instances should be assigned to 2 partitions, each with 2 instances
+ instancePartitions = driver
+ .assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // Instance of index 7 is not assigned because of the hash-based rotation
+ // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
+ // [i8, i9, i0, i1, i10, i3, i4, i5, i11, i7]
+ // r0, r1, r2, r0, r1, r2, r0, r1, r2
+ // r0: [i8, i1, i4]
+ // p0, p0, p1
+ // p1
+ // r1: [i9, i10, i5]
+ // p0, p0, p1
+ // p1
+ // r2: [i0, i3, i11]
+ // p0, p0, p1
+ // p1
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(1, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 9));
+ assertEquals(instancePartitions.getInstances(1, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 9));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(1, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 11));
}
@Test
@@ -155,7 +244,8 @@ public class InstanceAssignmentTest {
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
// All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
// replica-group 1
- InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
+ Collections.emptyMap());
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
assertEquals(instancePartitions.getNumPartitions(), 1);
assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -182,7 +272,8 @@ public class InstanceAssignmentTest {
// Pool 0 and 2 will be selected in the pool selection
// All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to
// replica-group 1
- instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
assertEquals(instancePartitions.getNumPartitions(), 1);
assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -200,7 +291,154 @@ public class InstanceAssignmentTest {
// Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
// All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
// replica-group 1
- instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0), Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 12,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 14));
+ assertEquals(instancePartitions.getInstances(0, 1), Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+
+ // Select pool 0 and 1 in pool selection
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, Arrays.asList(0, 1));
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+ // replica-group 1
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0), Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(0, 1), Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+ // Assign instances from 2 pools to 3 replica-groups
+ numReplicaGroups = numPools;
+ replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0);
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // [pool0, pool1]
+ // r0 r1
+ // r2
+ // Each replica-group should have 2 instances assigned
+ // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+ // pool 0: [i3, i4, i0, i1, i2]
+ // r0 r2 r0 r2
+ // pool 1: [i8, i9, i5, i6, i7]
+ // r1 r1
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
+
+ // ===== Test against the cases when partitionToInstancesMap isn't empty. =====
+ // Reset the number of replica groups to 2 and pools to 2.
+ numReplicaGroups = 2;
+ numPools = 2;
+ replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0, 0, 0);
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+ // Reset the instance configs to have only two pools.
+ instanceConfigs.clear();
+ numInstances = 10;
+ for (int i = 0; i < numInstances; i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ int pool = i / numInstancesPerPool;
+ instanceConfig.getRecord()
+ .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Put the existing partition to instances map as the parameter to the InstanceAssignmentDriver.
+ Map<Integer, List<String>> existingPartitionToInstancesMap = new HashMap<>();
+ existingPartitionToInstancesMap.put(0, Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+ existingPartitionToInstancesMap.put(1, Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+ // Use all pools, the instance partition should be the same as the one without using
+ // the existing partition to instances map.
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+ // replica-group 1.
+ // [pool0, pool1]
+ // r0 r1
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0), Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(0, 1), Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
+
+ // Add the third pool with same number of instances
+ numPools = 3;
+ numInstances = numPools * numInstancesPerPool;
+ for (int i = numInstances - numInstancesPerPool; i < numInstances; i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ int pool = numPools - 1;
+ instanceConfig.getRecord()
+ .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Putting the existingPartitionToInstancesMap shouldn't change the instance assignment.
+ // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // Pool 0 and 2 will be selected in the pool selection
+ // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to
+ // replica-group 1.
+ // [pool0, pool1, pool2]
+ // r0 r1
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0), Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(0, 1), Arrays
+ .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 12,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 14));
+
+ // Select all 3 pools in pool selection
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+ tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+
+ // Putting the existingPartitionToInstancesMap shouldn't change the instance assignment.
+ // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
+ // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
+ // replica-group 1.
+ // [pool0, pool1, pool2]
+ // r1 r2 r0
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
assertEquals(instancePartitions.getNumPartitions(), 1);
assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -215,10 +453,12 @@ public class InstanceAssignmentTest {
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig)));
+ // Putting the existingPartitionToInstancesMap shouldn't change the instance assignment.
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
// All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
// replica-group 1
- instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
assertEquals(instancePartitions.getNumPartitions(), 1);
assertEquals(instancePartitions.getInstances(0, 0), Arrays
@@ -244,7 +484,8 @@ public class InstanceAssignmentTest {
// r0 r2 r0 r2
// pool 1: [i8, i9, i5, i6, i7]
// r1 r1
- instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
assertEquals(instancePartitions.getNumPartitions(), 1);
assertEquals(instancePartitions.getInstances(0, 0),
@@ -253,6 +494,40 @@ public class InstanceAssignmentTest {
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
assertEquals(instancePartitions.getInstances(0, 2),
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
+
+ instanceConfigs.remove(12);
+ instanceConfigs.remove(9);
+ instanceConfigs.remove(3);
+ int poolCount = 0;
+ for (int i = numInstances; i < numInstances + 3; i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ int pool = poolCount++;
+ instanceConfig.getRecord()
+ .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
+ // [pool0, pool1]
+ // r0 r1
+ // r2
+ // Each replica-group should have 2 instances assigned
+ // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
+ // pool 0: [i15, i4, i0, i1, i2]
+ // r0 r2 r0 r2
+ // pool 1: [i8, i16, i5, i6, i7]
+ // r1 r1
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingPartitionToInstancesMap);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 15));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 16, SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4));
}
@Test
@@ -270,7 +545,7 @@ public class InstanceAssignmentTest {
// No instance assignment config
assertFalse(InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, InstancePartitionsType.OFFLINE));
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Instance assignment is not allowed for the given table config");
@@ -284,7 +559,7 @@ public class InstanceAssignmentTest {
// No instance with correct tag
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "No enabled instance has the tag: tenant_OFFLINE");
@@ -295,7 +570,8 @@ public class InstanceAssignmentTest {
}
// All instances should be assigned as replica-group 0 partition 0
- InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ InstancePartitions instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
assertEquals(instancePartitions.getNumReplicaGroups(), 1);
assertEquals(instancePartitions.getNumPartitions(), 1);
List<String> expectedInstances = new ArrayList<>(numInstances);
@@ -311,7 +587,7 @@ public class InstanceAssignmentTest {
// No instance has correct pool configured
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "No enabled instance has the pool configured for the tag: tenant_OFFLINE");
@@ -328,7 +604,8 @@ public class InstanceAssignmentTest {
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
// All instances in pool 0 should be assigned as replica-group 0 partition 0
- instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
assertEquals(instancePartitions.getNumReplicaGroups(), 1);
assertEquals(instancePartitions.getNumPartitions(), 1);
expectedInstances.clear();
@@ -343,7 +620,7 @@ public class InstanceAssignmentTest {
// Ask for too many pools
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Not enough instance pools (2 in the cluster, asked for 3)");
@@ -355,7 +632,7 @@ public class InstanceAssignmentTest {
// Ask for pool that does not exist
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Cannot find all instance pools configured: [0, 2]");
@@ -368,7 +645,7 @@ public class InstanceAssignmentTest {
// Ask for too many instances
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
@@ -381,7 +658,7 @@ public class InstanceAssignmentTest {
// Number of replica-groups must be positive
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Number of replica-groups must be positive");
@@ -393,7 +670,7 @@ public class InstanceAssignmentTest {
// Ask for too many replica-groups
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(),
@@ -406,7 +683,7 @@ public class InstanceAssignmentTest {
// Ask for too many instances
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
@@ -418,7 +695,7 @@ public class InstanceAssignmentTest {
// Ask for too many instances per partition
try {
- driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, Collections.emptyMap());
fail();
} catch (IllegalStateException e) {
assertEquals(e.getMessage(),
@@ -434,7 +711,8 @@ public class InstanceAssignmentTest {
// r0 r2 r0 r2
// pool1: [i8, i9, i5, i6, i7]
// r1 r1
- instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
+ instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
+ Collections.emptyMap());
assertEquals(instancePartitions.getNumReplicaGroups(), 3);
assertEquals(instancePartitions.getNumPartitions(), 1);
assertEquals(instancePartitions.getInstances(0, 0),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org