You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/03 06:01:25 UTC
[2/3] helix git commit: Add state transition throttling logic into
intermediateStateCalcStage.
Add state transition throttling logic into intermediateStateCalcStage.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4e487196
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4e487196
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4e487196
Branch: refs/heads/master
Commit: 4e4871967db07cee191debb9d26bfcd53c401945
Parents: 79ebc04
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Jan 6 16:31:38 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Oct 2 19:06:26 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/ConfigAccessor.java | 14 +-
.../config/StateTransitionThrottleConfig.java | 162 ++---------
.../stages/BestPossibleStateCalcStage.java | 4 +-
.../stages/BestPossibleStateOutput.java | 3 +-
.../stages/CurrentStateComputationStage.java | 4 +-
.../stages/ExternalViewComputeStage.java | 4 +-
.../stages/IntermediateStateCalcStage.java | 291 ++++++++++++++++++-
.../stages/MessageGenerationPhase.java | 8 +-
.../stages/MessageSelectionStage.java | 8 +-
.../controller/stages/MessageThrottleStage.java | 6 +-
.../stages/PersistAssignmentStage.java | 27 +-
.../stages/ResourceComputationStage.java | 2 +-
.../stages/ResourceValidationStage.java | 2 +-
.../StateTransitionThrottleController.java | 176 +++++++++++
.../controller/stages/TaskAssignmentStage.java | 4 +-
.../org/apache/helix/manager/zk/ZKUtil.java | 2 -
.../org/apache/helix/model/ClusterConfig.java | 22 +-
.../tools/ClusterExternalViewVerifier.java | 9 +-
.../helix/tools/ClusterStateVerifier.java | 3 +-
.../BestPossibleExternalViewVerifier.java | 2 +-
.../ClusterExternalViewVerifier.java | 2 +-
.../ClusterVerifiers/ClusterStateVerifier.java | 8 +-
.../TestBestPossibleCalcStageCompatibility.java | 12 +-
.../stages/TestBestPossibleStateCalcStage.java | 6 +-
.../TestCurrentStateComputationStage.java | 12 +-
.../stages/TestMessageThrottleStage.java | 8 +-
.../stages/TestRebalancePipeline.java | 14 +-
.../stages/TestResourceComputationStage.java | 6 +-
.../stages/TestResourceValidationStage.java | 12 +-
.../helix/integration/TestAutoRebalance.java | 3 +-
.../TestPartitionMovementThrottle.java | 283 ++++++++++++++++++
.../helix/integration/task/TaskTestUtil.java | 2 +-
32 files changed, 885 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 27a30cb..5970de0 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -162,20 +162,14 @@ public class ConfigAccessor {
LOG.error("fail to get configs. invalid config scope. scope: " + scope + ", keys: " + keys);
return null;
}
+ ZNRecord record = getConfigZnRecord(scope);
- String clusterName = scope.getClusterName();
- if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
- throw new HelixException("fail to get configs. cluster " + clusterName + " is not setup yet");
- }
-
- Map<String, String> map = new HashMap<String, String>();
-
- ZNRecord record = zkClient.readData(scope.getZkPath(), true);
if (record == null) {
LOG.warn("No config found at " + scope.getZkPath());
return null;
}
+ Map<String, String> map = new HashMap<String, String>();
String mapKey = scope.getMapKey();
if (mapKey == null) {
for (String key : keys) {
@@ -304,8 +298,8 @@ public class ConfigAccessor {
}
}
- String zkPath = scope.getZkPath();
String mapKey = scope.getMapKey();
+ String zkPath = scope.getZkPath();
String id = zkPath.substring(zkPath.lastIndexOf('/') + 1);
ZNRecord update = new ZNRecord(id);
if (mapKey == null) {
@@ -313,6 +307,7 @@ public class ConfigAccessor {
} else {
update.setMapField(mapKey, keyValueMap);
}
+
ZKUtil.createOrUpdate(zkClient, zkPath, update, true, true);
return;
}
@@ -620,6 +615,7 @@ public class ConfigAccessor {
}
/**
+<<<<<<< HEAD
* Set config of the given resource.
* The current Resource config will be replaced with the given clusterConfig.
*
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
index 39bd458..1ca25c5 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
@@ -35,128 +35,42 @@ public class StateTransitionThrottleConfig {
private enum ConfigProperty {
CONFIG_TYPE,
REBALANCE_TYPE,
- THROTTLE_SCOPE
+ THROTTLE_SCOPE,
+ MAX_PARTITION_IN_TRANSITION
}
public enum ThrottleScope {
CLUSTER,
RESOURCE,
- INSTANCE,
- PARTITION
+ INSTANCE
}
public enum RebalanceType {
LOAD_BALANCE,
- RECOVERY_BALANCE,
- ANY
+ RECOVERY_BALANCE
}
- public static class StateTransitionType {
- final static String ANY_STATE = "*";
- final static String FROM_KEY = "from";
- final static String TO_KEY = "to";
- String _fromState;
- String _toState;
+ RebalanceType _rebalanceType;
+ ThrottleScope _throttleScope;
+ Long _maxPartitionInTransition;
- StateTransitionType(String fromState, String toState) {
- _fromState = fromState;
- _toState = toState;
- }
-
- @Override
- public String toString() {
- return FROM_KEY + "." + _fromState + "." + TO_KEY + "." + _toState;
- }
-
- public static StateTransitionType parseFromString(String stateTransTypeStr) {
- String states[] = stateTransTypeStr.split(".");
- if (states.length < 4 || !states[0].equalsIgnoreCase(FROM_KEY) || !states[2]
- .equalsIgnoreCase(TO_KEY)) {
- return null;
- }
- return new StateTransitionType(states[1], states[3]);
- }
- }
-
- private ThrottleScope _throttleScope;
- private RebalanceType _rebalanceType;
- private Map<StateTransitionType, Long> _maxPendingStateTransitionMap;
-
- public StateTransitionThrottleConfig(RebalanceType rebalanceType, ThrottleScope throttleScope) {
+ public StateTransitionThrottleConfig(RebalanceType rebalanceType,
+ ThrottleScope throttleScope, long maxPartitionInTransition) {
_rebalanceType = rebalanceType;
_throttleScope = throttleScope;
- _maxPendingStateTransitionMap = new HashMap<StateTransitionType, Long>();
+ _maxPartitionInTransition = maxPartitionInTransition;
}
- /**
- * Add a max pending transition from given from state to the specified to state.
- *
- * @param fromState
- * @param toState
- * @param maxPendingStateTransition
- * @return
- */
- public StateTransitionThrottleConfig addThrottle(String fromState, String toState,
- long maxPendingStateTransition) {
- _maxPendingStateTransitionMap
- .put(new StateTransitionType(fromState, toState), maxPendingStateTransition);
- return this;
+ public RebalanceType getRebalanceType() {
+ return _rebalanceType;
}
- /**
- * Add a max pending transition from ANY state to ANY state.
- *
- * @param maxPendingStateTransition
- * @return
- */
- public StateTransitionThrottleConfig addThrottle(long maxPendingStateTransition) {
- _maxPendingStateTransitionMap
- .put(new StateTransitionType(StateTransitionType.ANY_STATE, StateTransitionType.ANY_STATE),
- maxPendingStateTransition);
- return this;
+ public ThrottleScope getThrottleScope() {
+ return _throttleScope;
}
- /**
- * Add a max pending transition for a given state transition type.
- *
- * @param stateTransitionType
- * @param maxPendingStateTransition
- * @return
- */
- public StateTransitionThrottleConfig addThrottle(StateTransitionType stateTransitionType,
- long maxPendingStateTransition) {
- _maxPendingStateTransitionMap.put(stateTransitionType, maxPendingStateTransition);
- return this;
- }
-
- /**
- * Add a max pending transition from ANY state to the specified state.
- *
- * @param toState
- * @param maxPendingStateTransition
- * @return
- */
- public StateTransitionThrottleConfig addThrottleFromAnyState(String toState,
- long maxPendingStateTransition) {
- _maxPendingStateTransitionMap
- .put(new StateTransitionType(StateTransitionType.ANY_STATE, toState),
- maxPendingStateTransition);
- return this;
- }
-
- /**
- * Add a max pending transition from given state to ANY state.
- *
- * @param fromState
- * @param maxPendingStateTransition
- * @return
- */
- public StateTransitionThrottleConfig addThrottleToAnyState(String fromState,
- long maxPendingStateTransition) {
- _maxPendingStateTransitionMap
- .put(new StateTransitionType(fromState, StateTransitionType.ANY_STATE),
- maxPendingStateTransition);
- return this;
+ public Long getMaxPartitionInTransition() {
+ return _maxPartitionInTransition;
}
private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -167,21 +81,18 @@ public class StateTransitionThrottleConfig {
* @return Json String for this config.
*/
public String toJSON() {
- Map<String, String> configsMap = new HashMap<String, String>();
-
- configsMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name());
- configsMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name());
-
- for (Map.Entry<StateTransitionType, Long> e : _maxPendingStateTransitionMap.entrySet()) {
- configsMap.put(e.getKey().toString(), String.valueOf(e.getValue()));
- }
+ Map<String, String> configMap = new HashMap<String, String>();
+ configMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name());
+ configMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name());
+ configMap.put(ConfigProperty.MAX_PARTITION_IN_TRANSITION.name(),
+ String.valueOf(_maxPartitionInTransition));
String jsonStr = null;
try {
ObjectWriter objectWriter = OBJECT_MAPPER.writer();
- jsonStr = objectWriter.writeValueAsString(configsMap);
+ jsonStr = objectWriter.writeValueAsString(configMap);
} catch (IOException e) {
- logger.error("Failed to convert config map to JSON object! " + configsMap);
+ logger.error("Failed to convert config map to JSON object! " + configMap);
}
return jsonStr;
@@ -206,16 +117,17 @@ public class StateTransitionThrottleConfig {
return throttleConfig;
}
-
/**
* Instantiate a throttle config from a config map
*
* @param configsMap
- * @return StateTransitionThrottleConfig or null if the given configs map is not a valid StateTransitionThrottleConfig.
+ *
+ * @return StateTransitionThrottleConfig or null if the given configs map is not a valid
+ * StateTransitionThrottleConfig.
*/
public static StateTransitionThrottleConfig fromConfigMap(Map<String, String> configsMap) {
- if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name()) ||
- !configsMap.containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
+ if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name()) || !configsMap
+ .containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
// not a valid StateTransitionThrottleConfig
return null;
}
@@ -226,25 +138,13 @@ public class StateTransitionThrottleConfig {
RebalanceType.valueOf(configsMap.get(ConfigProperty.REBALANCE_TYPE.name()));
ThrottleScope throttleScope =
ThrottleScope.valueOf(configsMap.get(ConfigProperty.THROTTLE_SCOPE.name()));
- config = new StateTransitionThrottleConfig(rebalanceType, throttleScope);
+ Long maxPartition =
+ Long.valueOf(configsMap.get(ConfigProperty.MAX_PARTITION_IN_TRANSITION.name()));
+ config = new StateTransitionThrottleConfig(rebalanceType, throttleScope, maxPartition);
} catch (IllegalArgumentException ex) {
return null;
}
- for (String configKey : configsMap.keySet()) {
- StateTransitionType transitionType = StateTransitionType.parseFromString(configKey);
- if (transitionType != null) {
- try {
- long value = Long.valueOf(configsMap.get(configKey));
- config.addThrottle(transitionType, value);
- } catch (NumberFormatException ex) {
- // ignore the config item with invalid number.
- logger.warn(String.format("Invalid config entry, key=%s, value=%s", configKey,
- configsMap.get(configKey)));
- }
- }
- }
-
return config;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index fbb7f86..0a13a8d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -57,8 +57,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
logger.info("START BestPossibleStateCalcStage.process()");
CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ event.getAttribute(AttributeName.CURRENT_STATE.name());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
if (currentStateOutput == null || resourceMap == null || cache == null) {
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index c64c8cf..9b5faea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -40,7 +40,8 @@ public class BestPossibleStateOutput extends ResourcesStateMap {
* @return
*/
// TODO: remove this.
- @Deprecated public Map<Partition, Map<String, String>> getResourceMap(String resourceName) {
+ @Deprecated
+ public Map<Partition, Map<String, String>> getResourceMap(String resourceName) {
PartitionStateMap map = _resourceStateMap.get(resourceName);
if (map != null) {
return map.getStateMap();
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 624698d..0dd4165 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -40,7 +40,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
if (cache == null || resourceMap == null) {
throw new StageException("Missing attributes in event:" + event
@@ -126,6 +126,6 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
}
}
}
- event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index d83518d..5eaf08a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -59,7 +59,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
LOG.info("START ExternalViewComputeStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
if (manager == null || resourceMap == null || cache == null) {
@@ -71,7 +71,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ event.getAttribute(AttributeName.CURRENT_STATE.name());
List<ExternalView> newExtViews = new ArrayList<ExternalView>();
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 5a13c7a..babc938 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -19,31 +19,41 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
- * For partition compute the Intermediate State (instance,state) pair based on
- * the BestPossible State and Current State, with all constraints applied (such as state transition throttling).
+ * For partition compute the Intermediate State (instance,state) pair based on the BestPossible
+ * State and Current State, with all constraints applied (such as state transition throttling).
*/
public class IntermediateStateCalcStage extends AbstractBaseStage {
private static final Logger logger = Logger.getLogger(IntermediateStateCalcStage.class.getName());
- @Override
- public void process(ClusterEvent event) throws Exception {
+ @Override public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
logger.info("START Intermediate.process()");
CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ event.getAttribute(AttributeName.CURRENT_STATE.name());
BestPossibleStateOutput bestPossibleStateOutput =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null
@@ -53,26 +63,281 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
IntermediateStateOutput immediateStateOutput =
- compute(event, resourceMap, currentStateOutput, bestPossibleStateOutput);
+ compute(cache, resourceMap, currentStateOutput, bestPossibleStateOutput);
+
event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), immediateStateOutput);
long endTime = System.currentTimeMillis();
logger.info("END ImmediateStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
}
- private IntermediateStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
- CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
+ private IntermediateStateOutput compute(ClusterDataCache dataCache,
+ Map<String, Resource> resourceMap, CurrentStateOutput currentStateOutput,
+ BestPossibleStateOutput bestPossibleStateOutput) {
// for each resource
// get the best possible state and current state
// try to bring immediate state close to best possible state until
// the possible pending state transition numbers reach the set throttle number.
IntermediateStateOutput output = new IntermediateStateOutput();
- // TODO: add throttling logic here.
+ StateTransitionThrottleController throttleController =
+ new StateTransitionThrottleController(resourceMap.keySet(), dataCache.getClusterConfig(),
+ dataCache.getLiveInstances().keySet());
+
for (String resourceName : resourceMap.keySet()) {
- logger.debug("Processing resource:" + resourceName);
- output.setState(resourceName, bestPossibleStateOutput.getPartitionStateMap(resourceName));
+ PartitionStateMap intermediatePartitionStateMap =
+ computeIntermediatePartitionState(dataCache, dataCache.getIdealState(resourceName),
+ resourceMap.get(resourceName), currentStateOutput,
+ bestPossibleStateOutput.getPartitionStateMap(resourceName), throttleController);
+ output.setState(resourceName, intermediatePartitionStateMap);
}
return output;
}
+
+ public PartitionStateMap computeIntermediatePartitionState(ClusterDataCache cache,
+ IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput,
+ PartitionStateMap bestPossiblePartitionStateMap,
+ StateTransitionThrottleController throttleController) {
+ String resourceName = resource.getResourceName();
+ logger.info("Processing resource:" + resourceName);
+
+ if (!throttleController.isThrottleEnabled()) {
+ logger.info("None of any type of transition throttling is set for resource " + resourceName
+ + " skip computing intermediate partition state.");
+ return bestPossiblePartitionStateMap;
+ }
+
+ String stateModelDefName = idealState.getStateModelDefRef();
+ StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+
+ boolean pendingRecoveryRebalance = false;
+
+ // check and charge pending transitions
+ for (Partition partition : resource.getPartitions()) {
+ Map<String, String> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resourceName, partition);
+ Map<String, String> pendingMap =
+ currentStateOutput.getPendingStateMap(resourceName, partition);
+ Map<String, String> bestPossibleMap =
+ bestPossiblePartitionStateMap.getPartitionMap(partition);
+
+ StateTransitionThrottleConfig.RebalanceType rebalanceType;
+ if (needRecoveryRebalance(bestPossibleMap, stateModelDef, currentStateMap)) {
+ rebalanceType = StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
+ pendingRecoveryRebalance = true;
+ } else {
+ rebalanceType = StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
+ }
+
+ if (pendingMap.size() > 0) {
+ throttleController.chargeCluster(rebalanceType);
+ throttleController.chargeResource(rebalanceType, resourceName);
+ }
+
+ Set<String> allInstances = new HashSet<String>(currentStateMap.keySet());
+ allInstances.addAll(pendingMap.keySet());
+
+ for (String ins : allInstances) {
+ String currentState = currentStateMap.get(ins);
+ String pendingState = pendingMap.get(ins);
+ if (pendingState != null && !pendingState.equals(currentState)) {
+ throttleController.chargeInstance(rebalanceType, ins);
+ }
+ }
+ }
+
+ PartitionStateMap output = new PartitionStateMap(resourceName);
+
+ int recoveryNeededCount = 0, recoveryThrottledCount = 0;
+ int loadbalanceNeededCount = 0, loadbalanceThrottledCount = 0;
+
+ Set<Partition> partitionsNeedRecovery = new HashSet<Partition>();
+ Set<Partition> partitionsNeedLoadbalance = new HashSet<Partition>();
+ Set<Partition> partitionsRecoveryThrotted = new HashSet<Partition>();
+ Set<Partition> partitionsLoadbalanceThrottled = new HashSet<Partition>();
+
+ // check recovery rebalance
+ for (Partition partition : resource.getPartitions()) {
+ Map<String, String> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resourceName, partition);
+ Map<String, String> bestPossibleMap =
+ bestPossiblePartitionStateMap.getPartitionMap(partition);
+ Map<String, String> intermediateMap = new HashMap<String, String>();
+
+ if (currentStateMap.equals(bestPossibleMap)) {
+ // no rebalance needed.
+ intermediateMap.putAll(bestPossibleMap);
+ } else if (needRecoveryRebalance(bestPossibleMap, stateModelDef, currentStateMap)) {
+ //TODO: add throttling on recovery balance
+ recoveryNeededCount++;
+ intermediateMap.putAll(bestPossibleMap);
+ pendingRecoveryRebalance = true;
+ partitionsNeedRecovery.add(partition);
+ } else {
+ partitionsNeedLoadbalance.add(partition);
+ }
+ output.setState(partition, intermediateMap);
+ }
+
+ // perform load balance only if no partition need recovery rebalance.
+ loadbalanceNeededCount = partitionsNeedLoadbalance.size();
+ if (!pendingRecoveryRebalance) {
+ for (Partition partition : partitionsNeedLoadbalance) {
+ Map<String, String> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resourceName, partition);
+ Map<String, String> bestPossibleMap =
+ bestPossiblePartitionStateMap.getPartitionMap(partition);
+ Map<String, String> intermediateMap = new HashMap<String, String>();
+ ;
+
+ Set<String> allInstances = new HashSet<String>(currentStateMap.keySet());
+ allInstances.addAll(bestPossibleMap.keySet());
+
+ boolean throttled = false;
+ if (throttleController
+ .throttleforResource(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+ resourceName)) {
+ throttled = true;
+ logger.debug("Load balance throttled on resource for " + resourceName + " " + partition
+ .getPartitionName());
+ } else {
+ // throttle the load balance if any of the instance can not handle the state transition
+ // TODO: may need finer grained control here.
+ for (String ins : allInstances) {
+ String currentState = currentStateMap.get(ins);
+ String bestPossibleState = bestPossibleMap.get(ins);
+ if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
+ if (throttleController
+ .throttleForInstance(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+ ins)) {
+ throttled = true;
+ logger.debug(
+ "Load balance throttled because instance " + ins + " for " + resourceName + " "
+ + partition.getPartitionName());
+ }
+ }
+ }
+ }
+
+ if (!throttled) {
+ intermediateMap.putAll(bestPossibleMap);
+ for (String ins : allInstances) {
+ String currentState = currentStateMap.get(ins);
+ String bestPossibleState = bestPossibleMap.get(ins);
+ if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
+ throttleController
+ .chargeInstance(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, ins);
+ }
+ }
+
+ throttleController
+ .chargeCluster(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE);
+ throttleController
+ .chargeResource(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+ resourceName);
+ } else {
+ intermediateMap.putAll(currentStateMap);
+ loadbalanceThrottledCount++;
+ partitionsLoadbalanceThrottled.add(partition);
+ }
+ output.setState(partition, intermediateMap);
+ }
+ }
+
+ logger.info(String.format(
+ "RecoveryNeeded: %d, RecoveryThrottled: %d, loadbalanceNeeded: %d, loadbalanceThrottled: %d",
+ recoveryNeededCount, recoveryThrottledCount, loadbalanceNeededCount,
+ loadbalanceThrottledCount));
+
+ if (logger.isDebugEnabled()) {
+ logParitionMapState(resourceName, new HashSet(resource.getPartitions()),
+ partitionsNeedRecovery, partitionsRecoveryThrotted, partitionsNeedLoadbalance,
+ partitionsLoadbalanceThrottled, currentStateOutput, bestPossiblePartitionStateMap,
+ output);
+ }
+
+ logger.info("End processing resource:" + resourceName);
+
+ return output;
+ }
+
+ private void logParitionMapState(String resource, Set<Partition> allPartitions,
+ Set<Partition> recoveryPartitions, Set<Partition> recoveryThrottledPartitions,
+ Set<Partition> loadbalancePartitions, Set<Partition> loadbalanceThrottledPartitions,
+ CurrentStateOutput currentStateOutput,
+ PartitionStateMap bestPossibleStateMap,
+ PartitionStateMap intermediateStateMap) {
+
+ logger.debug("Partitions need recovery: " + recoveryPartitions
+ + "\nPartitions get throttled on recovery: " + recoveryThrottledPartitions);
+ logger.debug("Partitions need loadbalance: " + loadbalancePartitions
+ + "\nPartitions get throttled on load-balance: " + loadbalanceThrottledPartitions);
+
+ for (Partition partition : allPartitions) {
+ if (recoveryPartitions.contains(partition)) {
+ logger
+ .debug("recovery balance needed for " + resource + " " + partition.getPartitionName());
+ if (recoveryThrottledPartitions.contains(partition)) {
+ logger.debug("Recovery balance throttled on resource for " + resource + " " + partition
+ .getPartitionName());
+ }
+ } else if (loadbalancePartitions.contains(partition)) {
+ logger.debug("load balance needed for " + resource + " " + partition.getPartitionName());
+ if (loadbalanceThrottledPartitions.contains(partition)) {
+ logger.debug("Load balance throttled on resource for " + resource + " " + partition
+ .getPartitionName());
+ }
+ } else {
+ logger.debug("no balance needed for " + resource + " " + partition.getPartitionName());
+ }
+
+ logger.debug(
+ partition + ": Best possible map: " + bestPossibleStateMap.getPartitionMap(partition));
+ logger.debug(partition + ": Current State: " + currentStateOutput
+ .getCurrentStateMap(resource, partition));
+ logger.debug(partition + ": Pending state: " + currentStateOutput
+ .getPendingMessageMap(resource, partition));
+ logger.debug(
+ partition + ": Intermediate state: " + intermediateStateMap.getPartitionMap(partition));
+ }
+ }
+
+ private boolean needRecoveryRebalance(Map<String, String> bestPossibleMap,
+ StateModelDefinition stateModelDef, Map<String, String> currentStateMap) {
+ boolean recoveryBalanceNeeded = false;
+ List<String> states = stateModelDef.getStatesPriorityList();
+ Map<String, Long> bestPossibleStateCounts = getStateCounts(bestPossibleMap);
+ Map<String, Long> currentStateCounts = getStateCounts(currentStateMap);
+
+ for (String state : states) {
+ Long bestPossibleCount = bestPossibleStateCounts.get(state);
+ Long currentCount = currentStateCounts.get(state);
+
+ if (bestPossibleCount == null && currentCount == null) {
+ continue;
+ } else if (bestPossibleCount == null || currentCount == null ||
+ !bestPossibleCount.equals(currentCount)) {
+ if (!state.equals(HelixDefinedState.DROPPED.name()) &&
+ !state.equals(HelixDefinedState.ERROR.name()) &&
+ !state.equals(stateModelDef.getInitialState())) {
+ recoveryBalanceNeeded = true;
+ break;
+ }
+ }
+ }
+
+ return recoveryBalanceNeeded;
+ }
+
+ /* given instance->state map, return the state counts */
+ private Map<String, Long> getStateCounts(Map<String, String> stateMap) {
+ Map<String, Long> stateCounts = new HashMap<String, Long>();
+ for (String state : stateMap.values()) {
+ if (!stateCounts.containsKey(state)) {
+ stateCounts.put(state, 0L);
+ }
+ stateCounts.put(state, stateCounts.get(state) + 1);
+ }
+ return stateCounts;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 2f4a331..f5f912e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -51,11 +51,11 @@ public class MessageGenerationPhase extends AbstractBaseStage {
public void process(ClusterEvent event) throws Exception {
HelixManager manager = event.getAttribute("helixmanager");
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ event.getAttribute(AttributeName.CURRENT_STATE.name());
IntermediateStateOutput intermediateStateOutput =
- event.getAttribute(AttributeName.INTERMEDIATE_STATE.toString());
+ event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
|| intermediateStateOutput == null) {
throw new StageException("Missing attributes in event:" + event
@@ -168,7 +168,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
} // end of for-each-partition
}
- event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+ event.addAttribute(AttributeName.MESSAGES_ALL.name(), output);
}
private Message createMessage(HelixManager manager, Resource resource, String partitionName,
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 0bc1905..8e50d83 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -76,11 +76,11 @@ public class MessageSelectionStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ event.getAttribute(AttributeName.CURRENT_STATE.name());
MessageGenerationOutput messageGenOutput =
- event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+ event.getAttribute(AttributeName.MESSAGES_ALL.name());
if (cache == null || resourceMap == null || currentStateOutput == null
|| messageGenOutput == null) {
throw new StageException("Missing attributes in event:" + event
@@ -107,7 +107,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
output.addMessages(resourceName, partition, selectedMessages);
}
}
- event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), output);
}
private void increaseStateCnt(Map<String, Bounds> stateConstraints, String state,
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index 6bf610a..9a764ff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -114,8 +114,8 @@ public class MessageThrottleStage extends AbstractBaseStage {
public void process(ClusterEvent event) throws Exception {
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
MessageSelectionStageOutput msgSelectionOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
if (cache == null || resourceMap == null || msgSelectionOutput == null) {
throw new StageException("Missing attributes in event: " + event
@@ -148,7 +148,7 @@ public class MessageThrottleStage extends AbstractBaseStage {
}
}
- event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
+ event.addAttribute(AttributeName.MESSAGES_THROTTLE.name(), output);
}
private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 8255cf4..b55a838 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -28,6 +28,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -48,7 +49,8 @@ public class PersistAssignmentStage extends AbstractBaseStage {
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
ClusterConfig clusterConfig = cache.getClusterConfig();
- if (!clusterConfig.isPersistBestPossibleAssignment()) {
+ if (!clusterConfig.isPersistBestPossibleAssignment() && !clusterConfig
+ .isPersistIntermediateAssignment()) {
return;
}
@@ -86,13 +88,19 @@ public class PersistAssignmentStage extends AbstractBaseStage {
}
}
- Map<Partition, Map<String, String>> bestPossibleAssignements =
- bestPossibleAssignment.getResourceMap(resourceId);
+ PartitionStateMap partitionStateMap =
+ bestPossibleAssignment.getPartitionStateMap(resourceId);
+ if (clusterConfig.isPersistIntermediateAssignment()) {
+ IntermediateStateOutput intermediateAssignment =
+ event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+ partitionStateMap = intermediateAssignment.getPartitionStateMap(resourceId);
+ }
+
+ Map<Partition, Map<String, String>> assignmentToPersist = partitionStateMap.getStateMap());
- if (bestPossibleAssignements != null && hasInstanceMapChanged(bestPossibleAssignements,
- idealState)) {
- for (Partition partition : bestPossibleAssignements.keySet()) {
- Map<String, String> instanceMap = bestPossibleAssignements.get(partition);
+ if (assignmentToPersist != null && hasInstanceMapChanged(assignmentToPersist, idealState)) {
+ for (Partition partition : assignmentToPersist.keySet()) {
+ Map<String, String> instanceMap = assignmentToPersist.get(partition);
idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap);
}
needPersist = true;
@@ -101,8 +109,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
if (needPersist) {
// Update instead of set to ensure any intermediate changes that the controller does not update are kept.
accessor.updateProperty(keyBuilder.idealStates(resourceId), new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord current) {
+ @Override public ZNRecord update(ZNRecord current) {
if (current != null) {
// Overwrite MapFields and ListFields items with the same key.
// Note that default merge will keep old values in the maps or lists unchanged, which is not desired.
@@ -117,7 +124,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
}
long endTime = System.currentTimeMillis();
- LOG.info("END PersistAssignmentStage.process(), took " + (endTime - startTime) + " ms");
+ LOG.info("END PersistAssignmentStage.process() took " + (endTime - startTime) + " ms");
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index bde2904..65b94ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -127,7 +127,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
}
}
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+ event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
}
private void addResource(String resource, Map<String, Resource> resourceMap) {
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index e552797..09cbca6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -37,7 +37,7 @@ public class ResourceValidationStage extends AbstractBaseStage {
if (cache == null) {
throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
}
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
if (resourceMap == null) {
throw new StageException("Resources must be computed prior to validation!");
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
new file mode 100644
index 0000000..6acfd9e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
@@ -0,0 +1,176 @@
+package org.apache.helix.controller.stages;
+
+import java.util.Set;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.model.ClusterConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+/**
+ * Output for IntermediateStateCalStage.
+ */
+class StateTransitionThrottleController {
+ private static Logger logger = Logger.getLogger(StateTransitionThrottleController.class);
+
+ // pending allowed transition counts in the cluster level for recovery and load balance
+ Map<StateTransitionThrottleConfig.RebalanceType, Long> _pendingTransitionAllowedInCluster;
+
+ // pending allowed transition counts for each instance and resource
+ Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>
+ _pendingTransitionAllowedPerInstance;
+ Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>
+ _pendingTransitionAllowedPerResource;
+
+ private boolean _throttleEnabled = false;
+
+ public StateTransitionThrottleController(Set<String> resources, ClusterConfig clusterConfig,
+ Set<String> liveInstances) {
+ super();
+ _pendingTransitionAllowedInCluster =
+ new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>();
+ _pendingTransitionAllowedPerInstance =
+ new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>();
+ _pendingTransitionAllowedPerResource =
+ new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>();
+
+ if (clusterConfig == null) {
+ logger.warn("Cluster config is not found, no throttle config set!");
+ return;
+ }
+
+ List<StateTransitionThrottleConfig> throttleConfigs =
+ clusterConfig.getStateTransitionThrottleConfigs();
+
+ if (throttleConfigs == null || throttleConfigs.isEmpty()) {
+ logger.info("No throttle config is set!");
+ return;
+ }
+
+ for (StateTransitionThrottleConfig config : throttleConfigs) {
+ switch (config.getThrottleScope()) {
+ case CLUSTER:
+ _pendingTransitionAllowedInCluster
+ .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
+ _throttleEnabled = true;
+ break;
+ case RESOURCE:
+ for (String resource : resources) {
+ if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
+ _pendingTransitionAllowedPerResource
+ .put(resource, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+ }
+ _pendingTransitionAllowedPerResource.get(resource)
+ .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
+ }
+ _throttleEnabled = true;
+ break;
+ case INSTANCE:
+ for (String instance : liveInstances) {
+ if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
+ _pendingTransitionAllowedPerInstance
+ .put(instance, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+ }
+ _pendingTransitionAllowedPerInstance.get(instance)
+ .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
+ }
+ _throttleEnabled = true;
+ break;
+ }
+ }
+ }
+
+ /**
+ * Whether any throttle config enabled for this cluster.
+ *
+ * @return
+ */
+ protected boolean isThrottleEnabled() {
+ return _throttleEnabled;
+ }
+
+ /**
+ * Check if state transition on a partition should be throttled.
+ *
+ * @return true if it should be throttled, otherwise, false.
+ */
+ protected boolean throttleforCluster(
+ StateTransitionThrottleConfig.RebalanceType rebalanceType) {
+ Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType);
+ if (clusterThrottle != null) {
+ if (clusterThrottle <= 0) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ protected boolean throttleforResource(
+ StateTransitionThrottleConfig.RebalanceType rebalanceType, String resourceName) {
+ if (throttleforCluster(rebalanceType)) {
+ return true;
+ }
+
+ Long resouceThrottle;
+ if (_pendingTransitionAllowedPerResource.containsKey(resourceName)) {
+ resouceThrottle = _pendingTransitionAllowedPerResource.get(resourceName).get(rebalanceType);
+ if (resouceThrottle != null && resouceThrottle <= 0) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ protected boolean throttleForInstance(
+ StateTransitionThrottleConfig.RebalanceType rebalanceType, String instanceName) {
+ if (throttleforCluster(rebalanceType)) {
+ return true;
+ }
+
+ Long instanceThrottle;
+ if (_pendingTransitionAllowedPerInstance.containsKey(instanceName)) {
+ instanceThrottle = _pendingTransitionAllowedPerInstance.get(instanceName).get(rebalanceType);
+ if (instanceThrottle != null && instanceThrottle <= 0) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ protected void chargeCluster(StateTransitionThrottleConfig.RebalanceType rebalanceType) {
+ if (_pendingTransitionAllowedInCluster.containsKey(rebalanceType)) {
+ Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType);
+ if (clusterThrottle > 0) {
+ _pendingTransitionAllowedInCluster.put(rebalanceType, clusterThrottle - 1);
+ }
+ }
+ }
+
+ protected void chargeResource(StateTransitionThrottleConfig.RebalanceType rebalanceType,
+ String resource) {
+ if (_pendingTransitionAllowedPerResource.containsKey(resource)
+ && _pendingTransitionAllowedPerResource.get(resource).containsKey(rebalanceType)) {
+ Long resouceThrottle = _pendingTransitionAllowedPerResource.get(resource).get(rebalanceType);
+ if (resouceThrottle > 0) {
+ _pendingTransitionAllowedPerResource.get(resource).put(rebalanceType, resouceThrottle - 1);
+ }
+ }
+ }
+
+ protected void chargeInstance(StateTransitionThrottleConfig.RebalanceType rebalanceType,
+ String instance) {
+ if (_pendingTransitionAllowedPerInstance.containsKey(instance)
+ && _pendingTransitionAllowedPerInstance.get(instance).containsKey(rebalanceType)) {
+ Long instanceThrottle = _pendingTransitionAllowedPerInstance.get(instance).get(rebalanceType);
+ if (instanceThrottle > 0) {
+ _pendingTransitionAllowedPerInstance.get(instance).put(rebalanceType, instanceThrottle - 1);
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c466bc6..8aed23e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -48,9 +48,9 @@ public class TaskAssignmentStage extends AbstractBaseStage {
logger.info("START TaskAssignmentStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
MessageThrottleStageOutput messageOutput =
- event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+ event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
ClusterDataCache cache = event.getAttribute("ClusterDataCache");
Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 2b4cfb2..38b74cb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -27,9 +27,7 @@ import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index f679b3f..79bb6fa 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -36,8 +36,9 @@ public class ClusterConfig extends HelixProperty {
*/
public enum ClusterConfigProperty {
HELIX_DISABLE_PIPELINE_TRIGGERS,
- TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance"
PERSIST_BEST_POSSIBLE_ASSIGNMENT,
+ PERSIST_INTERMEDIATE_ASSIGNMENT,
+ TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance"
FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition.
DELAY_REBALANCE_DISABLED, // enabled the delayed rebalaning in case node goes offline.
DELAY_REBALANCE_TIME, // delayed time in ms that the delay time Helix should hold until rebalancing.
@@ -91,9 +92,28 @@ public class ClusterConfig extends HelixProperty {
}
/**
+ * Whether to persist IntermediateAssignment in a resource's idealstate.
+ *
+ * @return
+ */
+ public Boolean isPersistIntermediateAssignment() {
+ return _record
+ .getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), false);
+ }
+
+ /**
+ * Enable/Disable persist IntermediateAssignment in a resource's idealstate.
*
* @return
*/
+ public void setPersistIntermediateAssignment(Boolean enable) {
+ if (enable == null) {
+ _record.getSimpleFields().remove(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString());
+ } else {
+ _record.setBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), enable);
+ }
+ }
+
public Boolean isPipelineTriggersDisabled() {
return _record
.getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index 03c79d2..179f89a 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@ -28,11 +28,14 @@ import org.apache.helix.manager.zk.ZkClient;
*/
/**
- * This class is deprecated, please use BestPossibleExternalViewVerifier in tools.ClusterVerifiers instead.
+ * This class is deprecated, please use BestPossibleExternalViewVerifier in tools.ClusterVerifiers
+ * instead.
*/
@Deprecated
-public class ClusterExternalViewVerifier extends org.apache.helix.tools.ClusterVerifiers.ClusterExternalViewVerifier {
- public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName, List<String> expectLiveNodes) {
+public class ClusterExternalViewVerifier
+ extends org.apache.helix.tools.ClusterVerifiers.ClusterExternalViewVerifier {
+ public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
+ List<String> expectLiveNodes) {
super(zkclient, clusterName, expectLiveNodes);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index fc87dca..576b2fe 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -24,5 +24,6 @@ package org.apache.helix.tools;
* please use dedicated verifier classes, such as BestPossibleExternViewVerifier, etc, in tools.ClusterVerifiers
*/
@Deprecated
-public class ClusterStateVerifier extends org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier{
+public class ClusterStateVerifier
+ extends org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier {
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 6c79bed..2b6d92c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -354,7 +354,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
runStage(event, new BestPossibleStateCalcStage());
BestPossibleStateOutput output =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
return output;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java
index fa697c4..933acc2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java
@@ -101,7 +101,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
runStage(event, stage);
}
- return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java
index d2a2d09..eace66f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java
@@ -247,7 +247,7 @@ public class ClusterStateVerifier {
bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
}
bestPossStateMap.get(resourceName).get(partition)
- .put(instanceName, HelixDefinedState.ERROR.toString());
+ .put(instanceName, HelixDefinedState.ERROR.name());
}
}
}
@@ -281,7 +281,7 @@ public class ClusterStateVerifier {
while (insIter.hasNext()) {
Map.Entry<String, String> insEntry = insIter.next();
String state = insEntry.getValue();
- if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
+ if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.name())) {
insIter.remove();
}
}
@@ -351,7 +351,7 @@ public class ClusterStateVerifier {
// Filter resources if specified
if (resources != null) {
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
resourceMap.keySet().retainAll(resources);
}
@@ -359,7 +359,7 @@ public class ClusterStateVerifier {
runStage(event, bpStage);
BestPossibleStateOutput output =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
// System.out.println("output:" + output);
return output;
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 33570a0..4ea93ac 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -53,8 +53,8 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
Map<String, Resource> resourceMap = getResourceMap();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
ReadClusterDataStage stage1 = new ReadClusterDataStage();
runStage(event, stage1);
@@ -62,7 +62,7 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
runStage(event, stage2);
BestPossibleStateOutput output =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
for (int p = 0; p < 5; p++) {
Partition resource = new Partition("testResourceName_" + p);
AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
@@ -86,8 +86,8 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
Map<String, Resource> resourceMap = getResourceMap();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
ReadClusterDataStage stage1 = new ReadClusterDataStage();
runStage(event, stage1);
@@ -95,7 +95,7 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
runStage(event, stage2);
BestPossibleStateOutput output =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
for (int p = 0; p < 5; p++) {
Partition resource = new Partition("testResourceName_" + p);
AssertJUnit.assertNull(output.getInstanceStateMap("testResourceName", resource).get(
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 82c7b37..43e0e07 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -49,8 +49,8 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
Map<String, Resource> resourceMap = getResourceMap();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
ReadClusterDataStage stage1 = new ReadClusterDataStage();
runStage(event, stage1);
@@ -58,7 +58,7 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
runStage(event, stage2);
BestPossibleStateOutput output =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
for (int p = 0; p < 5; p++) {
Partition resource = new Partition("testResourceName_" + p);
AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index c5f54a5..ac1f262 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -39,11 +39,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
@Test
public void testEmptyCS() {
Map<String, Resource> resourceMap = getResourceMap();
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+ event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
CurrentStateComputationStage stage = new CurrentStateComputationStage();
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.name());
AssertJUnit.assertEquals(
output.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
0);
@@ -56,11 +56,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
setupLiveInstances(5);
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+ event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
CurrentStateComputationStage stage = new CurrentStateComputationStage();
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.name());
AssertJUnit.assertEquals(
output1.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
0);
@@ -79,7 +79,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.name());
String pendingState =
output2.getPendingState("testResourceName", new Partition("testResourceName_1"),
"localhost_3").getToState();
@@ -104,7 +104,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
stateWithDeadSession);
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.name());
String currentState =
output3.getCurrentState("testResourceName", new Partition("testResourceName_1"),
"localhost_3");
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 3a321cc..965e0de 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -114,12 +114,12 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
selectMessages.add(msg);
msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
- event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), msgSelectOutput);
runStage(event, throttleStage);
MessageThrottleStageOutput msgThrottleOutput =
- event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+ event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(),
1);
@@ -298,12 +298,12 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
selectMessages.add(msg6); // should be throttled
msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
- event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), msgSelectOutput);
runStage(event, throttleStage);
MessageThrottleStageOutput msgThrottleOutput =
- event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+ event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
List<Message> throttleMessages =
msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
Assert.assertEquals(throttleMessages.size(), 4);
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 18abf75..a6863ca 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -97,7 +97,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
List<Message> messages =
msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
@@ -113,7 +113,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1");
@@ -249,7 +249,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
List<Message> messages =
msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
@@ -267,7 +267,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1,
"Should output only 1 message: OFFLINE->DROPPED for localhost_1");
@@ -284,7 +284,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0)));
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1,
"Should output 1 message: OFFLINE->DROPPED for localhost_0");
@@ -345,7 +345,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
List<Message> messages =
msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1, "Should output 1 message: SLAVE-MASTER for node1");
@@ -364,7 +364,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0");
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index dcb955c..87c0516 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -69,7 +69,7 @@ public class TestResourceComputationStage extends BaseStageTest {
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
- Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.name());
AssertJUnit.assertEquals(1, resource.size());
AssertJUnit.assertEquals(resource.keySet().iterator().next(), resourceName);
@@ -91,7 +91,7 @@ public class TestResourceComputationStage extends BaseStageTest {
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
AssertJUnit.assertEquals(resources.length, resourceMap.size());
for (int i = 0; i < resources.length; i++) {
@@ -157,7 +157,7 @@ public class TestResourceComputationStage extends BaseStageTest {
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
// +1 because it will have one for current state
AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
index 15d7fd8..9c86372 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
@@ -70,14 +70,14 @@ public class TestResourceValidationStage {
// run resource computation
new ResourceComputationStage().process(event);
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
Assert.assertTrue(resourceMap.containsKey(onlineOfflineFullAutoResource));
Assert.assertTrue(resourceMap.containsKey(masterSlaveSemiAutoInvalidResource));
// run resource validation
new ResourceValidationStage().process(event);
- Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.name());
Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
Assert.assertTrue(finalResourceMap.containsKey(onlineOfflineFullAutoResource));
Assert.assertFalse(finalResourceMap.containsKey(masterSlaveSemiAutoInvalidResource));
@@ -102,12 +102,12 @@ public class TestResourceValidationStage {
// run resource computation
new ResourceComputationStage().process(event);
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
// run resource validation
new ResourceValidationStage().process(event);
- Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.name());
Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
}
@@ -132,13 +132,13 @@ public class TestResourceValidationStage {
// run resource computation
new ResourceComputationStage().process(event);
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
Assert.assertTrue(resourceMap.containsKey(leaderStandbyCustomResource));
// run resource validation
new ResourceValidationStage().process(event);
- Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.name());
Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
Assert.assertFalse(finalResourceMap.containsKey(leaderStandbyCustomResource));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index f9bbc94..d21706c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -90,7 +90,6 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase {
new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
participant.syncStart();
_participants[i] = participant;
-
}
// start controller
@@ -140,7 +139,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase {
ClusterSetup.processCommandLineArgs(command.split(" "));
TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 30 * 1000, CLUSTER_NAME, "MyDB2",
- TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12920",
+ TestHelper.<String>setOf("localhost_12918", "localhost_12919", "localhost_12920",
"localhost_12921", "localhost_12922"), ZK_ADDR);
}