You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/30 21:22:15 UTC

(pinot) branch master updated: Introduce low disk mode to table rebalance (#12072)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cc7e7a8b86 Introduce low disk mode to table rebalance (#12072)
cc7e7a8b86 is described below

commit cc7e7a8b869c3d04c15ce8c5b14c0c18b0d83fe1
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Nov 30 13:22:08 2023 -0800

    Introduce low disk mode to table rebalance (#12072)
---
 .../api/resources/PinotTableRestletResource.java   |  34 +-
 .../helix/core/rebalance/RebalanceConfig.java      |  16 +
 .../helix/core/rebalance/TableRebalancer.java      |  58 +-
 .../helix/core/rebalance/TableRebalancerTest.java  | 885 +++++++++++++++++++--
 .../apache/pinot/tools/PinotTableRebalancer.java   |   4 +-
 .../tools/admin/command/RebalanceTableCommand.java |  10 +-
 6 files changed, 898 insertions(+), 109 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 55ffb01046..5d35e1f0c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -602,6 +602,7 @@ public class PinotTableRestletResource {
   @ApiOperation(value = "Rebalances a table (reassign instances and segments for a table)",
       notes = "Rebalances a table (reassign instances and segments for a table)")
   public RebalanceResult rebalance(
+      //@formatter:off
       @ApiParam(value = "Name of the table to rebalance", required = true) @PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
       @ApiParam(value = "Whether to rebalance table in dry-run mode") @DefaultValue("false") @QueryParam("dryRun")
@@ -609,19 +610,24 @@ public class PinotTableRestletResource {
       @ApiParam(value = "Whether to reassign instances before reassigning segments") @DefaultValue("false")
       @QueryParam("reassignInstances") boolean reassignInstances,
       @ApiParam(value = "Whether to reassign CONSUMING segments for real-time table") @DefaultValue("false")
-      @QueryParam("includeConsuming") boolean includeConsuming, @ApiParam(
-      value = "Whether to rebalance table in bootstrap mode (regardless of minimum segment movement, reassign all "
-          + "segments in a round-robin fashion as if adding new segments to an empty table)") @DefaultValue("false")
-  @QueryParam("bootstrap") boolean bootstrap,
+      @QueryParam("includeConsuming") boolean includeConsuming,
+      @ApiParam(value = "Whether to rebalance table in bootstrap mode (regardless of minimum segment movement, "
+          + "reassign all segments in a round-robin fashion as if adding new segments to an empty table)")
+      @DefaultValue("false") @QueryParam("bootstrap") boolean bootstrap,
       @ApiParam(value = "Whether to allow downtime for the rebalance") @DefaultValue("false") @QueryParam("downtime")
       boolean downtime,
       @ApiParam(value = "For no-downtime rebalance, minimum number of replicas to keep alive during rebalance, or "
           + "maximum number of replicas allowed to be unavailable if value is negative") @DefaultValue("1")
-  @QueryParam("minAvailableReplicas") int minAvailableReplicas, @ApiParam(
-      value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot "
-          + "be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts, @ApiParam(
-      value = "How often to check if external view converges with ideal states") @DefaultValue("1000")
-  @QueryParam("externalViewCheckIntervalInMs") long externalViewCheckIntervalInMs,
+      @QueryParam("minAvailableReplicas") int minAvailableReplicas,
+      @ApiParam(value = "For no-downtime rebalance, whether to enable low disk mode during rebalance. When enabled, "
+          + "segments will first be offloaded from servers, then added to servers after offload is done while "
+          + "maintaining the min available replicas. It may increase the total time of the rebalance, but can be "
+          + "useful when servers are low on disk space, and we want to scale up the cluster and rebalance the table to "
+          + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") boolean lowDiskMode,
+      @ApiParam(value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime "
+          + "contract cannot be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts,
+      @ApiParam(value = "How often to check if external view converges with ideal states") @DefaultValue("1000")
+      @QueryParam("externalViewCheckIntervalInMs") long externalViewCheckIntervalInMs,
       @ApiParam(value = "How long to wait till external view converges with ideal states") @DefaultValue("3600000")
       @QueryParam("externalViewStabilizationTimeoutInMs") long externalViewStabilizationTimeoutInMs,
       @ApiParam(value = "How often to make a status update (i.e. heartbeat)") @DefaultValue("300000")
@@ -629,10 +635,13 @@ public class PinotTableRestletResource {
       @ApiParam(value = "How long to wait for next status update (i.e. heartbeat) before the job is considered failed")
       @DefaultValue("3600000") @QueryParam("heartbeatTimeoutInMs") long heartbeatTimeoutInMs,
       @ApiParam(value = "Max number of attempts to rebalance") @DefaultValue("3") @QueryParam("maxAttempts")
-      int maxAttempts, @ApiParam(value = "Initial delay to exponentially backoff retry") @DefaultValue("300000")
-  @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
+      int maxAttempts,
+      @ApiParam(value = "Initial delay to exponentially backoff retry") @DefaultValue("300000")
+      @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
       @ApiParam(value = "Whether to update segment target tier as part of the rebalance") @DefaultValue("false")
-      @QueryParam("updateTargetTier") boolean updateTargetTier) {
+      @QueryParam("updateTargetTier") boolean updateTargetTier
+      //@formatter:on
+  ) {
     String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr);
     RebalanceConfig rebalanceConfig = new RebalanceConfig();
     rebalanceConfig.setDryRun(dryRun);
@@ -641,6 +650,7 @@ public class PinotTableRestletResource {
     rebalanceConfig.setBootstrap(bootstrap);
     rebalanceConfig.setDowntime(downtime);
     rebalanceConfig.setMinAvailableReplicas(minAvailableReplicas);
+    rebalanceConfig.setLowDiskMode(lowDiskMode);
     rebalanceConfig.setBestEfforts(bestEfforts);
     rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
     rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index 94709f065c..b6b471cfbc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -61,6 +61,14 @@ public class RebalanceConfig {
   @ApiModelProperty(example = "1")
   private int _minAvailableReplicas = DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME;
 
+  // For no-downtime rebalance, whether to enable low disk mode during rebalance. When enabled, segments will first be
+  // offloaded from servers, then added to servers after offload is done while maintaining the min available replicas.
+  // It may increase the total time of the rebalance, but can be useful when servers are low on disk space, and we want
+  // to scale up the cluster and rebalance the table to more servers.
+  @JsonProperty("lowDiskMode")
+  @ApiModelProperty(example = "false")
+  private boolean _lowDiskMode = false;
+
   // Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot be achieved)
   // When using best-efforts to rebalance, the following scenarios won't fail the rebalance (will log warnings instead):
   // - Segment falls into ERROR state in ExternalView -> count ERROR state as good state
@@ -150,6 +158,14 @@ public class RebalanceConfig {
     _minAvailableReplicas = minAvailableReplicas;
   }
 
+  public boolean isLowDiskMode() {
+    return _lowDiskMode;
+  }
+
+  public void setLowDiskMode(boolean lowDiskMode) {
+    _lowDiskMode = lowDiskMode;
+  }
+
   public boolean isBestEfforts() {
     return _bestEfforts;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index cd4d70ee3c..318d251bfc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -172,6 +172,7 @@ public class TableRebalancer {
     boolean bootstrap = rebalanceConfig.isBootstrap();
     boolean downtime = rebalanceConfig.isDowntime();
     int minReplicasToKeepUpForNoDowntime = rebalanceConfig.getMinAvailableReplicas();
+    boolean lowDiskMode = rebalanceConfig.isLowDiskMode();
     boolean bestEfforts = rebalanceConfig.isBestEfforts();
     long externalViewCheckIntervalInMs = rebalanceConfig.getExternalViewCheckIntervalInMs();
     long externalViewStabilizationTimeoutInMs = rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
@@ -180,10 +181,11 @@ public class TableRebalancer {
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     LOGGER.info(
         "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, includeConsuming: {}, bootstrap: {}, "
-            + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, bestEfforts: {}, "
-            + "externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}", tableNameWithType, dryRun,
-        reassignInstances, includeConsuming, bootstrap, downtime, minReplicasToKeepUpForNoDowntime,
-        enableStrictReplicaGroup, bestEfforts, externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
+            + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, lowDiskMode: {}, "
+            + "bestEfforts: {}, externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}",
+        tableNameWithType, dryRun, reassignInstances, includeConsuming, bootstrap, downtime,
+        minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts,
+        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
 
     // Fetch ideal state
     PropertyKey idealStatePropertyKey = _helixDataAccessor.keyBuilder().idealStates(tableNameWithType);
@@ -483,7 +485,8 @@ public class TableRebalancer {
             tierToInstancePartitionsMap, targetAssignment);
       }
       Map<String, Map<String, String>> nextAssignment =
-          getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup);
+          getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+              lowDiskMode);
       LOGGER.info("For rebalanceId: {}, got the next assignment for table: {} with number of segments to be moved to "
               + "each instance: {}", rebalanceJobId, tableNameWithType,
           SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment));
@@ -847,15 +850,17 @@ public class TableRebalancer {
    */
   @VisibleForTesting
   static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
-      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup) {
+      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
+      boolean lowDiskMode) {
     return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-        minAvailableReplicas)
-        : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas);
+        minAvailableReplicas, lowDiskMode)
+        : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
+            lowDiskMode);
   }
 
   private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
       Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
-      int minAvailableReplicas) {
+      int minAvailableReplicas, boolean lowDiskMode) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
@@ -866,7 +871,7 @@ public class TableRebalancer {
       Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
       SingleSegmentAssignment assignment =
           getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas,
-              numSegmentsToOffloadMap, assignmentMap);
+              lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
       Set<String> assignedInstances = assignment._instanceStateMap.keySet();
       Set<String> availableInstances = assignment._availableInstances;
       availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> {
@@ -897,7 +902,7 @@ public class TableRebalancer {
 
   private static Map<String, Map<String, String>> getNextNonStrictReplicaGroupAssignment(
       Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
-      int minAvailableReplicas) {
+      int minAvailableReplicas, boolean lowDiskMode) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
@@ -907,7 +912,7 @@ public class TableRebalancer {
       Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);
       Map<String, String> nextInstanceStateMap =
           getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas,
-              numSegmentsToOffloadMap, assignmentMap)._instanceStateMap;
+              lowDiskMode, numSegmentsToOffloadMap, assignmentMap)._instanceStateMap;
       nextAssignment.put(segmentName, nextInstanceStateMap);
       updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(),
           nextInstanceStateMap.keySet());
@@ -954,7 +959,7 @@ public class TableRebalancer {
    */
   @VisibleForTesting
   static SingleSegmentAssignment getNextSingleSegmentAssignment(Map<String, String> currentInstanceStateMap,
-      Map<String, String> targetInstanceStateMap, int minAvailableReplicas,
+      Map<String, String> targetInstanceStateMap, int minAvailableReplicas, boolean lowDiskMode,
       Map<String, Integer> numSegmentsToOffloadMap, Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap) {
     Map<String, String> nextInstanceStateMap = new TreeMap<>();
 
@@ -1005,16 +1010,23 @@ public class TableRebalancer {
     }
     Set<String> availableInstances = new TreeSet<>(nextInstanceStateMap.keySet());
 
-    // Add target instances until the number of instances matched
-    int numInstancesToAdd = targetInstanceStateMap.size() - nextInstanceStateMap.size();
-    if (numInstancesToAdd > 0) {
-      // Sort instances by number of segments to offload, and add the ones with the least segments to offload
-      List<Triple<String, String, Integer>> instancesInfo =
-          getSortedInstancesOnNumSegmentsToOffload(targetInstanceStateMap, nextInstanceStateMap,
-              numSegmentsToOffloadMap);
-      for (int i = 0; i < numInstancesToAdd; i++) {
-        Triple<String, String, Integer> instanceInfo = instancesInfo.get(i);
-        nextInstanceStateMap.put(instanceInfo.getLeft(), instanceInfo.getMiddle());
+    // After achieving the min available replicas, when low disk mode is enabled, only add new instances when all
+    // current instances exist in the next assignment.
+    // We want to first drop the extra instances as one step, then add the target instances as another step to avoid the
+    // case where segments are first added to the instance before other segments are dropped from the instance, which
+    // might cause the server to run out of disk. Note that even if segment addition and drop happen in the same step,
+    // there is no guarantee that the server processes the segment drop before the segment addition.
+    if (!lowDiskMode || currentInstanceStateMap.size() == nextInstanceStateMap.size()) {
+      int numInstancesToAdd = targetInstanceStateMap.size() - nextInstanceStateMap.size();
+      if (numInstancesToAdd > 0) {
+        // Sort instances by number of segments to offload, and add the ones with the least segments to offload
+        List<Triple<String, String, Integer>> instancesInfo =
+            getSortedInstancesOnNumSegmentsToOffload(targetInstanceStateMap, nextInstanceStateMap,
+                numSegmentsToOffloadMap);
+        for (int i = 0; i < numInstancesToAdd; i++) {
+          Triple<String, String, Integer> instanceInfo = instancesInfo.get(i);
+          nextInstanceStateMap.put(instanceInfo.getLeft(), instanceInfo.getMiddle());
+        }
       }
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index 84a660cc5e..ecf1e0feda 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -43,189 +43,512 @@ public class TableRebalancerTest {
 
   @Test
   public void testDowntimeMode() {
-    // With common instance, next assignment should be the same as target assignment
+    // With common instance, first assignment should be the same as target assignment
     Map<String, String> currentInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE);
     Map<String, String> targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3"), ONLINE);
     TableRebalancer.SingleSegmentAssignment assignment =
-        getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+        getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, Collections.singleton("host1"));
 
-    // Without common instance, next assignment should be the same as target assignment
+    // Without common instance, first assignment should be the same as target assignment
     targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4"), ONLINE);
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertTrue(assignment._availableInstances.isEmpty());
 
-    // With increasing number of replicas, next assignment should be the same as target assignment
+    // With increasing number of replicas, first assignment should be the same as target assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4", "host5"), ONLINE);
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertTrue(assignment._availableInstances.isEmpty());
 
-    // With decreasing number of replicas, next assignment should be the same as target assignment
+    // With decreasing number of replicas, first assignment should be the same as target assignment
     currentInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
     targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5"), ONLINE);
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, false);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertTrue(assignment._availableInstances.isEmpty());
+  }
+
+  @Test
+  public void testDowntimeWithLowDiskMode() {
+    // With common instance, first assignment should keep the common instance and remove the non-common instance
+    Map<String, String> currentInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE);
+    Map<String, String> targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3"), ONLINE);
+    TableRebalancer.SingleSegmentAssignment assignment =
+        getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, true);
+    assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 0, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+
+    // Without common instance, first assignment should drop all instances
+    targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, true);
+    assertTrue(assignment._instanceStateMap.isEmpty());
+    assertTrue(assignment._availableInstances.isEmpty());
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 0, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertTrue(assignment._availableInstances.isEmpty());
+
+    // With increasing number of replicas, first assignment should drop all instances
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host3", "host4", "host5"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, true);
+    assertTrue(assignment._instanceStateMap.isEmpty());
+    assertTrue(assignment._availableInstances.isEmpty());
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 0, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertTrue(assignment._availableInstances.isEmpty());
+
+    // With decreasing number of replicas, first assignment should drop all instances
+    currentInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
+    targetInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 0, true);
+    assertTrue(assignment._instanceStateMap.isEmpty());
+    assertTrue(assignment._availableInstances.isEmpty());
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 0, true);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertTrue(assignment._availableInstances.isEmpty());
   }
 
   @Test
   public void testOneMinAvailableReplicas() {
-    // With 2 common instances, next assignment should be the same as target assignment
+    // With 2 common instances, first assignment should be the same as target assignment
     Map<String, String> currentInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
     Map<String, String> targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host4"), ONLINE);
     TableRebalancer.SingleSegmentAssignment assignment =
-        getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+        getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
 
-    // With 1 common instance, next assignment should be the same as target assignment
+    // With 1 common instance, first assignment should be the same as target assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE);
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, Collections.singleton("host1"));
 
-    // Without common instance, next assignment should have 1 common instances with current assignment
+    // Without common instance, first assignment should have 1 common instance with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6"), ONLINE);
-    // [host1, host4, host5]
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
     assertEquals(assignment._availableInstances, Collections.singleton("host1"));
-    // Next round should make the assignment the same as target assignment
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5")));
 
-    // With increasing number of replicas, next assignment should have 1 common instances with current assignment
+    // With increasing number of replicas, first assignment should have 1 common instance with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6", "host7"), ONLINE);
-    // [host1, host4, host5, host6]
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5", "host6"), ONLINE));
     assertEquals(assignment._availableInstances, Collections.singleton("host1"));
-    // Next round should make the assignment the same as target assignment
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5", "host6")));
 
-    // With decreasing number of replicas, next assignment should have 1 common instances with current assignment
+    // With decreasing number of replicas, first assignment should have 1 common instance with current assignment
     currentInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
-    // [host1, host5, host6]
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
     assertEquals(assignment._availableInstances, Collections.singleton("host1"));
-    // Next round should make the assignment the same as target assignment
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
 
-    // With increasing from 1 replica, next assignment should have 1 common instances with current assignment
+    // With increasing from 1 replica, first assignment should have 1 common instance with current assignment
     currentInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Collections.singletonList("host1"), ONLINE);
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE);
-    // [host1, host2, host3]
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
     assertEquals(assignment._availableInstances, Collections.singleton("host1"));
-    // Next round should make the assignment the same as target assignment
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1);
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, false);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
+  }
+
+  @Test
+  public void testOneMinAvailableReplicasWithLowDiskMode() {
+    // With 2 common instances, first assignment should keep the common instances and remove the not common instance
+    Map<String, String> currentInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE);
+    Map<String, String> targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host4"), ONLINE);
+    TableRebalancer.SingleSegmentAssignment assignment =
+        getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+
+    // With 1 common instance, first assignment should keep the common instance and remove the not common instances
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singletonList("host1"));
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+
+    // Without common instance, first assignment should keep 1 instance from current assignment
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+    // Second assignment should add 2 instances from target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+    // Third assignment should remove the old instance from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5")));
+    // Fourth assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5")));
+
+    // With increasing number of replicas, first assignment should keep 1 instance from current assignment
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6", "host7"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+    // Second assignment should add 3 instances from target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+    // Third assignment should remove the old instance from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", "host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5", "host6")));
+    // Fourth assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host4", "host5", "host6")));
+
+    // With decreasing number of replicas, first assignment should keep 1 instance from current assignment
+    currentInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap, Collections.singletonMap("host1", ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+    // Second assignment should add 2 instances from target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+    // Third assignment should remove the old instance from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+    // Fourth assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+
+    // With increasing from 1 replica, first assignment should keep the instance from current assignment, and add 2
+    // instances from target assignment
+    currentInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Collections.singletonList("host1"), ONLINE);
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+    // Second assignment should remove the old instance from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
+    // Third assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 1, true);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
   }
 
   @Test
   public void testTwoMinAvailableReplicas() {
-    // With 3 common instances, next assignment should be the same as target assignment
+    // With 3 common instances, first assignment should be the same as target assignment
     Map<String, String> currentInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
     Map<String, String> targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host5"), ONLINE);
     TableRebalancer.SingleSegmentAssignment assignment =
-        getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+        getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
 
-    // With 2 common instances, next assignment should be the same as target assignment
+    // With 2 common instances, first assignment should be the same as target assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE);
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
 
-    // With 1 common instance, next assignment should have 2 common instances with current assignment
+    // With 1 common instance, first assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6", "host7"), ONLINE);
-    // [host1, host2, host5, host6]
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE));
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
-    // Next round should make the assignment the same as target assignment
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6")));
 
-    // Without common instance, next assignment should have 2 common instances with current assignment
+    // Without common instance, first assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8"), ONLINE);
-    // [host1, host2, host5, host6]
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE));
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
-    // Next round should make the assignment the same as target assignment
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
 
-    // With increasing number of replicas, next assignment should have 1 common instances with current assignment
+    // With increasing number of replicas, first assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8", "host9"), ONLINE);
     // [host1, host2, host5, host6, host7]
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6", "host7"), ONLINE));
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
-    // Next round should make the assignment the same as target assignment
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7")));
 
-    // With decreasing number of replicas, next assignment should have 2 common instances with current assignment
+    // With decreasing number of replicas, first assignment should have 2 common instances with current assignment
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
-    // [host1, host2, host5]
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5"), ONLINE));
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
-    // Next round should have 2 common instances with first round assignment
-    // [host1, host5, host6]
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    // Second assignment should have 2 common instances with first assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5")));
-    // Next round should make the assignment the same as target assignment
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    // Third assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
 
-    // With increasing from 1 replica, next assignment should have 1 common instances with current assignment
+    // With increasing from 1 replica, first assignment should have 1 common instance with current assignment
     // NOTE: This is the best we can do because we don't have 2 replicas available
     currentInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Collections.singletonList("host1"), ONLINE);
     targetInstanceStateMap =
         SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE);
-    // [host1, host2, host3]
-    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, false);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
     assertEquals(assignment._availableInstances, Collections.singleton("host1"));
-    // Next round should make the assignment the same as target assignment
-    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2);
+    // Second assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, false);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
+  }
+
+  @Test
+  public void testTwoMinAvailableReplicasWithLowDiskMode() {
+    // With 3 common instances, first assignment should keep the common instances and remove the not common instance
+    Map<String, String> currentInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host4"), ONLINE);
+    Map<String, String> targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3", "host5"), ONLINE);
+    TableRebalancer.SingleSegmentAssignment assignment =
+        getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2", "host3")));
+
+    // With 2 common instances, first assignment should keep the common instances and remove the not common instances
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Second assignment should be the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+
+    // With 1 common instance, first assignment should keep the common instance, and 1 more instance from current
+    // assignment
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6", "host7"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Second assignment should add 2 instances from target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Third assignment should remove the old instance from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6")));
+    // Fourth assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5", "host6")));
+
+    // Without common instance, first assignment should keep 2 instances from current assignment
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Second assignment should add 2 instances from target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Third assignment should remove the old instances from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+    // Fourth assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+
+    // With increasing number of replicas, first assignment should keep 2 instances from current assignment
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7", "host8", "host9"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Second assignment should add 3 instances from target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5", "host6", "host7"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Third assignment should remove the old instances from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7")));
+    // Fourth assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6", "host7")));
+
+    // With decreasing number of replicas, first assignment should keep 2 instances from current assignment
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6", "host7"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Second assignment should add 1 instance from target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host5"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host2")));
+    // Third assignment should remove 1 old instance from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5")));
+    // Fourth assignment should add 1 more instance from target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host1", "host5")));
+    // Fifth assignment should remove the other old instance from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host5", "host6"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+    // Sixth assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host5", "host6")));
+
+    // With increasing from 1 replica, first assignment should keep the instance from current assignment, and add 2
+    // instances from target assignment
+    // NOTE: This is the best we can do because we don't have 2 replicas available
+    currentInstanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(Collections.singletonList("host1"), ONLINE);
+    targetInstanceStateMap =
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE);
+    assignment = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    assertEquals(assignment._availableInstances, Collections.singleton("host1"));
+    // Second assignment should remove the old instance from current assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
+    assertEquals(assignment._instanceStateMap,
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3"), ONLINE));
+    assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
+    // Third assignment should make the assignment the same as target assignment
+    assignment = getNextSingleSegmentAssignment(assignment._instanceStateMap, targetInstanceStateMap, 2, true);
     assertEquals(assignment._instanceStateMap, targetInstanceStateMap);
     assertEquals(assignment._availableInstances, new TreeSet<>(Arrays.asList("host2", "host3")));
   }
 
   private TableRebalancer.SingleSegmentAssignment getNextSingleSegmentAssignment(
-      Map<String, String> currentInstanceStateMap, Map<String, String> targetInstanceStateMap,
-      int minAvailableReplicas) {
+      Map<String, String> currentInstanceStateMap, Map<String, String> targetInstanceStateMap, int minAvailableReplicas,
+      boolean lowDiskMode) {
     Map<String, Integer> numSegmentsToOffloadMap = new HashMap<>();
     for (String currentInstance : currentInstanceStateMap.keySet()) {
       numSegmentsToOffloadMap.put(currentInstance, 1);
@@ -235,7 +558,7 @@ public class TableRebalancerTest {
     }
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
     return TableRebalancer.getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap,
-        minAvailableReplicas, numSegmentsToOffloadMap, assignmentMap);
+        minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
   }
 
   @Test
@@ -329,7 +652,7 @@ public class TableRebalancerTest {
     // assignment
     for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
       Map<String, Map<String, String>> nextAssignment =
-          TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup);
+          TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, false);
       assertEquals(nextAssignment, targetAssignment);
     }
 
@@ -415,13 +738,14 @@ public class TableRebalancerTest {
     // The second assignment should reach the target assignment
     for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
       Map<String, Map<String, String>> nextAssignment =
-          TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup);
+          TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, false);
       assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
       assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
       assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
       assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
 
-      nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup);
+      nextAssignment =
+          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, false);
       assertEquals(nextAssignment, targetAssignment);
     }
 
@@ -474,7 +798,7 @@ public class TableRebalancerTest {
 
     // Next assignment with 2 minimum available replicas without strict replica-group should reach the target assignment
     Map<String, Map<String, String>> nextAssignment =
-        TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false);
+        TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false, false);
     assertEquals(nextAssignment, targetAssignment);
 
     // Next assignment with 2 minimum available replicas with strict replica-group should finish in 2 steps:
@@ -506,12 +830,429 @@ public class TableRebalancerTest {
     // }
     //
     // The second assignment should reach the target assignment
-    nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true);
+    nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true, false);
     assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
     assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
     assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
     assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host3", "host4")));
-    nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true);
+    nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true, false);
+    assertEquals(nextAssignment, targetAssignment);
+  }
+
+  @Test
+  public void testAssignmentWithLowDiskMode() {
+    // Current assignment:
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    currentAssignment.put("segment1",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment.put("segment2",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+    currentAssignment.put("segment3",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment.put("segment4",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+
+    // Target assignment 1:
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment1",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+    targetAssignment.put("segment2",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment.put("segment3",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+    targetAssignment.put("segment4",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+
+    // Number of segments to offload:
+    // {
+    //   "host1": 0,
+    //   "host2": 2,
+    //   "host3": 2,
+    //   "host4": 0,
+    //   "host5": -2,
+    //   "host6": -2
+    // }
+    Map<String, Integer> numSegmentsToOffloadMap =
+        TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 6);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+    // Next assignment with 2 minimum available replicas with or without strict replica-group should finish in 2 steps:
+    //
+    // The first assignment will remove "segment1" and "segment3" from "host2", and remove "segment2" and "segment4"
+    // from "host3":
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    //
+    // The second assignment should reach the target assignment
+    for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+      Map<String, Map<String, String>> nextAssignment =
+          TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+      assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+      assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+      assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+      assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+
+      nextAssignment =
+          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+      assertEquals(nextAssignment, targetAssignment);
+    }
+
+    // Target assignment 2:
+    // {
+    //   "segment1": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host1": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
+    targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment1",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment.put("segment2",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+    targetAssignment.put("segment3",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment.put("segment4",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host4", "host5"), ONLINE));
+
+    // Number of segments to offload:
+    // {
+    //   "host1": 0,
+    //   "host2": 2,
+    //   "host3": 4,
+    //   "host4": -2,
+    //   "host5": -2,
+    //   "host6": -2
+    // }
+    numSegmentsToOffloadMap = TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 6);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 4);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+    // Next assignment with 2 minimum available replicas with or without strict replica-group should finish in 4 steps:
+    //
+    // The first assignment will remove "segment1" and "segment3" from "host3" (with the most segments to offload), and
+    // remove "segment2" and "segment4" from "host3" (with the most segments to offload):
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    //
+    // The second assignment will add "segment1" and "segment3" to "host4" (with the least segments to offload), and add
+    // "segment2" and "segment4" to "host5" (with the least segments to offload):
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
+    //
+    // The third assignment will remove "segment1" and "segment3" from "host1", and remove "segment2" and "segment4"
+    // from "host2":
+    // {
+    //   "segment1": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host4": "ONLINE",
+    //     "host5": "ONLINE"
+    //   }
+    // }
+    //
+    // The fourth assignment should reach the target assignment
+    for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+      Map<String, Map<String, String>> nextAssignment =
+          TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+      assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2")));
+      assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+      assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2")));
+      assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+
+      nextAssignment =
+          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+      assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+      assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+      assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host2", "host4")));
+      assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host2", "host4", "host5")));
+
+      nextAssignment =
+          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+      assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+      assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host4", "host5")));
+      assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host2", "host4")));
+      assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host4", "host5")));
+
+      nextAssignment =
+          TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, enableStrictReplicaGroup, true);
+      assertEquals(nextAssignment, targetAssignment);
+    }
+
+    // Target assignment 3:
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment1",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4"), ONLINE));
+    targetAssignment.put("segment2",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4"), ONLINE));
+    targetAssignment.put("segment3",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4"), ONLINE));
+    targetAssignment.put("segment4",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host4"), ONLINE));
+
+    // Number of segments to offload:
+    // {
+    //   "host1": -2,
+    //   "host2": 4,
+    //   "host3": 0,
+    //   "host4": -2
+    // }
+    numSegmentsToOffloadMap = TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 4);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 4);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), -2);
+
+    // Next assignment with 2 minimum available replicas without strict replica-group should finish in 2 steps:
+    //
+    // The first assignment will remove "segment1" and "segment3" from "host2", and remove "segment2" and "segment4"
+    // from "host2":
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    //
+    // The second assignment should reach the target assignment
+    Map<String, Map<String, String>> nextAssignment =
+        TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, false, true);
+    assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+    assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+    assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+    assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+
+    nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, false, true);
+    assertEquals(nextAssignment, targetAssignment);
+
+    // Next assignment with 2 minimum available replicas with strict replica-group should finish in 3 steps:
+    //
+    // The first assignment will remove "segment1" and "segment3" from "host2", and remove "segment2" and "segment4"
+    // from "host2":
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    //
+    // The second assignment will bring "segment1" and "segment3" to the target state. It cannot bring "segment2" and
+    // "segment4" to the target state because "host1" and "host4" might be unavailable for strict replica-group routing,
+    // which breaks the minimum available replicas requirement:
+    // {
+    //   "segment1": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment2": {
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment3": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment4": {
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    //
+    // The third assignment should reach the target assignment
+    nextAssignment = TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, true, true);
+    assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+    assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+    assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3")));
+    assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+
+    nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true, true);
+    assertEquals(nextAssignment.get("segment1").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
+    assertEquals(nextAssignment.get("segment2").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+    assertEquals(nextAssignment.get("segment3").keySet(), new TreeSet<>(Arrays.asList("host1", "host3", "host4")));
+    assertEquals(nextAssignment.get("segment4").keySet(), new TreeSet<>(Arrays.asList("host3", "host4")));
+
+    nextAssignment = TableRebalancer.getNextAssignment(nextAssignment, targetAssignment, 2, true, true);
     assertEquals(nextAssignment, targetAssignment);
   }
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index 712a9fbc77..1ac12c7f21 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -34,7 +34,8 @@ public class PinotTableRebalancer extends PinotZKChanger {
 
   public PinotTableRebalancer(String zkAddress, String clusterName, boolean dryRun, boolean reassignInstances,
       boolean includeConsuming, boolean bootstrap, boolean downtime, int minReplicasToKeepUpForNoDowntime,
-      boolean bestEffort, long externalViewCheckIntervalInMs, long externalViewStabilizationTimeoutInMs) {
+      boolean lowDiskMode, boolean bestEffort, long externalViewCheckIntervalInMs,
+      long externalViewStabilizationTimeoutInMs) {
     super(zkAddress, clusterName);
     _rebalanceConfig.setDryRun(dryRun);
     _rebalanceConfig.setReassignInstances(reassignInstances);
@@ -42,6 +43,7 @@ public class PinotTableRebalancer extends PinotZKChanger {
     _rebalanceConfig.setBootstrap(bootstrap);
     _rebalanceConfig.setDowntime(downtime);
     _rebalanceConfig.setMinAvailableReplicas(minReplicasToKeepUpForNoDowntime);
+    _rebalanceConfig.setLowDiskMode(lowDiskMode);
     _rebalanceConfig.setBestEfforts(bestEffort);
     _rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
     _rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
index 5b4c3260d4..97f59bc08b 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
@@ -73,6 +73,14 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C
           + "number of replicas allowed to be unavailable if value is negative (1 by default)")
   private int _minAvailableReplicas = 1;
 
+  @CommandLine.Option(names = {"-lowDiskMode"}, description =
+      "For no-downtime rebalance, whether to enable low disk mode during rebalance. When enabled, "
+          + "segments will first be offloaded from servers, then added to servers after offload is done while "
+          + "maintaining the min available replicas. It may increase the total time of the rebalance, but can be "
+          + "useful when servers are low on disk space, and we want to scale up the cluster and rebalance the table "
+          + "to more servers (false by default)")
+  private boolean _lowDiskMode = false;
+
   @CommandLine.Option(names = {"-bestEfforts"},
       description = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract"
           + " cannot be achieved, false by default)")
@@ -104,7 +112,7 @@ public class RebalanceTableCommand extends AbstractBaseAdminCommand implements C
       throws Exception {
     PinotTableRebalancer tableRebalancer =
         new PinotTableRebalancer(_zkAddress, _clusterName, _dryRun, _reassignInstances, _includeConsuming, _bootstrap,
-            _downtime, _minAvailableReplicas, _bestEfforts, _externalViewCheckIntervalInMs,
+            _downtime, _minAvailableReplicas, _lowDiskMode, _bestEfforts, _externalViewCheckIntervalInMs,
             _externalViewStabilizationTimeoutInMs);
     RebalanceResult rebalanceResult = tableRebalancer.rebalance(_tableNameWithType);
     LOGGER


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org