You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/30 21:22:15 UTC
(pinot) branch master updated: Introduce low disk mode to table rebalance (#12072)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cc7e7a8b86 Introduce low disk mode to table rebalance (#12072)
cc7e7a8b86 is described below
commit cc7e7a8b869c3d04c15ce8c5b14c0c18b0d83fe1
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Nov 30 13:22:08 2023 -0800
Introduce low disk mode to table rebalance (#12072)
---
.../api/resources/PinotTableRestletResource.java | 34 +-
.../helix/core/rebalance/RebalanceConfig.java | 16 +
.../helix/core/rebalance/TableRebalancer.java | 58 +-
.../helix/core/rebalance/TableRebalancerTest.java | 885 +++++++++++++++++++--
.../apache/pinot/tools/PinotTableRebalancer.java | 4 +-
.../tools/admin/command/RebalanceTableCommand.java | 10 +-
6 files changed, 898 insertions(+), 109 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 55ffb01046..5d35e1f0c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -602,6 +602,7 @@ public class PinotTableRestletResource {
@ApiOperation(value = "Rebalances a table (reassign instances and segments for a table)",
notes = "Rebalances a table (reassign instances and segments for a table)")
public RebalanceResult rebalance(
+ //@formatter:off
@ApiParam(value = "Name of the table to rebalance", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Whether to rebalance table in dry-run mode") @DefaultValue("false") @QueryParam("dryRun")
@@ -609,19 +610,24 @@ public class PinotTableRestletResource {
@ApiParam(value = "Whether to reassign instances before reassigning segments") @DefaultValue("false")
@QueryParam("reassignInstances") boolean reassignInstances,
@ApiParam(value = "Whether to reassign CONSUMING segments for real-time table") @DefaultValue("false")
- @QueryParam("includeConsuming") boolean includeConsuming, @ApiParam(
- value = "Whether to rebalance table in bootstrap mode (regardless of minimum segment movement, reassign all "
- + "segments in a round-robin fashion as if adding new segments to an empty table)") @DefaultValue("false")
- @QueryParam("bootstrap") boolean bootstrap,
+ @QueryParam("includeConsuming") boolean includeConsuming,
+ @ApiParam(value = "Whether to rebalance table in bootstrap mode (regardless of minimum segment movement, "
+ + "reassign all segments in a round-robin fashion as if adding new segments to an empty table)")
+ @DefaultValue("false") @QueryParam("bootstrap") boolean bootstrap,
@ApiParam(value = "Whether to allow downtime for the rebalance") @DefaultValue("false") @QueryParam("downtime")
boolean downtime,
@ApiParam(value = "For no-downtime rebalance, minimum number of replicas to keep alive during rebalance, or "
+ "maximum number of replicas allowed to be unavailable if value is negative") @DefaultValue("1")
- @QueryParam("minAvailableReplicas") int minAvailableReplicas, @ApiParam(
- value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot "
- + "be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts, @ApiParam(
- value = "How often to check if external view converges with ideal states") @DefaultValue("1000")
- @QueryParam("externalViewCheckIntervalInMs") long externalViewCheckIntervalInMs,
+ @QueryParam("minAvailableReplicas") int minAvailableReplicas,
+ @ApiParam(value = "For no-downtime rebalance, whether to enable low disk mode during rebalance. When enabled, "
+ + "segments will first be offloaded from servers, then added to servers after offload is done while "
+ + "maintaining the min available replicas. It may increase the total time of the rebalance, but can be "
+ + "useful when servers are low on disk space, and we want to scale up the cluster and rebalance the table to "
+ + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") boolean lowDiskMode,
+ @ApiParam(value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime "
+ + "contract cannot be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts,
+ @ApiParam(value = "How often to check if external view converges with ideal states") @DefaultValue("1000")
+ @QueryParam("externalViewCheckIntervalInMs") long externalViewCheckIntervalInMs,
@ApiParam(value = "How long to wait till external view converges with ideal states") @DefaultValue("3600000")
@QueryParam("externalViewStabilizationTimeoutInMs") long externalViewStabilizationTimeoutInMs,
@ApiParam(value = "How often to make a status update (i.e. heartbeat)") @DefaultValue("300000")
@@ -629,10 +635,13 @@ public class PinotTableRestletResource {
@ApiParam(value = "How long to wait for next status update (i.e. heartbeat) before the job is considered failed")
@DefaultValue("3600000") @QueryParam("heartbeatTimeoutInMs") long heartbeatTimeoutInMs,
@ApiParam(value = "Max number of attempts to rebalance") @DefaultValue("3") @QueryParam("maxAttempts")
- int maxAttempts, @ApiParam(value = "Initial delay to exponentially backoff retry") @DefaultValue("300000")
- @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
+ int maxAttempts,
+ @ApiParam(value = "Initial delay to exponentially backoff retry") @DefaultValue("300000")
+ @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
@ApiParam(value = "Whether to update segment target tier as part of the rebalance") @DefaultValue("false")
- @QueryParam("updateTargetTier") boolean updateTargetTier) {
+ @QueryParam("updateTargetTier") boolean updateTargetTier
+ //@formatter:on
+ ) {
String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr);
RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(dryRun);
@@ -641,6 +650,7 @@ public class PinotTableRestletResource {
rebalanceConfig.setBootstrap(bootstrap);
rebalanceConfig.setDowntime(downtime);
rebalanceConfig.setMinAvailableReplicas(minAvailableReplicas);
+ rebalanceConfig.setLowDiskMode(lowDiskMode);
rebalanceConfig.setBestEfforts(bestEfforts);
rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index 94709f065c..b6b471cfbc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -61,6 +61,14 @@ public class RebalanceConfig {
@ApiModelProperty(example = "1")
private int _minAvailableReplicas = DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME;
+ // For no-downtime rebalance, whether to enable low disk mode during rebalance. When enabled, segments will first be
+ // offloaded from servers, then added to servers after offload is done while maintaining the min available replicas.
+ // It may increase the total time of the rebalance, but can be useful when servers are low on disk space, and we want
+ // to scale up the cluster and rebalance the table to more servers.
+ @JsonProperty("lowDiskMode")
+ @ApiModelProperty(example = "false")
+ private boolean _lowDiskMode = false;
+
// Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot be achieved)
// When using best-efforts to rebalance, the following scenarios won't fail the rebalance (will log warnings instead):
// - Segment falls into ERROR state in ExternalView -> count ERROR state as good state
@@ -150,6 +158,14 @@ public class RebalanceConfig {
_minAvailableReplicas = minAvailableReplicas;
}
+ public boolean isLowDiskMode() {
+ return _lowDiskMode;
+ }
+
+ public void setLowDiskMode(boolean lowDiskMode) {
+ _lowDiskMode = lowDiskMode;
+ }
+
public boolean isBestEfforts() {
return _bestEfforts;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index cd4d70ee3c..318d251bfc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -172,6 +172,7 @@ public class TableRebalancer {
boolean bootstrap = rebalanceConfig.isBootstrap();
boolean downtime = rebalanceConfig.isDowntime();
int minReplicasToKeepUpForNoDowntime = rebalanceConfig.getMinAvailableReplicas();
+ boolean lowDiskMode = rebalanceConfig.isLowDiskMode();
boolean bestEfforts = rebalanceConfig.isBestEfforts();
long externalViewCheckIntervalInMs = rebalanceConfig.getExternalViewCheckIntervalInMs();
long externalViewStabilizationTimeoutInMs = rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
@@ -180,10 +181,11 @@ public class TableRebalancer {
tableConfig.getRoutingConfig().getInstanceSelectorType());
LOGGER.info(
"Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, "
- + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}, "
- + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}", tableNameWithType, dryRun,
- reassignInstances, includeConsuming, bootstrap, downtime, minReplicasToKeepUpForNoDowntime,
- enableStrictReplicaGroup, bestEfforts, externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
+ + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, lowDiskMode: {}, "
+ + "bestEfforts: {}, externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}",
+ tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime,
+ minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts,
+ externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
// Fetch ideal state
PropertyKey idealStatePropertyKey = _helixDataAccessor.keyBuilder().idealStates(tableNameWithType);
@@ -483,7 +485,8 @@ public class TableRebalancer {
tierToInstancePartitionsMap, targetAssignment);
}
Map<String, Map<String, String>> nextAssignment =
- getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup);
+ getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+ lowDiskMode);
LOGGER.info("For rebalanceId: {}, got the next assignment for table: {} with number of segments to be moved to "
+ "each instance: {}", rebalanceJobId, tableNameWithType,
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment));
@@ -847,15 +850,17 @@ public class TableRebalancer {
*/
@VisibleForTesting
static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
- Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup) {
+ Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
+ boolean lowDiskMode) {
return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
- minAvailableReplicas)
- : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas);
+ minAvailableReplicas, lowDiskMode)
+ : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
+ lowDiskMode);
}
private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
- int minAvailableReplicas) {
+ int minAvailableReplicas, boolean lowDiskMode) {
Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
@@ -866,7 +871,7 @@ public class TableRebalancer {
Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
SingleSegmentAssignment assignment =
getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas,
- numSegmentsToOffloadMap, assignmentMap);
+ lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
Set<String> assignedInstances = assignment._instanceStateMap.keySet();
Set<String> availableInstances = assignment._availableInstances;
availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> {
@@ -897,7 +902,7 @@ public class TableRebalancer {
private static Map<String, Map<String, String>> getNextNonStrictReplicaGroupAssignment(
Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
- int minAvailableReplicas) {
+ int minAvailableReplicas, boolean lowDiskMode) {
Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
@@ -907,7 +912,7 @@ public class TableRebalancer {
Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
Map<String, String> nextInstanceStateMap =
getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas,
- numSegmentsToOffloadMap, assignmentMap)._instanceStateMap;
+ lowDiskMode, numSegmentsToOffloadMap, assignmentMap)._instanceStateMap;
nextAssignment.put(segmentName, nextInstanceStateMap);
updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(),
nextInstanceStateMap.keySet());
@@ -954,7 +959,7 @@ public class TableRebalancer {
*/
@VisibleForTesting
static SingleSegmentAssignment getNextSingleSegmentAssignment(Map<String, String> currentInstanceStateMap,
- Map<String, String> targetInstanceStateMap, int minAvailableReplicas,
+ Map<String, String> targetInstanceStateMap, int minAvailableReplicas, boolean lowDiskMode,
Map<String, Integer> numSegmentsToOffloadMap, Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap) {
Map<String, String> nextInstanceStateMap = new TreeMap<>();
@@ -1005,16 +1010,23 @@ public class TableRebalancer {
}
Set<String> availableInstances = new TreeSet<>(nextInstanceStateMap.keySet());
- // Add target instances until the number of instances matched
- int numInstancesToAdd = targetInstanceStateMap.size() - nextInstanceStateMap.size();
- if (numInstancesToAdd > 0) {
- // Sort instances by number of segments to offload, and add the ones with the least segments to offload
- List<Triple<String, String, Integer>> instancesInfo =
- getSortedInstancesOnNumSegmentsToOffload(targetInstanceStateMap, nextInstanceStateMap,
- numSegmentsToOffloadMap);
- for (int i = 0; i < numInstancesToAdd; i++) {
- Triple<String, String, Integer> instanceInfo = instancesInfo.get(i);
- nextInstanceStateMap.put(instanceInfo.getLeft(), instanceInfo.getMiddle());
+ // After achieving the min available replicas, when low disk mode is enabled, only add new instances when all
+ // current instances exist in the next assignment.
+ // We want to first drop the extra instances as one step, then add the target instances as another step to avoid the
+ // case where segments are first added to the instance before other segments are dropped from the instance, which
+ // might cause the server to run out of disk. Note that even if segment addition and drop happen in the same
+ // step, there is no guarantee that the server processes the segment drop before the segment addition.
+ if (!lowDiskMode || currentInstanceStateMap.size() == nextInstanceStateMap.size()) {
+ int numInstancesToAdd = targetInstanceStateMap.size() - nextInstanceStateMap.size();
+ if (numInstancesToAdd > 0) {
+ // Sort instances by number of segments to offload, and add the ones with the least segments to offload
+ List<Triple<String, String, Integer>> instancesInfo =
+ getSortedInstancesOnNumSegmentsToOffload(targetInstanceStateMap, nextInstanceStateMap,
+ numSegmentsToOffloadMap);
+ for (int i = 0; i < numInstancesToAdd; i++) {
+ Triple<String, String, Integer> instanceInfo = instancesInfo.get(i);
+ nextInstanceStateMap.put(instanceInfo.getLeft(), instanceInfo.getMiddle());
+ }
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index 84a660cc5e..ecf1e0feda 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -43,189 +43,512 @@ public class TableRebalancerTest {
@Test
public void testDowntimeMode() {
- // With common instance, next assignment should be the same as target assignment
+ // With common instance, first assignment should be the same as target assignment
Map<String, String> currentInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE);
Map<String, String> targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3"), ONLINE);
TableRebalancer.SingleSegmentAssignment assignment =
- getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+ getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, Collections.singleton("host1"));
- // Without common instance, next assignment should be the same as target assignment
+ // Without common instance, first assignment should be the same as target assignment
targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4"), ONLINE);
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertTrue(assignment._availableInstances.isEmpty());
- // With increasing number of replicas, next assignment should be the same as target assignment
+ // With increasing number of replicas, first assignment should be the same as target assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4", "host5"), ONLINE);
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertTrue(assignment._availableInstances.isEmpty());
- // With decreasing number of replicas, next assignment should be the same as target assignment
+ // With decreasing number of replicas, first assignment should be the same as target assignment
currentInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5"), ONLINE);
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, false);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertTrue(assignment._availableInstances.isEmpty());
+ }
+
+ @Test
+ public void testDowntimeWithLowDiskMode() {
+ // With common instance, first assignment should keep the common instance and remove the non-common instance
+ Map<String, String> currentInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE);
+ Map<String, String> targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3"), ONLINE);
+ TableRebalancer.SingleSegmentAssignment assignment =
+ getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, true);
+ assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 0, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+
+ // Without common instance, first assignment should drop all instances
+ targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, true);
+ assertTrue(assignment._instanceStateMap.isEmpty());
+ assertTrue(assignment._availableInstances.isEmpty());
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 0, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertTrue(assignment._availableInstances.isEmpty());
+
+ // With increasing number of replicas, first assignment should drop all instances
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4", "host5"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, true);
+ assertTrue(assignment._instanceStateMap.isEmpty());
+ assertTrue(assignment._availableInstances.isEmpty());
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 0, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertTrue(assignment._availableInstances.isEmpty());
+
+ // With decreasing number of replicas, first assignment should drop all instances
+ currentInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
+ targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, true);
+ assertTrue(assignment._instanceStateMap.isEmpty());
+ assertTrue(assignment._availableInstances.isEmpty());
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 0, true);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertTrue(assignment._availableInstances.isEmpty());
}
@Test
public void testOneMinAvailableReplicas() {
- // With 2 common instances, next assignment should be the same as target assignment
+ // With 2 common instances, first assignment should be the same as target assignment
Map<String, String> currentInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
Map<String, String> targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host4"), ONLINE);
TableRebalancer.SingleSegmentAssignment assignment =
- getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
- // With 1 common instance, next assignment should be the same as target assignment
+ // With 1 common instance, first assignment should be the same as target assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE);
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, Collections.singleton("host1"));
- // Without common instance, next assignment should have 1 common instances with current assignment
+ // Without common instance, first assignment should have 1 common instance with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6"), ONLINE);
- // [host1, host4, host5]
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
assertEquals(assignment._availableInstances, Collections.singleton("host1"));
- // Next round should make the assignment the same as target assignment
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5")));
- // With increasing number of replicas, next assignment should have 1 common instances with current assignment
+ // With increasing number of replicas, first assignment should have 1 common instance with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6", "host7"), ONLINE);
- // [host1, host4, host5, host6]
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5", "host6"), ONLINE));
assertEquals(assignment._availableInstances, Collections.singleton("host1"));
- // Next round should make the assignment the same as target assignment
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5", "host6")));
- // With decreasing number of replicas, next assignment should have 1 common instances with current assignment
+ // With decreasing number of replicas, first assignment should have 1 common instance with current assignment
currentInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
- // [host1, host5, host6]
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
assertEquals(assignment._availableInstances, Collections.singleton("host1"));
- // Next round should make the assignment the same as target assignment
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
- // With increasing from 1 replica, next assignment should have 1 common instances with current assignment
+ // With increasing from 1 replica, first assignment should have 1 common instance with current assignment
currentInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Collections.singletonList("host1"), ONLINE);
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE);
- // [host1, host2, host3]
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
assertEquals(assignment._availableInstances, Collections.singleton("host1"));
- // Next round should make the assignment the same as target assignment
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, false);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
+ }
+
+ @Test
+ public void testOneMinAvailableReplicasWithLowDiskMode() {
+ // With 2 common instances, first assignment should keep the common instances and remove the non-common instance
+ Map<String, String> currentInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
+ Map<String, String> targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host4"), ONLINE);
+ TableRebalancer.SingleSegmentAssignment assignment =
+ getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+
+ // With 1 common instance, first assignment should keep the common instance and remove the not common instances
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singletonList("host1"));
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+
+ // Without common instance, fist assignment should keep 1 instance from current assignment
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+ // Second assignment should add 2 instances from target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+ // Third assignment should remove the old instance from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5")));
+ // Fourth assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5")));
+
+ // With increasing number of replicas, fist assignment should keep 1 instance from current assignment
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6", "host7"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+ // Second assignment should add 3 instances from target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+ // Third assignment should remove the old instance from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5", "host6")));
+ // Fourth assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5", "host6")));
+
+ // With decreasing number of replicas, fist assignment should keep 1 instance from current assignment
+ currentInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+ // Second assignment should add 2 instances from target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+ // Third assignment should remove the old instance from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+ // Fourth assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+
+ // With increasing from 1 replica, fist assignment should keep the instance from current assignment, and add 2
+ // instances from target assignment
+ currentInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Collections.singletonList("host1"), ONLINE);
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+ // Second assignment should remove the old instance from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
+ // Third assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
}
@Test
public void testTwoMinAvailableReplicas() {
- // With 3 common instances, next assignment should be the same as target assignment
+ // With 3 common instances, first assignment should be the same as target assignment
Map<String, String> currentInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
Map<String, String> targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host5"), ONLINE);
TableRebalancer.SingleSegmentAssignment assignment =
- getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
- // With 2 common instances, next assignment should be the same as target assignment
+ // With 2 common instances, first assignment should be the same as target assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE);
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
- // With 1 common instance, next assignment should have 2 common instances with current assignment
+ // With 1 common instance, first assignment should have 2 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6", "host7"), ONLINE);
- // [host1, host2, host5, host6]
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE));
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
- // Next round should make the assignment the same as target assignment
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6")));
- // Without common instance, next assignment should have 2 common instances with current assignment
+ // Without common instance, first assignment should have 2 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8"), ONLINE);
- // [host1, host2, host5, host6]
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE));
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
- // Next round should make the assignment the same as target assignment
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
- // With increasing number of replicas, next assignment should have 1 common instances with current assignment
+ // With increasing number of replicas, first assignment should have 2 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8", "host9"), ONLINE);
// [host1, host2, host5, host6, host7]
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6", "host7"), ONLINE));
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
- // Next round should make the assignment the same as target assignment
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7")));
- // With decreasing number of replicas, next assignment should have 2 common instances with current assignment
+ // With decreasing number of replicas, first assignment should have 2 common instances with current assignment
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
- // [host1, host2, host5]
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5"), ONLINE));
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
- // Next round should have 2 common instances with first round assignment
- // [host1, host5, host6]
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ // Second assignment should have 2 common instances with first assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5")));
- // Next round should make the assignment the same as target assignment
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ // Third assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
- // With increasing from 1 replica, next assignment should have 1 common instances with current assignment
+ // With increasing from 1 replica, first assignment should have 1 common instance with current assignment
// NOTE: This is the best we can do because we don't have 2 replicas available
currentInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Collections.singletonList("host1"), ONLINE);
targetInstanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE);
- // [host1, host2, host3]
- assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
assertEquals(assignment._availableInstances, Collections.singleton("host1"));
- // Next round should make the assignment the same as target assignment
- assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+ // Second assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
+ }
+
+ @Test
+ public void testTwoMinAvailableReplicasWithLowDiskMode() {
+ // With 3 common instances, first assignment should keep the common instances and remove the non-common instance
+ Map<String, String> currentInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
+ Map<String, String> targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host5"), ONLINE);
+ TableRebalancer.SingleSegmentAssignment assignment =
+ getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+
+ // With 2 common instances, first assignment should keep the common instances and remove the non-common instances
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Second assignment should be the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+
+ // With 1 common instance, first assignment should keep the common instance, and 1 more instance from current
+ // assignment
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6", "host7"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Second assignment should add 2 instances from target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Third assignment should remove the old instance from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6")));
+ // Fourth assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6")));
+
+ // Without common instance, first assignment should keep 2 instances from current assignment
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Second assignment should add 2 instances from target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Third assignment should remove the old instances from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+ // Fourth assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+
+ // With increasing number of replicas, first assignment should keep 2 instances from current assignment
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8", "host9"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Second assignment should add 3 instances from target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6", "host7"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Third assignment should remove the old instances from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7")));
+ // Fourth assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7")));
+
+ // With decreasing number of replicas, first assignment should keep 2 instances from current assignment
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Second assignment should add 1 instance from target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+ // Third assignment should remove 1 old instance from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5")));
+ // Fourth assignment should add 1 more instance from target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5")));
+ // Fifth assignment should remove the other old instance from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+ // Sixth assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+
+ // With increasing from 1 replica, first assignment should keep the instance from current assignment, and add 2
+ // instances from target assignment
+ // NOTE: This is the best we can do because we don't have 2 replicas available
+ currentInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Collections.singletonList("host1"), ONLINE);
+ targetInstanceStateMap =
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE);
+ assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+ assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+ // Second assignment should remove the old instance from current assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+ assertEquals(assignment._instanceStateMap,
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3"), ONLINE));
+ assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
+ // Third assignment should make the assignment the same as target assignment
+ assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
}
private TableRebalancer.SingleSegmentAssignment getNextSingleSegmentAssignment(
- Map<String, String> currentInstanceStateMap, Map<String, String> targetInstanceStateMap,
- int minAvailableReplicas) {
+ Map<String, String> currentInstanceStateMap, Map<String, String> targetInstanceStateMap, int minAvailableReplicas,
+ boolean lowDiskMode) {
Map<String, Integer> numSegmentsToOffloadMap = new HashMap<>();
for (String currentInstance : currentInstanceStateMap.keySet()) {
numSegmentsToOffloadMap.put(currentInstance, 1);
@@ -235,7 +558,7 @@ public class TableRebalancerTest {
}
Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
return TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap,
- minAvailableReplicas, numSegmentsToOffloadMap, assignmentMap);
+ minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
}
@Test
@@ -329,7 +652,7 @@ public class TableRebalancerTest {
// assignment
for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
Map<String, Map<String, String>> nextAssignment =
- TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup);
+ TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, false);
assertEquals(nextAssignment, targetAssignment);
}
@@ -415,13 +738,14 @@ public class TableRebalancerTest {
// The second assignment should reach the target assignment
for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
Map<String, Map<String, String>> nextAssignment =
- TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup);
+ TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, false);
assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
- nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup);
+ nextAssignment =
+ TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false);
assertEquals(nextAssignment, targetAssignment);
}
@@ -474,7 +798,7 @@ public class TableRebalancerTest {
// Next assignment with 2 minimum available replicas without strict replica-group should reach the target assignment
Map<String, Map<String, String>> nextAssignment =
- TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false);
+ TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false, false);
assertEquals(nextAssignment, targetAssignment);
// Next assignment with 2 minimum available replicas with strict replica-group should finish in 2 steps:
@@ -506,12 +830,429 @@ public class TableRebalancerTest {
// }
//
// The second assignment should reach the target assignment
- nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true);
+ nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true, false);
assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
- nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true);
+ nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true, false);
+ assertEquals(nextAssignment, targetAssignment);
+ }
+
+ @Test
+ public void testAssignmentWithLowDiskMode() {
+ // Current assignment:
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment2": {
+ // "host2": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment4": {
+ // "host2": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+ currentAssignment.put("segment1",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+ currentAssignment.put("segment2",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+ currentAssignment.put("segment3",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+ currentAssignment.put("segment4",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+
+ // Target assignment 1:
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host5": "ONLINE"
+ // },
+ // "segment2": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host5": "ONLINE"
+ // },
+ // "segment4": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // }
+ // }
+ Map<String, Map<String, String>> targetAssignment = new TreeMap<>();
+ targetAssignment.put("segment1",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+ targetAssignment.put("segment2",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+ targetAssignment.put("segment3",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+ targetAssignment.put("segment4",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+
+ // Number of segments to offload:
+ // {
+ // "host1": 0,
+ // "host2": 2,
+ // "host3": 2,
+ // "host4": 0,
+ // "host5": -2,
+ // "host6": -2
+ // }
+ Map<String, Integer> numSegmentsToOffloadMap =
+ TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+ assertEquals(numSegmentsToOffloadMap.size(), 6);
+ assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+ assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+ assertEquals((int) numSegmentsToOffloadMap.get("host3"), 2);
+ assertEquals((int) numSegmentsToOffloadMap.get("host4"), 0);
+ assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+ assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+ // Next assignment with 2 minimum available replicas with or without strict replica-group should finish in 2 steps:
+ //
+ // The first assignment will remove "segment1" and "segment3" from "host2", and remove "segment2" and "segment4"
+ // from "host3":
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment2": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment4": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ //
+ // The second assignment should reach the target assignment
+ for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+ Map<String, Map<String, String>> nextAssignment =
+ TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+
+ nextAssignment =
+ TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+ assertEquals(nextAssignment, targetAssignment);
+ }
+
+ // Target assignment 2:
+ // {
+ // "segment1": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // },
+ // "segment2": {
+ // "host1": "ONLINE",
+ // "host4": "ONLINE",
+ // "host5": "ONLINE"
+ // },
+ // "segment3": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host6": "ONLINE"
+ // },
+ // "segment4": {
+ // "host1": "ONLINE",
+ // "host4": "ONLINE",
+ // "host5": "ONLINE"
+ // }
+ // }
+ targetAssignment = new TreeMap<>();
+ targetAssignment.put("segment1",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+ targetAssignment.put("segment2",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+ targetAssignment.put("segment3",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+ targetAssignment.put("segment4",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+
+ // Number of segments to offload:
+ // {
+ // "host1": 0,
+ // "host2": 2,
+ // "host3": 4,
+ // "host4": -2,
+ // "host5": -2,
+ // "host6": -2
+ // }
+ numSegmentsToOffloadMap = TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+ assertEquals(numSegmentsToOffloadMap.size(), 6);
+ assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+ assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+ assertEquals((int) numSegmentsToOffloadMap.get("host3"), 4);
+ assertEquals((int) numSegmentsToOffloadMap.get("host4"), -2);
+ assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+ assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+ // Next assignment with 2 minimum available replicas with or without strict replica-group should finish in 4 steps:
+ //
+ // The first assignment will remove "segment1" and "segment3" from "host3" (with the most segments to offload), and
+    // remove "segment2" and "segment4" from "host3" (with the most segments to offload):
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE"
+ // },
+ // "segment2": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE"
+ // },
+ // "segment4": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ //
+ // The second assignment will add "segment1" and "segment3" to "host4" (with the least segments to offload), and add
+ // "segment2" and "segment4" to "host5" (with the least segments to offload):
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment2": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host5": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment4": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE",
+ // "host5": "ONLINE"
+ // }
+ // }
+ //
+ // The third assignment will remove "segment1" and "segment3" from "host1", and remove "segment2" and "segment4"
+ // from "host2":
+ // {
+ // "segment1": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment2": {
+ // "host4": "ONLINE",
+ // "host5": "ONLINE"
+ // },
+ // "segment3": {
+ // "host2": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment4": {
+ // "host4": "ONLINE",
+ // "host5": "ONLINE"
+ // }
+ // }
+ //
+ // The fourth assignment should reach the target assignment
+ for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+ Map<String, Map<String, String>> nextAssignment =
+ TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+
+ nextAssignment =
+ TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+
+ nextAssignment =
+ TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host4", "host5")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host4", "host5")));
+
+ nextAssignment =
+ TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+ assertEquals(nextAssignment, targetAssignment);
+ }
+
+ // Target assignment 3:
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment2": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment4": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ targetAssignment = new TreeMap<>();
+ targetAssignment.put("segment1",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4"), ONLINE));
+ targetAssignment.put("segment2",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4"), ONLINE));
+ targetAssignment.put("segment3",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4"), ONLINE));
+ targetAssignment.put("segment4",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4"), ONLINE));
+
+ // Number of segments to offload:
+ // {
+ // "host1": -2,
+ // "host2": 4,
+ // "host3": 0,
+ // "host4": -2
+ // }
+ numSegmentsToOffloadMap = TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+ assertEquals(numSegmentsToOffloadMap.size(), 4);
+ assertEquals((int) numSegmentsToOffloadMap.get("host1"), -2);
+ assertEquals((int) numSegmentsToOffloadMap.get("host2"), 4);
+ assertEquals((int) numSegmentsToOffloadMap.get("host3"), 0);
+ assertEquals((int) numSegmentsToOffloadMap.get("host4"), -2);
+
+ // Next assignment with 2 minimum available replicas without strict replica-group should finish in 2 steps:
+ //
+    // The first assignment will remove "host2" (the only host with segments to offload) from all four segments
+    // ("segment1", "segment2", "segment3" and "segment4"):
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment2": {
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment4": {
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ //
+ // The second assignment should reach the target assignment
+ Map<String, Map<String, String>> nextAssignment =
+ TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false, true);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+
+ nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, false, true);
+ assertEquals(nextAssignment, targetAssignment);
+
+ // Next assignment with 2 minimum available replicas with strict replica-group should finish in 3 steps:
+ //
+    // The first assignment will remove "host2" (the only host with segments to offload) from all four segments
+    // ("segment1", "segment2", "segment3" and "segment4"):
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment2": {
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE"
+ // },
+ // "segment4": {
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ //
+ // The second assignment will bring "segment1" and "segment3" to the target state. It cannot bring "segment2" and
+ // "segment4" to the target state because "host1" and "host4" might be unavailable for strict replica-group routing,
+ // which breaks the minimum available replicas requirement:
+ // {
+ // "segment1": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment2": {
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment3": {
+ // "host1": "ONLINE",
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // },
+ // "segment4": {
+ // "host3": "ONLINE",
+ // "host4": "ONLINE"
+ // }
+ // }
+ //
+ // The third assignment should reach the target assignment
+ nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true, true);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+
+ nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true, true);
+ assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
+ assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+ assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
+ assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+
+ nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true, true);
assertEquals(nextAssignment, targetAssignment);
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index 712a9fbc77..1ac12c7f21 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -34,7 +34,8 @@ public class PinotTableRebalancer extends PinotZKChanger {
public PinotTableRebalancer(String zkAddress, String clusterName, boolean dryRun, boolean reassignInstances,
boolean includeConsuming, boolean bootstrap, boolean downtime, int minReplicasToKeepUpForNoDowntime,
- boolean bestEffort, long externalViewCheckIntervalInMs, long externalViewStabilizationTimeoutInMs) {
+ boolean lowDiskMode, boolean bestEffort, long externalViewCheckIntervalInMs,
+ long externalViewStabilizationTimeoutInMs) {
super(zkAddress, clusterName);
_rebalanceConfig.setDryRun(dryRun);
_rebalanceConfig.setReassignInstances(reassignInstances);
@@ -42,6 +43,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
_rebalanceConfig.setBootstrap(bootstrap);
_rebalanceConfig.setDowntime(downtime);
_rebalanceConfig.setMinAvailableReplicas(minReplicasToKeepUpForNoDowntime);
+ _rebalanceConfig.setLowDiskMode(lowDiskMode);
_rebalanceConfig.setBestEfforts(bestEffort);
_rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
_rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
index 5b4c3260d4..97f59bc08b 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
@@ -73,6 +73,14 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C
+ "number of replicas allowed to be unavailable if value is negative (1 by default)")
private int _minAvailableReplicas = 1;
+ @CommandLine.Option(names = {"-lowDiskMode"}, description =
+ "For no-downtime rebalance, whether to enable low disk mode during rebalance. When enabled, "
+ + "segments will first be offloaded from servers, then added to servers after offload is done while "
+ + "maintaining the min available replicas. It may increase the total time of the rebalance, but can be "
+ + "useful when servers are low on disk space, and we want to scale up the cluster and rebalance the table "
+ + "to more servers (false by default)")
+ private boolean _lowDiskMode = false;
+
@CommandLine.Option(names = {"-bestEfforts"},
description = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract"
+ " cannot be achieved, false by default)")
@@ -104,7 +112,7 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C
throws Exception {
PinotTableRebalancer tableRebalancer =
new PinotTableRebalancer(_zkAddress, _clusterName, _dryRun, _reassignInstances, _includeConsuming, _bootstrap,
- _downtime, _minAvailableReplicas, _bestEfforts, _externalViewCheckIntervalInMs,
+ _downtime, _minAvailableReplicas, _lowDiskMode, _bestEfforts, _externalViewCheckIntervalInMs,
_externalViewStabilizationTimeoutInMs);
RebalanceResult rebalanceResult = tableRebalancer.rebalance(_tableNameWithType);
LOGGER
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org