You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 18:00:06 UTC

[31/38] helix git commit: Add cluster-level and resource-level config options to allow disabling delayed rebalance for the entire cluster or an individual resource.

Add cluster-level and resource-level config options to allow disabling delayed rebalance for the entire cluster or an individual resource.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c5e12b1e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c5e12b1e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c5e12b1e

Branch: refs/heads/helix-0.6.x
Commit: c5e12b1e6b7aa7f8b2e43e4878be3f4c3de81f82
Parents: a294ab2
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Sep 21 10:53:31 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:54:08 2017 -0800

----------------------------------------------------------------------
 .../rebalancer/DelayedAutoRebalancer.java       | 80 +++++++++++++-----
 .../rebalancer/util/RebalanceScheduler.java     |  6 +-
 .../org/apache/helix/model/ClusterConfig.java   | 12 ++-
 .../java/org/apache/helix/model/IdealState.java | 21 ++++-
 .../helix/model/builder/IdealStateBuilder.java  | 25 +++++-
 .../integration/TestDelayedAutoRebalance.java   | 89 ++++++++++++++++++++
 .../integration/ZkIntegrationTestBase.java      | 12 +++
 7 files changed, 217 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 1e127bc..d1718fc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -24,6 +24,7 @@ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
@@ -80,10 +81,12 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
       allNodes = clusterData.getEnabledInstances();
     }
 
+    ClusterConfig clusterConfig = clusterData.getClusterConfig();
+    long delayTime = getRebalanceDelay(currentIdealState, clusterConfig);
     Set<String> activeNodes = getActiveInstances(currentIdealState, allNodes, liveNodes,
-        clusterData.getInstanceOfflineTimeMap());
-
-    setRebalanceScheduler(currentIdealState, activeNodes, clusterData.getInstanceOfflineTimeMap());
+        clusterData.getInstanceOfflineTimeMap(), delayTime, clusterConfig);
+    setRebalanceScheduler(currentIdealState, activeNodes, clusterData.getInstanceOfflineTimeMap(),
+        delayTime, clusterConfig);
 
     if (allNodes.isEmpty() || activeNodes.isEmpty()) {
       LOG.error(String.format(
@@ -105,8 +108,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
           emptyMapping(currentIdealState));
     }
 
-    int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount);
-
     LinkedHashMap<String, Integer> stateCountMap =
         StateModelDefinition.getStateCountMap(stateModelDef, activeNodes.size(), replicaCount);
     Map<String, Map<String, String>> currentMapping =
@@ -120,18 +121,25 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
     // sort node lists to ensure consistent preferred assignments
     List<String> allNodeList = new ArrayList<String>(allNodes);
     List<String> liveNodeList = new ArrayList<String>(liveNodes);
-    List<String> activeNodeList = new ArrayList<String>(activeNodes);
     Collections.sort(allNodeList);
     Collections.sort(liveNodeList);
-    Collections.sort(activeNodeList);
 
     ZNRecord newIdealMapping = _rebalanceStrategy
         .computePartitionAssignment(allNodeList, liveNodeList, currentMapping, clusterData);
-    ZNRecord newActiveMapping = _rebalanceStrategy
-        .computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
-    ZNRecord finalMapping =
-        getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveNodes,
-            replicaCount, minActiveReplicas);
+    ZNRecord finalMapping = newIdealMapping;
+
+    if (!isDelayRebalanceDisabled(currentIdealState, clusterConfig)) {
+      List<String> activeNodeList = new ArrayList<String>(activeNodes);
+      Collections.sort(activeNodeList);
+      int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount);
+
+      ZNRecord newActiveMapping = _rebalanceStrategy
+          .computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
+      finalMapping =
+          getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveNodes,
+              replicaCount, minActiveReplicas);
+      LOG.debug("newActiveMapping: " + newActiveMapping);
+    }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("currentMapping: " + currentMapping);
@@ -140,7 +148,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
       LOG.debug("allNodes: " + allNodes);
       LOG.debug("maxPartition: " + maxPartition);
       LOG.debug("newIdealMapping: " + newIdealMapping);
-      LOG.debug("newActiveMapping: " + newActiveMapping);
       LOG.debug("finalMapping: " + finalMapping);
     }
 
@@ -159,13 +166,18 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
 
   /* get all active instances (live instances plus offline-yet-active instances */
   private Set<String> getActiveInstances(IdealState idealState, Set<String> allNodes,
-      Set<String> liveNodes, Map<String, Long> instanceOfflineTimeMap) {
+      Set<String> liveNodes, Map<String, Long> instanceOfflineTimeMap, long delayTime,
+      ClusterConfig clusterConfig) {
     Set<String> activeInstances = new HashSet<String>(liveNodes);
+
+    if (isDelayRebalanceDisabled(idealState, clusterConfig)) {
+      return activeInstances;
+    }
+
     Set<String> offlineInstances = new HashSet<String>(allNodes);
     offlineInstances.removeAll(liveNodes);
 
     long currentTime = System.currentTimeMillis();
-    long delayTime = idealState.getRebalanceDelay();
     for (String ins : offlineInstances) {
       Long offlineTime = instanceOfflineTimeMap.get(ins);
       if (offlineTime != null && offlineTime > 0) {
@@ -180,10 +192,14 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
 
   /* Set a rebalance scheduler for the closest future rebalance time. */
   private void setRebalanceScheduler(IdealState idealState, Set<String> activeInstances,
-      Map<String, Long> instanceOfflineTimeMap) {
-    long nextRebalanceTime = Long.MAX_VALUE;
-    long delayTime = idealState.getRebalanceDelay();
+      Map<String, Long> instanceOfflineTimeMap, long delayTime, ClusterConfig clusterConfig) {
+    String resourceName = idealState.getResourceName();
+    if (isDelayRebalanceDisabled(idealState, clusterConfig)) {
+      _scheduledRebalancer.removeScheduledRebalance(resourceName);
+      return;
+    }
 
+    long nextRebalanceTime = Long.MAX_VALUE;
     for (String ins : activeInstances) {
       Long offlineTime = instanceOfflineTimeMap.get(ins);
       if (offlineTime != null && offlineTime > 0) {
@@ -197,16 +213,31 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
       }
     }
 
-    String resourceName = idealState.getResourceName();
-    LOG.debug(String
-        .format("Next rebalance time for resource %s is %d\n", resourceName, nextRebalanceTime));
     if (nextRebalanceTime == Long.MAX_VALUE) {
-      _scheduledRebalancer.removeScheduledRebalance(resourceName);
+      long startTime = _scheduledRebalancer.removeScheduledRebalance(resourceName);
+      LOG.debug(String
+          .format("Remove exist rebalance timer for resource %s at %d\n", resourceName, startTime));
     } else {
       _scheduledRebalancer.scheduleRebalance(_manager, resourceName, nextRebalanceTime);
+      LOG.debug(String.format("Set next rebalance time for resource %s at time %d\n", resourceName,
+          nextRebalanceTime));
     }
   }
 
+  private long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) {
+    long delayTime = idealState.getRebalanceDelay();
+    if (delayTime < 0) {
+      delayTime = clusterConfig.getRebalanceDelayTime();
+    }
+    return delayTime;
+  }
+
+  private boolean isDelayRebalanceDisabled(IdealState idealState, ClusterConfig clusterConfig) {
+    long delayTime = getRebalanceDelay(idealState, clusterConfig);
+    return (delayTime < 0 || idealState.isDelayRebalanceDisabled() || clusterConfig
+        .isDelayRebalaceDisabled());
+  }
+
   private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping,
       ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) {
     if (minActiveReplica >= numReplica) {
@@ -274,8 +305,11 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
     Set<String> offlineNodes = cache.getAllInstances();
     offlineNodes.removeAll(cache.getLiveInstances().keySet());
 
+    ClusterConfig clusterConfig = cache.getClusterConfig();
+    long delayTime = getRebalanceDelay(idealState, clusterConfig);
     Set<String> activeNodes =
-        getActiveInstances(idealState, allNodes, liveNodes, cache.getInstanceOfflineTimeMap());
+        getActiveInstances(idealState, allNodes, liveNodes, cache.getInstanceOfflineTimeMap(),
+            delayTime, clusterConfig);
 
     String stateModelDefName = idealState.getStateModelDefRef();
     StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);

http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
index bbc03d0..641c755 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -95,7 +95,7 @@ public class RebalanceScheduler {
    *
    * @param resource
    */
-  public void removeScheduledRebalance(String resource) {
+  public long removeScheduledRebalance(String resource) {
     ScheduledTask existTask = _rebalanceTasks.remove(resource);
     if (existTask != null && !existTask.getFuture().isDone()) {
       if (!existTask.getFuture().cancel(true)) {
@@ -104,7 +104,11 @@ public class RebalanceScheduler {
       LOG.info(
           "Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: "
               + resource);
+
+      return existTask.getStartTime();
     }
+
+    return -1;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 7b30fd7..23d66a4 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -33,7 +33,9 @@ public class ClusterConfig extends HelixProperty {
     HELIX_DISABLE_PIPELINE_TRIGGERS,
     TOPOLOGY,  // cluster topology definition, for example, "/zone/rack/host/instance"
     PERSIST_BEST_POSSIBLE_ASSIGNMENT,
-    FAULT_ZONE_TYPE // the type in which isolation should be applied on when Helix places the replicas from same partition.
+    FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition.
+    DELAY_REBALANCE_DISABLED,  // if true, disables delayed rebalancing (when a node goes offline) for the entire cluster.
+    DELAY_REBALANCE_TIME     // delay in ms that Helix should wait before rebalancing; used when a resource does not set its own delay.
   }
 
   /**
@@ -73,6 +75,14 @@ public class ClusterConfig extends HelixProperty {
         .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
   }
 
+  public long getRebalanceDelayTime() {
+    return _record.getLongField(ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), -1);
+  }
+
+  public boolean isDelayRebalaceDisabled() {
+    return _record.getBooleanField(ClusterConfigProperty.DELAY_REBALANCE_DISABLED.name(), false);
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof ClusterConfig) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 769a369..5ced7a6 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -52,6 +52,7 @@ public class IdealState extends HelixProperty {
     REPLICAS,
     MIN_ACTIVE_REPLICAS,
     REBALANCE_DELAY,
+    DELAY_REBALANCE_DISABLED,
     @Deprecated
     IDEAL_STATE_MODE,
     REBALANCE_MODE,
@@ -204,7 +205,7 @@ public class IdealState extends HelixProperty {
    * @param delayInMilliseconds
    */
   public void setRebalanceDelay(long delayInMilliseconds) {
-    _record.setLongField(IdealStateProperty.REBALANCE_DELAY.toString(), delayInMilliseconds);
+    _record.setLongField(IdealStateProperty.REBALANCE_DELAY.name(), delayInMilliseconds);
   }
 
   /**
@@ -212,7 +213,23 @@ public class IdealState extends HelixProperty {
    * @return
    */
   public long getRebalanceDelay() {
-    return _record.getLongField(IdealStateProperty.REBALANCE_DELAY.toString(), 0);
+    return _record.getLongField(IdealStateProperty.REBALANCE_DELAY.name(), -1);
+  }
+
+  /**
+   * If disabled is true, the delayed rebalance time will be ignored.
+   * @param disabled
+   */
+  public void setDelayRebalanceDisabled(boolean disabled) {
+    _record.setBooleanField(IdealStateProperty.DELAY_REBALANCE_DISABLED.name(), disabled);
+  }
+
+  /**
+   * Whether the delay rebalance is disabled.
+   * @return
+   */
+  public boolean isDelayRebalanceDisabled() {
+    return _record.getBooleanField(IdealStateProperty.DELAY_REBALANCE_DISABLED.name(), false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
index e3000c2..09a7528 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
@@ -51,6 +51,11 @@ public abstract class IdealStateBuilder {
   private long rebalanceDelayInMs = -1;
 
   /**
+   * Whether delay rebalance should be disabled.
+   */
+  private Boolean delayRebalanceDisabled = null;
+
+  /**
    * State model that is applicable for this resource
    */
   private String stateModel;
@@ -135,6 +140,20 @@ public abstract class IdealStateBuilder {
   }
 
   /**
+   * Disable Delayed Rebalance.
+   */
+  public void disableDelayRebalance() {
+    delayRebalanceDisabled = true;
+  }
+
+  /**
+   * Enable Delayed Rebalance.
+   */
+  public void enableDelayRebalance() {
+    delayRebalanceDisabled = false;
+  }
+
+  /**
    * @param numPartitions
    */
   public IdealStateBuilder setNumPartitions(int numPartitions) {
@@ -272,10 +291,14 @@ public abstract class IdealStateBuilder {
       idealstate.enableGroupRouting(enableGroupRouting);
     }
 
-    if (rebalanceDelayInMs > 0) {
+    if (rebalanceDelayInMs >= 0) {
       idealstate.setRebalanceDelay(rebalanceDelayInMs);
     }
 
+    if (delayRebalanceDisabled != null) {
+      idealstate.setDelayRebalanceDisabled(delayRebalanceDisabled);
+    }
+
     if (!idealstate.isValid()) {
       throw new HelixException("invalid ideal-state: " + idealstate);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
index ba7f46e..6342d13 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
@@ -234,6 +234,95 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
     }
   }
 
+  @Test
+  public void testDisableResourceDelayRebalance() throws Exception {
+    Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+    Map<String, ExternalView> externalViewsBefore = new HashMap<String, ExternalView>();
+
+    int minActiveReplica = _replica - 1;
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      IdealState idealState =
+          createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+              minActiveReplica, 100000);
+      _testDBs.add(db);
+      idealStates.put(db, idealState);
+    }
+
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViewsBefore.put(db, ev);
+    }
+
+    // bring down one node, no partition should be moved.
+    _participants.get(0).syncStop();
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+          _participants.get(0).getInstanceName());
+    }
+
+    // disable delay rebalance for one db, partition should be moved immediately
+    String testDb = _testDBs.get(0);
+    IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(
+        CLUSTER_NAME, testDb);
+    idealState.setDelayRebalanceDisabled(true);
+    _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
+    Thread.sleep(1000);
+
+    // once delay rebalance is disabled, it should maintain required number of replicas.
+    ExternalView ev =
+        _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, testDb);
+    idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+    validateMinActiveAndTopStateReplica(idealState, ev, _replica);
+  }
+
+  @Test
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+    disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+
+    int minActiveReplica = _replica - 1;
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      IdealState idealState =
+          createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+              minActiveReplica, 100000);
+      _testDBs.add(db);
+      idealStates.put(db, idealState);
+    }
+
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    // bring down one node; delayed rebalance is disabled cluster-wide, so partitions should be moved immediately.
+    _participants.get(0).syncStop();
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    // disable delay rebalance for the entire cluster.
+    disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+    Thread.sleep(1000);
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+    }
+
+    disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
+  }
+
   @AfterMethod
   public void afterTest() {
     // delete all DBs create in last test

http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 9810f81..0edd4d3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -114,4 +114,16 @@ public class ZkIntegrationTestBase {
         ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
         enable.toString());
   }
+
+  protected void disableDelayRebalanceInCluster(ZkClient zkClient, String clusterName,
+      Boolean disabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    HelixConfigScope clusterScope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+            .forCluster(clusterName).build();
+
+    configAccessor
+        .set(clusterScope, ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_DISABLED.name(),
+            disabled.toString());
+  }
 }