You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 18:00:06 UTC
[31/38] helix git commit: Add cluster-level and resource-level config
option to allow disable delayed rebalance of entire cluster or individual
resource.
Add cluster-level and resource-level config option to allow disable delayed rebalance of entire cluster or individual resource.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c5e12b1e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c5e12b1e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c5e12b1e
Branch: refs/heads/helix-0.6.x
Commit: c5e12b1e6b7aa7f8b2e43e4878be3f4c3de81f82
Parents: a294ab2
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Sep 21 10:53:31 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:54:08 2017 -0800
----------------------------------------------------------------------
.../rebalancer/DelayedAutoRebalancer.java | 80 +++++++++++++-----
.../rebalancer/util/RebalanceScheduler.java | 6 +-
.../org/apache/helix/model/ClusterConfig.java | 12 ++-
.../java/org/apache/helix/model/IdealState.java | 21 ++++-
.../helix/model/builder/IdealStateBuilder.java | 25 +++++-
.../integration/TestDelayedAutoRebalance.java | 89 ++++++++++++++++++++
.../integration/ZkIntegrationTestBase.java | 12 +++
7 files changed, 217 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 1e127bc..d1718fc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -24,6 +24,7 @@ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
@@ -80,10 +81,12 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
allNodes = clusterData.getEnabledInstances();
}
+ ClusterConfig clusterConfig = clusterData.getClusterConfig();
+ long delayTime = getRebalanceDelay(currentIdealState, clusterConfig);
Set<String> activeNodes = getActiveInstances(currentIdealState, allNodes, liveNodes,
- clusterData.getInstanceOfflineTimeMap());
-
- setRebalanceScheduler(currentIdealState, activeNodes, clusterData.getInstanceOfflineTimeMap());
+ clusterData.getInstanceOfflineTimeMap(), delayTime, clusterConfig);
+ setRebalanceScheduler(currentIdealState, activeNodes, clusterData.getInstanceOfflineTimeMap(),
+ delayTime, clusterConfig);
if (allNodes.isEmpty() || activeNodes.isEmpty()) {
LOG.error(String.format(
@@ -105,8 +108,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
emptyMapping(currentIdealState));
}
- int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount);
-
LinkedHashMap<String, Integer> stateCountMap =
StateModelDefinition.getStateCountMap(stateModelDef, activeNodes.size(), replicaCount);
Map<String, Map<String, String>> currentMapping =
@@ -120,18 +121,25 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
// sort node lists to ensure consistent preferred assignments
List<String> allNodeList = new ArrayList<String>(allNodes);
List<String> liveNodeList = new ArrayList<String>(liveNodes);
- List<String> activeNodeList = new ArrayList<String>(activeNodes);
Collections.sort(allNodeList);
Collections.sort(liveNodeList);
- Collections.sort(activeNodeList);
ZNRecord newIdealMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, liveNodeList, currentMapping, clusterData);
- ZNRecord newActiveMapping = _rebalanceStrategy
- .computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
- ZNRecord finalMapping =
- getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveNodes,
- replicaCount, minActiveReplicas);
+ ZNRecord finalMapping = newIdealMapping;
+
+ if (!isDelayRebalanceDisabled(currentIdealState, clusterConfig)) {
+ List<String> activeNodeList = new ArrayList<String>(activeNodes);
+ Collections.sort(activeNodeList);
+ int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount);
+
+ ZNRecord newActiveMapping = _rebalanceStrategy
+ .computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
+ finalMapping =
+ getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveNodes,
+ replicaCount, minActiveReplicas);
+ LOG.debug("newActiveMapping: " + newActiveMapping);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("currentMapping: " + currentMapping);
@@ -140,7 +148,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
LOG.debug("allNodes: " + allNodes);
LOG.debug("maxPartition: " + maxPartition);
LOG.debug("newIdealMapping: " + newIdealMapping);
- LOG.debug("newActiveMapping: " + newActiveMapping);
LOG.debug("finalMapping: " + finalMapping);
}
@@ -159,13 +166,18 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
/* get all active instances (live instances plus offline-yet-active instances */
private Set<String> getActiveInstances(IdealState idealState, Set<String> allNodes,
- Set<String> liveNodes, Map<String, Long> instanceOfflineTimeMap) {
+ Set<String> liveNodes, Map<String, Long> instanceOfflineTimeMap, long delayTime,
+ ClusterConfig clusterConfig) {
Set<String> activeInstances = new HashSet<String>(liveNodes);
+
+ if (isDelayRebalanceDisabled(idealState, clusterConfig)) {
+ return activeInstances;
+ }
+
Set<String> offlineInstances = new HashSet<String>(allNodes);
offlineInstances.removeAll(liveNodes);
long currentTime = System.currentTimeMillis();
- long delayTime = idealState.getRebalanceDelay();
for (String ins : offlineInstances) {
Long offlineTime = instanceOfflineTimeMap.get(ins);
if (offlineTime != null && offlineTime > 0) {
@@ -180,10 +192,14 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
/* Set a rebalance scheduler for the closest future rebalance time. */
private void setRebalanceScheduler(IdealState idealState, Set<String> activeInstances,
- Map<String, Long> instanceOfflineTimeMap) {
- long nextRebalanceTime = Long.MAX_VALUE;
- long delayTime = idealState.getRebalanceDelay();
+ Map<String, Long> instanceOfflineTimeMap, long delayTime, ClusterConfig clusterConfig) {
+ String resourceName = idealState.getResourceName();
+ if (isDelayRebalanceDisabled(idealState, clusterConfig)) {
+ _scheduledRebalancer.removeScheduledRebalance(resourceName);
+ return;
+ }
+ long nextRebalanceTime = Long.MAX_VALUE;
for (String ins : activeInstances) {
Long offlineTime = instanceOfflineTimeMap.get(ins);
if (offlineTime != null && offlineTime > 0) {
@@ -197,16 +213,31 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
}
}
- String resourceName = idealState.getResourceName();
- LOG.debug(String
- .format("Next rebalance time for resource %s is %d\n", resourceName, nextRebalanceTime));
if (nextRebalanceTime == Long.MAX_VALUE) {
- _scheduledRebalancer.removeScheduledRebalance(resourceName);
+ long startTime = _scheduledRebalancer.removeScheduledRebalance(resourceName);
+ LOG.debug(String
+ .format("Remove exist rebalance timer for resource %s at %d\n", resourceName, startTime));
} else {
_scheduledRebalancer.scheduleRebalance(_manager, resourceName, nextRebalanceTime);
+ LOG.debug(String.format("Set next rebalance time for resource %s at time %d\n", resourceName,
+ nextRebalanceTime));
}
}
+ private long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) {
+ long delayTime = idealState.getRebalanceDelay();
+ if (delayTime < 0) {
+ delayTime = clusterConfig.getRebalanceDelayTime();
+ }
+ return delayTime;
+ }
+
+ private boolean isDelayRebalanceDisabled(IdealState idealState, ClusterConfig clusterConfig) {
+ long delayTime = getRebalanceDelay(idealState, clusterConfig);
+ return (delayTime < 0 || idealState.isDelayRebalanceDisabled() || clusterConfig
+ .isDelayRebalaceDisabled());
+ }
+
private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping,
ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) {
if (minActiveReplica >= numReplica) {
@@ -274,8 +305,11 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
Set<String> offlineNodes = cache.getAllInstances();
offlineNodes.removeAll(cache.getLiveInstances().keySet());
+ ClusterConfig clusterConfig = cache.getClusterConfig();
+ long delayTime = getRebalanceDelay(idealState, clusterConfig);
Set<String> activeNodes =
- getActiveInstances(idealState, allNodes, liveNodes, cache.getInstanceOfflineTimeMap());
+ getActiveInstances(idealState, allNodes, liveNodes, cache.getInstanceOfflineTimeMap(),
+ delayTime, clusterConfig);
String stateModelDefName = idealState.getStateModelDefRef();
StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
index bbc03d0..641c755 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -95,7 +95,7 @@ public class RebalanceScheduler {
*
* @param resource
*/
- public void removeScheduledRebalance(String resource) {
+ public long removeScheduledRebalance(String resource) {
ScheduledTask existTask = _rebalanceTasks.remove(resource);
if (existTask != null && !existTask.getFuture().isDone()) {
if (!existTask.getFuture().cancel(true)) {
@@ -104,7 +104,11 @@ public class RebalanceScheduler {
LOG.info(
"Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: "
+ resource);
+
+ return existTask.getStartTime();
}
+
+ return -1;
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 7b30fd7..23d66a4 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -33,7 +33,9 @@ public class ClusterConfig extends HelixProperty {
HELIX_DISABLE_PIPELINE_TRIGGERS,
TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance"
PERSIST_BEST_POSSIBLE_ASSIGNMENT,
- FAULT_ZONE_TYPE // the type in which isolation should be applied on when Helix places the replicas from same partition.
+ FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition.
+ DELAY_REBALANCE_DISABLED, // disables delayed rebalancing when a node goes offline.
+ DELAY_REBALANCE_TIME // delay in ms that Helix should hold off before rebalancing.
}
/**
@@ -73,6 +75,14 @@ public class ClusterConfig extends HelixProperty {
.getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
}
+ public long getRebalanceDelayTime() {
+ return _record.getLongField(ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), -1);
+ }
+
+ public boolean isDelayRebalaceDisabled() {
+ return _record.getBooleanField(ClusterConfigProperty.DELAY_REBALANCE_DISABLED.name(), false);
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof ClusterConfig) {
http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 769a369..5ced7a6 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -52,6 +52,7 @@ public class IdealState extends HelixProperty {
REPLICAS,
MIN_ACTIVE_REPLICAS,
REBALANCE_DELAY,
+ DELAY_REBALANCE_DISABLED,
@Deprecated
IDEAL_STATE_MODE,
REBALANCE_MODE,
@@ -204,7 +205,7 @@ public class IdealState extends HelixProperty {
* @param delayInMilliseconds
*/
public void setRebalanceDelay(long delayInMilliseconds) {
- _record.setLongField(IdealStateProperty.REBALANCE_DELAY.toString(), delayInMilliseconds);
+ _record.setLongField(IdealStateProperty.REBALANCE_DELAY.name(), delayInMilliseconds);
}
/**
@@ -212,7 +213,23 @@ public class IdealState extends HelixProperty {
* @return
*/
public long getRebalanceDelay() {
- return _record.getLongField(IdealStateProperty.REBALANCE_DELAY.toString(), 0);
+ return _record.getLongField(IdealStateProperty.REBALANCE_DELAY.name(), -1);
+ }
+
+ /**
+ * If disabled is true, the delayed rebalance time will be ignored.
+ * @param disabled
+ */
+ public void setDelayRebalanceDisabled(boolean disabled) {
+ _record.setBooleanField(IdealStateProperty.DELAY_REBALANCE_DISABLED.name(), disabled);
+ }
+
+ /**
+ * Whether the delay rebalance is disabled.
+ * @return
+ */
+ public boolean isDelayRebalanceDisabled() {
+ return _record.getBooleanField(IdealStateProperty.DELAY_REBALANCE_DISABLED.name(), false);
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
index e3000c2..09a7528 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
@@ -51,6 +51,11 @@ public abstract class IdealStateBuilder {
private long rebalanceDelayInMs = -1;
/**
+ * Whether delay rebalance should be disabled.
+ */
+ private Boolean delayRebalanceDisabled = null;
+
+ /**
* State model that is applicable for this resource
*/
private String stateModel;
@@ -135,6 +140,20 @@ public abstract class IdealStateBuilder {
}
/**
+ * Disable Delayed Rebalance.
+ */
+ public void disableDelayRebalance() {
+ delayRebalanceDisabled = true;
+ }
+
+ /**
+ * Enable Delayed Rebalance.
+ */
+ public void enableDelayRebalance() {
+ delayRebalanceDisabled = false;
+ }
+
+ /**
* @param numPartitions
*/
public IdealStateBuilder setNumPartitions(int numPartitions) {
@@ -272,10 +291,14 @@ public abstract class IdealStateBuilder {
idealstate.enableGroupRouting(enableGroupRouting);
}
- if (rebalanceDelayInMs > 0) {
+ if (rebalanceDelayInMs >= 0) {
idealstate.setRebalanceDelay(rebalanceDelayInMs);
}
+ if (delayRebalanceDisabled != null) {
+ idealstate.setDelayRebalanceDisabled(delayRebalanceDisabled);
+ }
+
if (!idealstate.isValid()) {
throw new HelixException("invalid ideal-state: " + idealstate);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
index ba7f46e..6342d13 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
@@ -234,6 +234,95 @@ public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
}
}
+ @Test
+ public void testDisableResourceDelayRebalance() throws Exception {
+ Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+ Map<String, ExternalView> externalViewsBefore = new HashMap<String, ExternalView>();
+
+ int minActiveReplica = _replica - 1;
+ int i = 0;
+ for (String stateModel : TestStateModels) {
+ String db = "Test-DB-" + i++;
+ IdealState idealState =
+ createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+ minActiveReplica, 100000);
+ _testDBs.add(db);
+ idealStates.put(db, idealState);
+ }
+
+ Thread.sleep(1000);
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ for (String db : _testDBs) {
+ ExternalView ev =
+ _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+ externalViewsBefore.put(db, ev);
+ }
+
+ // bring down one node, no partition should be moved.
+ _participants.get(0).syncStop();
+ Thread.sleep(1000);
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ for (String db : _testDBs) {
+ ExternalView ev =
+ _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+ validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+ _participants.get(0).getInstanceName());
+ }
+
+ // disable delay rebalance for one db, partition should be moved immediately
+ String testDb = _testDBs.get(0);
+ IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, testDb);
+ idealState.setDelayRebalanceDisabled(true);
+ _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
+ Thread.sleep(1000);
+
+ // once delay rebalance is disabled, it should maintain required number of replicas.
+ ExternalView ev =
+ _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, testDb);
+ idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+ validateMinActiveAndTopStateReplica(idealState, ev, _replica);
+ }
+
+ @Test
+ public void testDisableDelayRebalanceInCluster() throws Exception {
+ Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+ disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+
+ int minActiveReplica = _replica - 1;
+ int i = 0;
+ for (String stateModel : TestStateModels) {
+ String db = "Test-DB-" + i++;
+ IdealState idealState =
+ createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+ minActiveReplica, 100000);
+ _testDBs.add(db);
+ idealStates.put(db, idealState);
+ }
+
+ Thread.sleep(1000);
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ // bring down one node; delayed rebalance is already disabled cluster-wide at this point.
+ _participants.get(0).syncStop();
+ Thread.sleep(1000);
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ // disable delay rebalance for the entire cluster.
+ disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
+ Thread.sleep(1000);
+ for (String db : _testDBs) {
+ ExternalView ev =
+ _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+ validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+ }
+
+ disableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
+ }
+
@AfterMethod
public void afterTest() {
// delete all DBs create in last test
http://git-wip-us.apache.org/repos/asf/helix/blob/c5e12b1e/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 9810f81..0edd4d3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -114,4 +114,16 @@ public class ZkIntegrationTestBase {
ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(),
enable.toString());
}
+
+ protected void disableDelayRebalanceInCluster(ZkClient zkClient, String clusterName,
+ Boolean disabled) {
+ ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+ HelixConfigScope clusterScope =
+ new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+ .forCluster(clusterName).build();
+
+ configAccessor
+ .set(clusterScope, ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_DISABLED.name(),
+ disabled.toString());
+ }
}