You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/02 06:34:51 UTC
[incubator-pinot] branch master updated: Enhance TableRebalancer to
support no-downtime rebalance for strict replica-group routing tables
(#6212)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e0f15aa Enhance TableRebalancer to support no-downtime rebalance for strict replica-group routing tables (#6212)
e0f15aa is described below
commit e0f15aab71cda8fce3f06d2f53a4c7e7a4f3c523
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Nov 1 22:34:34 2020 -0800
Enhance TableRebalancer to support no-downtime rebalance for strict replica-group routing tables (#6212)
Building on #6208, this PR enhances the TableRebalancer to support no-downtime rebalancing for tables that use strict replica-group routing, while still holding the minimum available replicas requirement.
---
.../helix/core/rebalance/TableRebalancer.java | 106 +++++-
.../helix/core/rebalance/TableRebalancerTest.java | 372 +++++++++++++++++----
2 files changed, 404 insertions(+), 74 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 644e990..081e52c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -23,7 +23,9 @@ import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.apache.commons.configuration.Configuration;
@@ -46,6 +48,7 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -127,12 +130,15 @@ public class TableRebalancer {
int minReplicasToKeepUpForNoDowntime = rebalanceConfig
.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME);
+ boolean enableStrictReplicaGroup =
+ tableConfig.getRoutingConfig() != null && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE
+ .equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType());
boolean bestEfforts = rebalanceConfig
.getBoolean(RebalanceConfigConstants.BEST_EFFORTS, RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
LOGGER.info(
- "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, bestEfforts: {}",
+ "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime,
- minReplicasToKeepUpForNoDowntime, bestEfforts);
+ minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, bestEfforts);
// Validate table config
try {
@@ -334,8 +340,8 @@ public class TableRebalancer {
minAvailableReplicas = Math.max(numReplicas + minReplicasToKeepUpForNoDowntime, 0);
}
- LOGGER.info("Rebalancing table: {} with minAvailableReplicas: {}, bestEfforts: {}", tableNameWithType,
- minAvailableReplicas, bestEfforts);
+ LOGGER.info("Rebalancing table: {} with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, bestEfforts: {}",
+ tableNameWithType, minAvailableReplicas, enableStrictReplicaGroup, bestEfforts);
int expectedVersion = currentIdealState.getRecord().getVersion();
while (true) {
// Wait for ExternalView to converge before updating the next IdealState
@@ -373,8 +379,10 @@ public class TableRebalancer {
}
if (currentAssignment.equals(targetAssignment)) {
- LOGGER.info("Finished rebalancing table: {} with minAvailableReplicas: {}, bestEfforts: {} in {}ms.",
- tableNameWithType, minAvailableReplicas, bestEfforts, System.currentTimeMillis() - startTimeMs);
+ LOGGER.info(
+ "Finished rebalancing table: {} with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, bestEfforts: {} in {}ms.",
+ tableNameWithType, minAvailableReplicas, enableStrictReplicaGroup, bestEfforts,
+ System.currentTimeMillis() - startTimeMs);
return new RebalanceResult(RebalanceResult.Status.DONE,
"Success with minAvailableReplicas: " + minAvailableReplicas
+ " (both IdealState and ExternalView should reach the target segment assignment)",
@@ -382,7 +390,7 @@ public class TableRebalancer {
}
Map<String, Map<String, String>> nextAssignment =
- getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas);
+ getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup);
LOGGER.info("Got the next assignment for table: {} with number of segments to be moved to each instance: {}",
tableNameWithType,
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment));
@@ -546,22 +554,76 @@ public class TableRebalancer {
return true;
}
- private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
- Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas) {
+ /**
+ * Returns the next assignment for the table based on the current assignment and the target assignment with regards to
+ * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
+ * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
+ * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
+ */
+ @VisibleForTesting
+ static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup) {
+ return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
+ minAvailableReplicas)
+ : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas);
+ }
+
+ private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
+ Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
+ int minAvailableReplicas) {
Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
+ Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
+ for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> currentInstanceStateMap = entry.getValue();
+ Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
+ SingleSegmentAssignment assignment =
+ getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas);
+ Set<String> assignedInstances = assignment._instanceStateMap.keySet();
+ Set<String> availableInstances = assignment._availableInstances;
+ availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> {
+ if (currentAvailableInstances == null) {
+ // First segment assigned to these instances, use the new assignment and update the available instances
+ nextAssignment.put(segmentName, assignment._instanceStateMap);
+ return availableInstances;
+ } else {
+ // There are other segments assigned to the same instances, check the available instances to see if adding the
+ // new assignment can still hold the minimum available replicas requirement
+ availableInstances.retainAll(currentAvailableInstances);
+ if (availableInstances.size() >= minAvailableReplicas) {
+ // New assignment can be added
+ nextAssignment.put(segmentName, assignment._instanceStateMap);
+ return availableInstances;
+ } else {
+ // New assignment cannot be added, use the current instance state map
+ nextAssignment.put(segmentName, currentInstanceStateMap);
+ return currentAvailableInstances;
+ }
+ }
+ });
+ }
+ return nextAssignment;
+ }
+ private static Map<String, Map<String, String>> getNextNonStrictReplicaGroupAssignment(
+ Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
+ int minAvailableReplicas) {
+ Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
String segmentName = entry.getKey();
nextAssignment.put(segmentName,
- getNextInstanceStateMap(entry.getValue(), targetAssignment.get(segmentName), minAvailableReplicas));
+ getNextSingleSegmentAssignment(entry.getValue(), targetAssignment.get(segmentName),
+ minAvailableReplicas)._instanceStateMap);
}
-
return nextAssignment;
}
+ /**
+ * Returns the next assignment for a segment based on the current instance state map and the target instance state map
+ * with regards to the minimum available replicas requirement.
+ */
@VisibleForTesting
- @SuppressWarnings("Duplicates")
- static Map<String, String> getNextInstanceStateMap(Map<String, String> currentInstanceStateMap,
+ static SingleSegmentAssignment getNextSingleSegmentAssignment(Map<String, String> currentInstanceStateMap,
Map<String, String> targetInstanceStateMap, int minAvailableReplicas) {
Map<String, String> nextInstanceStateMap = new TreeMap<>();
@@ -586,6 +648,7 @@ public class TableRebalancer {
}
}
}
+ Set<String> availableInstances = new TreeSet<>(nextInstanceStateMap.keySet());
// Add target instances until the number of instances matched
int instancesToAdd = targetInstanceStateMap.size() - nextInstanceStateMap.size();
@@ -601,6 +664,21 @@ public class TableRebalancer {
}
}
- return nextInstanceStateMap;
+ return new SingleSegmentAssignment(nextInstanceStateMap, availableInstances);
+ }
+
+ /**
+ * Assignment result for a single segment.
+ */
+ @VisibleForTesting
+ static class SingleSegmentAssignment {
+ final Map<String, String> _instanceStateMap;
+ // Instances that are common in both current instance state and next instance state of the segment
+ final Set<String> _availableInstances;
+
+ SingleSegmentAssignment(Map<String, String> instanceStateMap, Set<String> availableInstances) {
+ _instanceStateMap = instanceStateMap;
+ _availableInstances = availableInstances;
+ }
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index a4e41f9..681ed2c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -19,8 +19,10 @@
package org.apache.pinot.controller.helix.core.rebalance;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.testng.annotations.Test;
@@ -43,27 +45,31 @@ public class TableRebalancerTest {
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE);
Map<String, String> targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3"), ONLINE);
- Map<String, String> nextInstanceStateMap =
- TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0);
- assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+ TableRebalancer.SingleSegmentAssignment assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
// Without common instance, next assignment should be the same as target assignment
targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4"), ONLINE);
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0);
- assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertTrue(assignment._availableInstances.isEmpty());
// With increasing number of replicas, next assignment should be the same as target assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4", "host5"), ONLINE);
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0);
- assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertTrue(assignment._availableInstances.isEmpty());
// With decreasing number of replicas, next assignment should be the same as target assignment
currentInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5"), ONLINE);
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 0);
- assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertTrue(assignment._availableInstances.isEmpty());
}
@Test
@@ -73,35 +79,41 @@ public class TableRebalancerTest {
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
Map<String, String> targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host4"), ONLINE);
- Map<String, String> nextInstanceStateMap =
- TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
- assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+ TableRebalancer.SingleSegmentAssignment assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
// With 1 common instance, next assignment should be the same as target assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE);
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
- assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
// Without common instance, next assignment should have 1 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6"), ONLINE);
// [host1, host4, host5]
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
- assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 1);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
// Next round should make the assignment the same as target assignment
- assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 1),
- targetInstanceStateMap);
+ assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5")));
// With increasing number of replicas, next assignment should have 1 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6", "host7"), ONLINE);
// [host1, host4, host5, host6]
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
- assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 1);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
// Next round should make the assignment the same as target assignment
- assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 1),
- targetInstanceStateMap);
+ assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5", "host6")));
// With decreasing number of replicas, next assignment should have 1 common instances with current assignment
currentInstanceStateMap =
@@ -109,11 +121,13 @@ public class TableRebalancerTest {
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
// [host1, host5, host6]
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 1);
- assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 1);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
// Next round should make the assignment the same as target assignment
- assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 1),
- targetInstanceStateMap);
+ assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
}
@Test
@@ -123,72 +137,310 @@ public class TableRebalancerTest {
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
Map<String, String> targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host5"), ONLINE);
- Map<String, String> nextInstanceStateMap =
- TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
- assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+ TableRebalancer.SingleSegmentAssignment assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
// With 2 common instances, next assignment should be the same as target assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE);
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
- assertEquals(nextInstanceStateMap, targetInstanceStateMap);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
// With 1 common instance, next assignment should have 2 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6", "host7"), ONLINE);
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
// [host1, host2, host5, host6]
- assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
// Next round should make the assignment the same as target assignment
- assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2),
- targetInstanceStateMap);
+ assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6")));
// Without common instance, next assignment should have 2 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8"), ONLINE);
// [host1, host2, host5, host6]
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
- assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
// Next round should make the assignment the same as target assignment
- assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2),
- targetInstanceStateMap);
+ assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
// With increasing number of replicas, next assignment should have 1 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8", "host9"), ONLINE);
// [host1, host2, host5, host6, host7]
- nextInstanceStateMap = TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
- assertEquals(getNumCommonInstances(currentInstanceStateMap, nextInstanceStateMap), 2);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
// Next round should make the assignment the same as target assignment
- assertEquals(TableRebalancer.getNextInstanceStateMap(nextInstanceStateMap, targetInstanceStateMap, 2),
- targetInstanceStateMap);
+ assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7")));
// With decreasing number of replicas, next assignment should have 2 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
// [host1, host2, host5]
- Map<String, String> firstRoundInstanceStateMap =
- TableRebalancer.getNextInstanceStateMap(currentInstanceStateMap, targetInstanceStateMap, 2);
- assertEquals(getNumCommonInstances(currentInstanceStateMap, firstRoundInstanceStateMap), 2);
+ assignment = TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
// Next round should have 2 common instances with first round assignment
// [host1, host5, host6]
- Map<String, String> secondRoundInstanceStateMap =
- TableRebalancer.getNextInstanceStateMap(firstRoundInstanceStateMap, targetInstanceStateMap, 2);
- assertEquals(getNumCommonInstances(firstRoundInstanceStateMap, secondRoundInstanceStateMap), 2);
+ assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5")));
// Next round should make the assignment the same as target assignment
- assertEquals(TableRebalancer.getNextInstanceStateMap(secondRoundInstanceStateMap, targetInstanceStateMap, 2),
- targetInstanceStateMap);
+ assignment =
+ TableRebalancer.getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
}
- private int getNumCommonInstances(Map<String, String> currentInstanceStateMap,
- Map<String, String> nextInstanceStateMap) {
- int numCommonInstances = 0;
- for (String instanceId : currentInstanceStateMap.keySet()) {
- if (nextInstanceStateMap.containsKey(instanceId)) {
- numCommonInstances++;
- }
- }
- return numCommonInstances;
+ @Test
+ public void testStrictReplicaGroup() {
+ // Current assignment:
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment2": {
+ // "host2": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment4": {
+ // "host2": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+ currentAssignment
+ .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+ currentAssignment
+ .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+ currentAssignment
+ .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+ currentAssignment
+ .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+
+ // Target assignment:
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host5": "ONLINE"
+ // },
+ // "segment2": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host5": "ONLINE"
+ // },
+ // "segment4": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // }
+ // }
+ Map<String, Map<String, String>> targetAssignment = new TreeMap<>();
+ targetAssignment
+ .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+ targetAssignment
+ .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+ targetAssignment
+ .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+ targetAssignment
+ .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+
+ // Next assignment with 2 minimum available replicas with or without strict replica-group should reach the target
+ // assignment
+ Map<String, Map<String, String>> nextAssignment =
+ TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false);
+ assertEquals(nextAssignment, targetAssignment);
+ nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true);
+ assertEquals(nextAssignment, targetAssignment);
+
+ // Current assignment:
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment2": {
+ // "host2": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment4": {
+ // "host2": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ currentAssignment = new TreeMap<>();
+ currentAssignment
+ .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+ currentAssignment
+ .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+ currentAssignment
+ .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+ currentAssignment
+ .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+
+ // Target assignment:
+ // {
+ // "segment1": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // },
+ // "segment2": {
+ // "host1": "ONLINE",
+ // "host4": "ONLINE",
+ // "host5": "ONLINE"
+ // },
+ // "segment3": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // },
+ // "segment4": {
+ // "host1": "ONLINE",
+ // "host4": "ONLINE",
+ // "host5": "ONLINE"
+ // }
+ // }
+ targetAssignment = new TreeMap<>();
+ targetAssignment
+ .put("segment1", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+ targetAssignment
+ .put("segment2", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+ targetAssignment
+ .put("segment3", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+ targetAssignment
+ .put("segment4", SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+
+ // Next assignment with 2 minimum available replicas without strict replica-group:
+ // (This assignment will move "segment1" and "segment3" from "host3" to "host4", and move "segment2" and "segment4"
+ // from "host3" to "host1". "host1" and "host4" might be unavailable for strict replica-group routing, which breaks
+ // the minimum available replicas requirement.)
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment2": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment4": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+
+ // Next assignment with 2 minimum available replicas with strict replica-group:
+ // (This assignment will only move "segment1" and "segment3" from "host3" to "host4". Only "host4" can be
+ // unavailable for strict replica-group routing during the rebalance, which meets the minimum available replicas
+ // requirement.)
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment2": {
+ // "host2": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment4": {
+ // "host2": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
+
+ // Next assignment with 2 minimum available replicas with strict replica-group:
+ // {
+ // "segment1": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // },
+ // "segment2": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // },
+ // "segment4": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host6")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+
+ // Next assignment with 2 minimum available replicas with strict replica-group should reach the target assignment
+ nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true);
+ assertEquals(nextAssignment, targetAssignment);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org