You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/10/03 06:01:25 UTC

[2/3] helix git commit: Add state transition throttling logic into intermediateStateCalcStage.

Add state transition throttling logic into intermediateStateCalcStage.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4e487196
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4e487196
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4e487196

Branch: refs/heads/master
Commit: 4e4871967db07cee191debb9d26bfcd53c401945
Parents: 79ebc04
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Jan 6 16:31:38 2017 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Oct 2 19:06:26 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/ConfigAccessor.java   |  14 +-
 .../config/StateTransitionThrottleConfig.java   | 162 ++---------
 .../stages/BestPossibleStateCalcStage.java      |   4 +-
 .../stages/BestPossibleStateOutput.java         |   3 +-
 .../stages/CurrentStateComputationStage.java    |   4 +-
 .../stages/ExternalViewComputeStage.java        |   4 +-
 .../stages/IntermediateStateCalcStage.java      | 291 ++++++++++++++++++-
 .../stages/MessageGenerationPhase.java          |   8 +-
 .../stages/MessageSelectionStage.java           |   8 +-
 .../controller/stages/MessageThrottleStage.java |   6 +-
 .../stages/PersistAssignmentStage.java          |  27 +-
 .../stages/ResourceComputationStage.java        |   2 +-
 .../stages/ResourceValidationStage.java         |   2 +-
 .../StateTransitionThrottleController.java      | 176 +++++++++++
 .../controller/stages/TaskAssignmentStage.java  |   4 +-
 .../org/apache/helix/manager/zk/ZKUtil.java     |   2 -
 .../org/apache/helix/model/ClusterConfig.java   |  22 +-
 .../tools/ClusterExternalViewVerifier.java      |   9 +-
 .../helix/tools/ClusterStateVerifier.java       |   3 +-
 .../BestPossibleExternalViewVerifier.java       |   2 +-
 .../ClusterExternalViewVerifier.java            |   2 +-
 .../ClusterVerifiers/ClusterStateVerifier.java  |   8 +-
 .../TestBestPossibleCalcStageCompatibility.java |  12 +-
 .../stages/TestBestPossibleStateCalcStage.java  |   6 +-
 .../TestCurrentStateComputationStage.java       |  12 +-
 .../stages/TestMessageThrottleStage.java        |   8 +-
 .../stages/TestRebalancePipeline.java           |  14 +-
 .../stages/TestResourceComputationStage.java    |   6 +-
 .../stages/TestResourceValidationStage.java     |  12 +-
 .../helix/integration/TestAutoRebalance.java    |   3 +-
 .../TestPartitionMovementThrottle.java          | 283 ++++++++++++++++++
 .../helix/integration/task/TaskTestUtil.java    |   2 +-
 32 files changed, 885 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 27a30cb..5970de0 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -162,20 +162,14 @@ public class ConfigAccessor {
       LOG.error("fail to get configs. invalid config scope. scope: " + scope + ", keys: " + keys);
       return null;
     }
+    ZNRecord record = getConfigZnRecord(scope);
 
-    String clusterName = scope.getClusterName();
-    if (!ZKUtil.isClusterSetup(clusterName, zkClient)) {
-      throw new HelixException("fail to get configs. cluster " + clusterName + " is not setup yet");
-    }
-
-    Map<String, String> map = new HashMap<String, String>();
-
-    ZNRecord record = zkClient.readData(scope.getZkPath(), true);
     if (record == null) {
       LOG.warn("No config found at " + scope.getZkPath());
       return null;
     }
 
+    Map<String, String> map = new HashMap<String, String>();
     String mapKey = scope.getMapKey();
     if (mapKey == null) {
       for (String key : keys) {
@@ -304,8 +298,8 @@ public class ConfigAccessor {
       }
     }
 
-    String zkPath = scope.getZkPath();
     String mapKey = scope.getMapKey();
+    String zkPath = scope.getZkPath();
     String id = zkPath.substring(zkPath.lastIndexOf('/') + 1);
     ZNRecord update = new ZNRecord(id);
     if (mapKey == null) {
@@ -313,6 +307,7 @@ public class ConfigAccessor {
     } else {
       update.setMapField(mapKey, keyValueMap);
     }
+
     ZKUtil.createOrUpdate(zkClient, zkPath, update, true, true);
     return;
   }
@@ -620,6 +615,7 @@ public class ConfigAccessor {
   }
 
   /**
+<<<<<<< HEAD
    * Set config of the given resource.
    * The current Resource config will be replaced with the given clusterConfig.
    *

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
index 39bd458..1ca25c5 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
@@ -35,128 +35,42 @@ public class StateTransitionThrottleConfig {
   private enum ConfigProperty {
     CONFIG_TYPE,
     REBALANCE_TYPE,
-    THROTTLE_SCOPE
+    THROTTLE_SCOPE,
+    MAX_PARTITION_IN_TRANSITION
   }
 
   public enum ThrottleScope {
     CLUSTER,
     RESOURCE,
-    INSTANCE,
-    PARTITION
+    INSTANCE
   }
 
   public enum RebalanceType {
     LOAD_BALANCE,
-    RECOVERY_BALANCE,
-    ANY
+    RECOVERY_BALANCE
   }
 
-  public static class StateTransitionType {
-    final static String ANY_STATE = "*";
-    final static String FROM_KEY = "from";
-    final static String TO_KEY = "to";
-    String _fromState;
-    String _toState;
+  RebalanceType _rebalanceType;
+  ThrottleScope _throttleScope;
+  Long _maxPartitionInTransition;
 
-    StateTransitionType(String fromState, String toState) {
-      _fromState = fromState;
-      _toState = toState;
-    }
-
-    @Override
-    public String toString() {
-      return FROM_KEY + "." + _fromState + "." + TO_KEY + "." + _toState;
-    }
-
-    public static StateTransitionType parseFromString(String stateTransTypeStr) {
-      String states[] = stateTransTypeStr.split(".");
-      if (states.length < 4 || !states[0].equalsIgnoreCase(FROM_KEY) || !states[2]
-          .equalsIgnoreCase(TO_KEY)) {
-        return null;
-      }
-      return new StateTransitionType(states[1], states[3]);
-    }
-  }
-
-  private ThrottleScope _throttleScope;
-  private RebalanceType _rebalanceType;
-  private Map<StateTransitionType, Long> _maxPendingStateTransitionMap;
-
-  public StateTransitionThrottleConfig(RebalanceType rebalanceType, ThrottleScope throttleScope) {
+  public StateTransitionThrottleConfig(RebalanceType rebalanceType,
+      ThrottleScope throttleScope, long maxPartitionInTransition) {
     _rebalanceType = rebalanceType;
     _throttleScope = throttleScope;
-    _maxPendingStateTransitionMap = new HashMap<StateTransitionType, Long>();
+    _maxPartitionInTransition = maxPartitionInTransition;
   }
 
-  /**
-   * Add a max pending transition from given from state to the specified to state.
-   *
-   * @param fromState
-   * @param toState
-   * @param maxPendingStateTransition
-   * @return
-   */
-  public StateTransitionThrottleConfig addThrottle(String fromState, String toState,
-      long maxPendingStateTransition) {
-    _maxPendingStateTransitionMap
-        .put(new StateTransitionType(fromState, toState), maxPendingStateTransition);
-    return this;
+  public RebalanceType getRebalanceType() {
+    return _rebalanceType;
   }
 
-  /**
-   * Add a max pending transition from ANY state to ANY state.
-   *
-   * @param maxPendingStateTransition
-   * @return
-   */
-  public StateTransitionThrottleConfig addThrottle(long maxPendingStateTransition) {
-    _maxPendingStateTransitionMap
-        .put(new StateTransitionType(StateTransitionType.ANY_STATE, StateTransitionType.ANY_STATE),
-            maxPendingStateTransition);
-    return this;
+  public ThrottleScope getThrottleScope() {
+    return _throttleScope;
   }
 
-  /**
-   * Add a max pending transition for a given state transition type.
-   *
-   * @param stateTransitionType
-   * @param maxPendingStateTransition
-   * @return
-   */
-  public StateTransitionThrottleConfig addThrottle(StateTransitionType stateTransitionType,
-      long maxPendingStateTransition) {
-    _maxPendingStateTransitionMap.put(stateTransitionType, maxPendingStateTransition);
-    return this;
-  }
-
-  /**
-   * Add a max pending transition from ANY state to the specified state.
-   *
-   * @param toState
-   * @param maxPendingStateTransition
-   * @return
-   */
-  public StateTransitionThrottleConfig addThrottleFromAnyState(String toState,
-      long maxPendingStateTransition) {
-    _maxPendingStateTransitionMap
-        .put(new StateTransitionType(StateTransitionType.ANY_STATE, toState),
-            maxPendingStateTransition);
-    return this;
-  }
-
-  /**
-   * Add a max pending transition from given state to ANY state.
-   *
-   * @param fromState
-   * @param maxPendingStateTransition
-   * @return
-   */
-  public StateTransitionThrottleConfig addThrottleToAnyState(String fromState,
-      long maxPendingStateTransition) {
-    _maxPendingStateTransitionMap
-        .put(new StateTransitionType(fromState, StateTransitionType.ANY_STATE),
-            maxPendingStateTransition);
-    return this;
+  public Long getMaxPartitionInTransition() {
+    return _maxPartitionInTransition;
   }
 
   private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -167,21 +81,18 @@ public class StateTransitionThrottleConfig {
    * @return Json String for this config.
    */
   public String toJSON() {
-    Map<String, String> configsMap = new HashMap<String, String>();
-
-    configsMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name());
-    configsMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name());
-
-    for (Map.Entry<StateTransitionType, Long> e : _maxPendingStateTransitionMap.entrySet()) {
-      configsMap.put(e.getKey().toString(), String.valueOf(e.getValue()));
-    }
+    Map<String, String> configMap = new HashMap<String, String>();
+    configMap.put(ConfigProperty.REBALANCE_TYPE.name(), _rebalanceType.name());
+    configMap.put(ConfigProperty.THROTTLE_SCOPE.name(), _throttleScope.name());
+    configMap.put(ConfigProperty.MAX_PARTITION_IN_TRANSITION.name(),
+        String.valueOf(_maxPartitionInTransition));
 
     String jsonStr = null;
     try {
       ObjectWriter objectWriter = OBJECT_MAPPER.writer();
-      jsonStr = objectWriter.writeValueAsString(configsMap);
+      jsonStr = objectWriter.writeValueAsString(configMap);
     } catch (IOException e) {
-      logger.error("Failed to convert config map to JSON object! " + configsMap);
+      logger.error("Failed to convert config map to JSON object! " + configMap);
     }
 
     return jsonStr;
@@ -206,16 +117,17 @@ public class StateTransitionThrottleConfig {
     return throttleConfig;
   }
 
-
   /**
    * Instantiate a throttle config from a config map
    *
    * @param configsMap
-   * @return StateTransitionThrottleConfig or null if the given configs map is not a valid StateTransitionThrottleConfig.
+   *
+   * @return StateTransitionThrottleConfig or null if the given configs map is not a valid
+   * StateTransitionThrottleConfig.
    */
   public static StateTransitionThrottleConfig fromConfigMap(Map<String, String> configsMap) {
-    if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name()) ||
-        !configsMap.containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
+    if (!configsMap.containsKey(ConfigProperty.REBALANCE_TYPE.name()) || !configsMap
+        .containsKey(ConfigProperty.THROTTLE_SCOPE.name())) {
       // not a valid StateTransitionThrottleConfig
       return null;
     }
@@ -226,25 +138,13 @@ public class StateTransitionThrottleConfig {
           RebalanceType.valueOf(configsMap.get(ConfigProperty.REBALANCE_TYPE.name()));
       ThrottleScope throttleScope =
           ThrottleScope.valueOf(configsMap.get(ConfigProperty.THROTTLE_SCOPE.name()));
-      config = new StateTransitionThrottleConfig(rebalanceType, throttleScope);
+      Long maxPartition =
+          Long.valueOf(configsMap.get(ConfigProperty.MAX_PARTITION_IN_TRANSITION.name()));
+      config = new StateTransitionThrottleConfig(rebalanceType, throttleScope, maxPartition);
     } catch (IllegalArgumentException ex) {
       return null;
     }
 
-    for (String configKey : configsMap.keySet()) {
-      StateTransitionType transitionType = StateTransitionType.parseFromString(configKey);
-      if (transitionType != null) {
-        try {
-          long value = Long.valueOf(configsMap.get(configKey));
-          config.addThrottle(transitionType, value);
-        } catch (NumberFormatException ex) {
-          // ignore the config item with invalid number.
-          logger.warn(String.format("Invalid config entry, key=%s, value=%s", configKey,
-              configsMap.get(configKey)));
-        }
-      }
-    }
-
     return config;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index fbb7f86..0a13a8d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -57,8 +57,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     logger.info("START BestPossibleStateCalcStage.process()");
 
     CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
 
     if (currentStateOutput == null || resourceMap == null || cache == null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index c64c8cf..9b5faea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -40,7 +40,8 @@ public class BestPossibleStateOutput extends ResourcesStateMap {
    * @return
    */
   // TODO: remove this.
-  @Deprecated public Map<Partition, Map<String, String>> getResourceMap(String resourceName) {
+  @Deprecated
+  public Map<Partition, Map<String, String>> getResourceMap(String resourceName) {
     PartitionStateMap map = _resourceStateMap.get(resourceName);
     if (map != null) {
       return map.getStateMap();

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 624698d..0dd4165 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -40,7 +40,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
 
     if (cache == null || resourceMap == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -126,6 +126,6 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         }
       }
     }
-    event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index d83518d..5eaf08a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -59,7 +59,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     LOG.info("START ExternalViewComputeStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
 
     if (manager == null || resourceMap == null || cache == null) {
@@ -71,7 +71,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
 
     CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
 
     List<ExternalView> newExtViews = new ArrayList<ExternalView>();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 5a13c7a..babc938 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -19,31 +19,41 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
- * For partition compute the Intermediate State (instance,state) pair based on
- * the BestPossible State and Current State, with all constraints applied (such as state transition throttling).
+ * For partition compute the Intermediate State (instance,state) pair based on the BestPossible
+ * State and Current State, with all constraints applied (such as state transition throttling).
  */
 public class IntermediateStateCalcStage extends AbstractBaseStage {
   private static final Logger logger = Logger.getLogger(IntermediateStateCalcStage.class.getName());
 
-  @Override
-  public void process(ClusterEvent event) throws Exception {
+  @Override public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
     logger.info("START Intermediate.process()");
 
     CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
 
     BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
 
     if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null
@@ -53,26 +63,281 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
 
     IntermediateStateOutput immediateStateOutput =
-        compute(event, resourceMap, currentStateOutput, bestPossibleStateOutput);
+        compute(cache, resourceMap, currentStateOutput, bestPossibleStateOutput);
+
     event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), immediateStateOutput);
 
     long endTime = System.currentTimeMillis();
     logger.info("END ImmediateStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
   }
 
-  private IntermediateStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
-      CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
+  private IntermediateStateOutput compute(ClusterDataCache dataCache,
+      Map<String, Resource> resourceMap, CurrentStateOutput currentStateOutput,
+      BestPossibleStateOutput bestPossibleStateOutput) {
     // for each resource
     // get the best possible state and current state
     // try to bring immediate state close to best possible state until
     // the possible pending state transition numbers reach the set throttle number.
     IntermediateStateOutput output = new IntermediateStateOutput();
 
-    // TODO: add throttling logic here.
+    StateTransitionThrottleController throttleController =
+        new StateTransitionThrottleController(resourceMap.keySet(), dataCache.getClusterConfig(),
+            dataCache.getLiveInstances().keySet());
+
     for (String resourceName : resourceMap.keySet()) {
-      logger.debug("Processing resource:" + resourceName);
-      output.setState(resourceName, bestPossibleStateOutput.getPartitionStateMap(resourceName));
+      PartitionStateMap intermediatePartitionStateMap =
+          computeIntermediatePartitionState(dataCache, dataCache.getIdealState(resourceName),
+              resourceMap.get(resourceName), currentStateOutput,
+              bestPossibleStateOutput.getPartitionStateMap(resourceName), throttleController);
+      output.setState(resourceName, intermediatePartitionStateMap);
     }
     return output;
   }
+
+  public PartitionStateMap computeIntermediatePartitionState(ClusterDataCache cache,
+      IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput,
+      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController) {
+    String resourceName = resource.getResourceName();
+    logger.info("Processing resource:" + resourceName);
+
+    if (!throttleController.isThrottleEnabled()) {
+      logger.info("None of any type of transition throttling is set for resource " + resourceName
+          + " skip computing intermediate partition state.");
+      return bestPossiblePartitionStateMap;
+    }
+
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+
+    boolean pendingRecoveryRebalance = false;
+
+    // check and charge pending transitions
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Map<String, String> pendingMap =
+          currentStateOutput.getPendingStateMap(resourceName, partition);
+      Map<String, String> bestPossibleMap =
+          bestPossiblePartitionStateMap.getPartitionMap(partition);
+
+      StateTransitionThrottleConfig.RebalanceType rebalanceType;
+      if (needRecoveryRebalance(bestPossibleMap, stateModelDef, currentStateMap)) {
+        rebalanceType = StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
+        pendingRecoveryRebalance = true;
+      } else {
+        rebalanceType = StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
+      }
+
+      if (pendingMap.size() > 0) {
+        throttleController.chargeCluster(rebalanceType);
+        throttleController.chargeResource(rebalanceType, resourceName);
+      }
+
+      Set<String> allInstances = new HashSet<String>(currentStateMap.keySet());
+      allInstances.addAll(pendingMap.keySet());
+
+      for (String ins : allInstances) {
+        String currentState = currentStateMap.get(ins);
+        String pendingState = pendingMap.get(ins);
+        if (pendingState != null && !pendingState.equals(currentState)) {
+          throttleController.chargeInstance(rebalanceType, ins);
+        }
+      }
+    }
+
+    PartitionStateMap output = new PartitionStateMap(resourceName);
+
+    int recoveryNeededCount = 0, recoveryThrottledCount = 0;
+    int loadbalanceNeededCount = 0, loadbalanceThrottledCount = 0;
+
+    Set<Partition> partitionsNeedRecovery = new HashSet<Partition>();
+    Set<Partition> partitionsNeedLoadbalance = new HashSet<Partition>();
+    Set<Partition> partitionsRecoveryThrotted = new HashSet<Partition>();
+    Set<Partition> partitionsLoadbalanceThrottled = new HashSet<Partition>();
+
+    // check recovery rebalance
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Map<String, String> bestPossibleMap =
+          bestPossiblePartitionStateMap.getPartitionMap(partition);
+      Map<String, String> intermediateMap = new HashMap<String, String>();
+
+      if (currentStateMap.equals(bestPossibleMap)) {
+        // no rebalance needed.
+        intermediateMap.putAll(bestPossibleMap);
+      } else if (needRecoveryRebalance(bestPossibleMap, stateModelDef, currentStateMap)) {
+        //TODO: add throttling on recovery balance
+        recoveryNeededCount++;
+        intermediateMap.putAll(bestPossibleMap);
+        pendingRecoveryRebalance = true;
+        partitionsNeedRecovery.add(partition);
+      } else {
+        partitionsNeedLoadbalance.add(partition);
+      }
+      output.setState(partition, intermediateMap);
+    }
+
+    // perform load balance only if no partition need recovery rebalance.
+    loadbalanceNeededCount = partitionsNeedLoadbalance.size();
+    if (!pendingRecoveryRebalance) {
+      for (Partition partition : partitionsNeedLoadbalance) {
+        Map<String, String> currentStateMap =
+            currentStateOutput.getCurrentStateMap(resourceName, partition);
+        Map<String, String> bestPossibleMap =
+            bestPossiblePartitionStateMap.getPartitionMap(partition);
+        Map<String, String> intermediateMap = new HashMap<String, String>();
+        ;
+
+        Set<String> allInstances = new HashSet<String>(currentStateMap.keySet());
+        allInstances.addAll(bestPossibleMap.keySet());
+
+        boolean throttled = false;
+        if (throttleController
+            .throttleforResource(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+                resourceName)) {
+          throttled = true;
+          logger.debug("Load balance throttled on resource for " + resourceName + " " + partition
+                  .getPartitionName());
+        } else {
+          // throttle the load balance if any of the instance can not handle the state transition
+          // TODO: may need finer grained control here.
+          for (String ins : allInstances) {
+            String currentState = currentStateMap.get(ins);
+            String bestPossibleState = bestPossibleMap.get(ins);
+            if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
+              if (throttleController
+                  .throttleForInstance(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+                      ins)) {
+                throttled = true;
+                logger.debug(
+                    "Load balance throttled because instance " + ins + " for " + resourceName + " "
+                        + partition.getPartitionName());
+              }
+            }
+          }
+        }
+
+        if (!throttled) {
+          intermediateMap.putAll(bestPossibleMap);
+          for (String ins : allInstances) {
+            String currentState = currentStateMap.get(ins);
+            String bestPossibleState = bestPossibleMap.get(ins);
+            if (bestPossibleState != null && !bestPossibleState.equals(currentState)) {
+              throttleController
+                  .chargeInstance(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, ins);
+            }
+          }
+
+          throttleController
+              .chargeCluster(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE);
+          throttleController
+              .chargeResource(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+                  resourceName);
+        } else {
+          intermediateMap.putAll(currentStateMap);
+          loadbalanceThrottledCount++;
+          partitionsLoadbalanceThrottled.add(partition);
+        }
+        output.setState(partition, intermediateMap);
+      }
+    }
+
+    logger.info(String.format(
+        "RecoveryNeeded: %d, RecoveryThrottled: %d, loadbalanceNeeded: %d, loadbalanceThrottled: %d",
+        recoveryNeededCount, recoveryThrottledCount, loadbalanceNeededCount,
+        loadbalanceThrottledCount));
+
+    if (logger.isDebugEnabled()) {
+      logParitionMapState(resourceName, new HashSet(resource.getPartitions()),
+          partitionsNeedRecovery, partitionsRecoveryThrotted, partitionsNeedLoadbalance,
+          partitionsLoadbalanceThrottled, currentStateOutput, bestPossiblePartitionStateMap,
+          output);
+    }
+
+    logger.info("End processing resource:" + resourceName);
+
+    return output;
+  }
+
+  private void logParitionMapState(String resource, Set<Partition> allPartitions,
+      Set<Partition> recoveryPartitions, Set<Partition> recoveryThrottledPartitions,
+      Set<Partition> loadbalancePartitions, Set<Partition> loadbalanceThrottledPartitions,
+      CurrentStateOutput currentStateOutput,
+      PartitionStateMap bestPossibleStateMap,
+      PartitionStateMap intermediateStateMap) {
+
+    logger.debug("Partitions need recovery: " + recoveryPartitions
+        + "\nPartitions get throttled on recovery: " + recoveryThrottledPartitions);
+    logger.debug("Partitions need loadbalance: " + loadbalancePartitions
+        + "\nPartitions get throttled on load-balance: " + loadbalanceThrottledPartitions);
+
+    for (Partition partition : allPartitions) {
+      if (recoveryPartitions.contains(partition)) {
+        logger
+            .debug("recovery balance needed for " + resource + " " + partition.getPartitionName());
+        if (recoveryThrottledPartitions.contains(partition)) {
+          logger.debug("Recovery balance throttled on resource for " + resource + " " + partition
+              .getPartitionName());
+        }
+      } else if (loadbalancePartitions.contains(partition)) {
+        logger.debug("load balance needed for " + resource + " " + partition.getPartitionName());
+        if (loadbalanceThrottledPartitions.contains(partition)) {
+          logger.debug("Load balance throttled on resource for " + resource + " " + partition
+              .getPartitionName());
+        }
+      } else {
+        logger.debug("no balance needed for " + resource + " " + partition.getPartitionName());
+      }
+
+      logger.debug(
+          partition + ": Best possible map: " + bestPossibleStateMap.getPartitionMap(partition));
+      logger.debug(partition + ": Current State: " + currentStateOutput
+          .getCurrentStateMap(resource, partition));
+      logger.debug(partition + ": Pending state: " + currentStateOutput
+          .getPendingMessageMap(resource, partition));
+      logger.debug(
+          partition + ": Intermediate state: " + intermediateStateMap.getPartitionMap(partition));
+    }
+  }
+
+  private boolean needRecoveryRebalance(Map<String, String> bestPossibleMap,
+      StateModelDefinition stateModelDef, Map<String, String> currentStateMap) {
+    boolean recoveryBalanceNeeded = false;
+    List<String> states = stateModelDef.getStatesPriorityList();
+    Map<String, Long> bestPossibleStateCounts = getStateCounts(bestPossibleMap);
+    Map<String, Long> currentStateCounts = getStateCounts(currentStateMap);
+
+    for (String state : states) {
+      Long bestPossibleCount = bestPossibleStateCounts.get(state);
+      Long currentCount = currentStateCounts.get(state);
+
+      if (bestPossibleCount == null && currentCount == null) {
+        continue;
+      } else if (bestPossibleCount == null || currentCount == null ||
+          !bestPossibleCount.equals(currentCount)) {
+        if (!state.equals(HelixDefinedState.DROPPED.name()) &&
+            !state.equals(HelixDefinedState.ERROR.name()) &&
+            !state.equals(stateModelDef.getInitialState())) {
+          recoveryBalanceNeeded = true;
+          break;
+        }
+      }
+    }
+
+    return recoveryBalanceNeeded;
+  }
+
+  /* given instance->state map, return the state counts */
+  private Map<String, Long> getStateCounts(Map<String, String> stateMap) {
+    Map<String, Long> stateCounts = new HashMap<String, Long>();
+    for (String state : stateMap.values()) {
+      if (!stateCounts.containsKey(state)) {
+        stateCounts.put(state, 0L);
+      }
+      stateCounts.put(state, stateCounts.get(state) + 1);
+    }
+    return stateCounts;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 2f4a331..f5f912e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -51,11 +51,11 @@ public class MessageGenerationPhase extends AbstractBaseStage {
   public void process(ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute("helixmanager");
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
     IntermediateStateOutput intermediateStateOutput =
-        event.getAttribute(AttributeName.INTERMEDIATE_STATE.toString());
+        event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
     if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
         || intermediateStateOutput == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -168,7 +168,7 @@ public class MessageGenerationPhase extends AbstractBaseStage {
 
       } // end of for-each-partition
     }
-    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+    event.addAttribute(AttributeName.MESSAGES_ALL.name(), output);
   }
 
   private Message createMessage(HelixManager manager, Resource resource, String partitionName,

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 0bc1905..8e50d83 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -76,11 +76,11 @@ public class MessageSelectionStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
     MessageGenerationOutput messageGenOutput =
-        event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+        event.getAttribute(AttributeName.MESSAGES_ALL.name());
     if (cache == null || resourceMap == null || currentStateOutput == null
         || messageGenOutput == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -107,7 +107,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
         output.addMessages(resourceName, partition, selectedMessages);
       }
     }
-    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), output);
   }
 
   private void increaseStateCnt(Map<String, Bounds> stateConstraints, String state,

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index 6bf610a..9a764ff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -114,8 +114,8 @@ public class MessageThrottleStage extends AbstractBaseStage {
   public void process(ClusterEvent event) throws Exception {
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
     MessageSelectionStageOutput msgSelectionOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
 
     if (cache == null || resourceMap == null || msgSelectionOutput == null) {
       throw new StageException("Missing attributes in event: " + event
@@ -148,7 +148,7 @@ public class MessageThrottleStage extends AbstractBaseStage {
       }
     }
 
-    event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
+    event.addAttribute(AttributeName.MESSAGES_THROTTLE.name(), output);
   }
 
   private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 8255cf4..b55a838 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -28,6 +28,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -48,7 +49,8 @@ public class PersistAssignmentStage extends AbstractBaseStage {
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
     ClusterConfig clusterConfig = cache.getClusterConfig();
 
-    if (!clusterConfig.isPersistBestPossibleAssignment()) {
+    if (!clusterConfig.isPersistBestPossibleAssignment() && !clusterConfig
+        .isPersistIntermediateAssignment()) {
       return;
     }
 
@@ -86,13 +88,19 @@ public class PersistAssignmentStage extends AbstractBaseStage {
           }
         }
 
-        Map<Partition, Map<String, String>> bestPossibleAssignements =
-            bestPossibleAssignment.getResourceMap(resourceId);
+        PartitionStateMap partitionStateMap =
+            bestPossibleAssignment.getPartitionStateMap(resourceId);
+        if (clusterConfig.isPersistIntermediateAssignment()) {
+          IntermediateStateOutput intermediateAssignment =
+              event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+          partitionStateMap = intermediateAssignment.getPartitionStateMap(resourceId);
+        }
+
+        Map<Partition, Map<String, String>> assignmentToPersist = partitionStateMap.getStateMap());
 
-        if (bestPossibleAssignements != null && hasInstanceMapChanged(bestPossibleAssignements,
-            idealState)) {
-          for (Partition partition : bestPossibleAssignements.keySet()) {
-            Map<String, String> instanceMap = bestPossibleAssignements.get(partition);
+        if (assignmentToPersist != null && hasInstanceMapChanged(assignmentToPersist, idealState)) {
+          for (Partition partition : assignmentToPersist.keySet()) {
+            Map<String, String> instanceMap = assignmentToPersist.get(partition);
             idealState.setInstanceStateMap(partition.getPartitionName(), instanceMap);
           }
           needPersist = true;
@@ -101,8 +109,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
         if (needPersist) {
           // Update instead of set to ensure any intermediate changes that the controller does not update are kept.
           accessor.updateProperty(keyBuilder.idealStates(resourceId), new DataUpdater<ZNRecord>() {
-            @Override
-            public ZNRecord update(ZNRecord current) {
+            @Override public ZNRecord update(ZNRecord current) {
               if (current != null) {
                 // Overwrite MapFields and ListFields items with the same key.
                 // Note that default merge will keep old values in the maps or lists unchanged, which is not desired.
@@ -117,7 +124,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
     }
 
     long endTime = System.currentTimeMillis();
-    LOG.info("END PersistAssignmentStage.process(), took " + (endTime - startTime) + " ms");
+    LOG.info("END PersistAssignmentStage.process() took " + (endTime - startTime) + " ms");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index bde2904..65b94ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -127,7 +127,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
       }
     }
 
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
   }
 
   private void addResource(String resource, Map<String, Resource> resourceMap) {

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index e552797..09cbca6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -37,7 +37,7 @@ public class ResourceValidationStage extends AbstractBaseStage {
     if (cache == null) {
       throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
     }
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     if (resourceMap == null) {
       throw new StageException("Resources must be computed prior to validation!");
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
new file mode 100644
index 0000000..6acfd9e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
@@ -0,0 +1,176 @@
+package org.apache.helix.controller.stages;
+
+import java.util.Set;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.model.ClusterConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+/**
+ * Output for IntermediateStateCalStage.
+ */
+class StateTransitionThrottleController {
+  private static Logger logger = Logger.getLogger(StateTransitionThrottleController.class);
+
+  // pending allowed transition counts in the cluster level for recovery and load balance
+  Map<StateTransitionThrottleConfig.RebalanceType, Long> _pendingTransitionAllowedInCluster;
+
+  // pending allowed transition counts for each instance and resource
+  Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>
+      _pendingTransitionAllowedPerInstance;
+  Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>
+      _pendingTransitionAllowedPerResource;
+
+  private boolean _throttleEnabled = false;
+
+  public StateTransitionThrottleController(Set<String> resources, ClusterConfig clusterConfig,
+      Set<String> liveInstances) {
+    super();
+    _pendingTransitionAllowedInCluster =
+        new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>();
+    _pendingTransitionAllowedPerInstance =
+        new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>();
+    _pendingTransitionAllowedPerResource =
+        new HashMap<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>();
+
+    if (clusterConfig == null) {
+      logger.warn("Cluster config is not found, no throttle config set!");
+      return;
+    }
+
+    List<StateTransitionThrottleConfig> throttleConfigs =
+        clusterConfig.getStateTransitionThrottleConfigs();
+
+    if (throttleConfigs == null || throttleConfigs.isEmpty()) {
+      logger.info("No throttle config is set!");
+      return;
+    }
+
+    for (StateTransitionThrottleConfig config : throttleConfigs) {
+      switch (config.getThrottleScope()) {
+      case CLUSTER:
+        _pendingTransitionAllowedInCluster
+            .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
+        _throttleEnabled = true;
+        break;
+      case RESOURCE:
+        for (String resource : resources) {
+          if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
+            _pendingTransitionAllowedPerResource
+                .put(resource, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+          }
+          _pendingTransitionAllowedPerResource.get(resource)
+              .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
+        }
+        _throttleEnabled = true;
+        break;
+      case INSTANCE:
+        for (String instance : liveInstances) {
+          if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
+            _pendingTransitionAllowedPerInstance
+                .put(instance, new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
+          }
+          _pendingTransitionAllowedPerInstance.get(instance)
+              .put(config.getRebalanceType(), config.getMaxPartitionInTransition());
+        }
+        _throttleEnabled = true;
+        break;
+      }
+    }
+  }
+
+  /**
+   * Whether any throttle config enabled for this cluster.
+   *
+   * @return
+   */
+  protected boolean isThrottleEnabled() {
+    return _throttleEnabled;
+  }
+
+  /**
+   * Check if state transition on a partition should be throttled.
+   *
+   * @return true if it should be throttled, otherwise, false.
+   */
+  protected boolean throttleforCluster(
+      StateTransitionThrottleConfig.RebalanceType rebalanceType) {
+    Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType);
+    if (clusterThrottle != null) {
+      if (clusterThrottle <= 0) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  protected boolean throttleforResource(
+      StateTransitionThrottleConfig.RebalanceType rebalanceType, String resourceName) {
+    if (throttleforCluster(rebalanceType)) {
+      return true;
+    }
+
+    Long resouceThrottle;
+    if (_pendingTransitionAllowedPerResource.containsKey(resourceName)) {
+      resouceThrottle = _pendingTransitionAllowedPerResource.get(resourceName).get(rebalanceType);
+      if (resouceThrottle != null && resouceThrottle <= 0) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  protected boolean throttleForInstance(
+      StateTransitionThrottleConfig.RebalanceType rebalanceType, String instanceName) {
+    if (throttleforCluster(rebalanceType)) {
+      return true;
+    }
+
+    Long instanceThrottle;
+    if (_pendingTransitionAllowedPerInstance.containsKey(instanceName)) {
+      instanceThrottle = _pendingTransitionAllowedPerInstance.get(instanceName).get(rebalanceType);
+      if (instanceThrottle != null && instanceThrottle <= 0) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  protected void chargeCluster(StateTransitionThrottleConfig.RebalanceType rebalanceType) {
+    if (_pendingTransitionAllowedInCluster.containsKey(rebalanceType)) {
+      Long clusterThrottle = _pendingTransitionAllowedInCluster.get(rebalanceType);
+      if (clusterThrottle > 0) {
+        _pendingTransitionAllowedInCluster.put(rebalanceType, clusterThrottle - 1);
+      }
+    }
+  }
+
+  protected void chargeResource(StateTransitionThrottleConfig.RebalanceType rebalanceType,
+      String resource) {
+    if (_pendingTransitionAllowedPerResource.containsKey(resource)
+        && _pendingTransitionAllowedPerResource.get(resource).containsKey(rebalanceType)) {
+      Long resouceThrottle = _pendingTransitionAllowedPerResource.get(resource).get(rebalanceType);
+      if (resouceThrottle > 0) {
+        _pendingTransitionAllowedPerResource.get(resource).put(rebalanceType, resouceThrottle - 1);
+      }
+    }
+  }
+
+  protected void chargeInstance(StateTransitionThrottleConfig.RebalanceType rebalanceType,
+      String instance) {
+    if (_pendingTransitionAllowedPerInstance.containsKey(instance)
+        && _pendingTransitionAllowedPerInstance.get(instance).containsKey(rebalanceType)) {
+      Long instanceThrottle = _pendingTransitionAllowedPerInstance.get(instance).get(rebalanceType);
+      if (instanceThrottle > 0) {
+        _pendingTransitionAllowedPerInstance.get(instance).put(rebalanceType, instanceThrottle - 1);
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c466bc6..8aed23e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -48,9 +48,9 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     logger.info("START TaskAssignmentStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     MessageThrottleStageOutput messageOutput =
-        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
     Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 2b4cfb2..38b74cb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -27,9 +27,7 @@ import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index f679b3f..79bb6fa 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -36,8 +36,9 @@ public class ClusterConfig extends HelixProperty {
    */
   public enum ClusterConfigProperty {
     HELIX_DISABLE_PIPELINE_TRIGGERS,
-    TOPOLOGY,  // cluster topology definition, for example, "/zone/rack/host/instance"
     PERSIST_BEST_POSSIBLE_ASSIGNMENT,
+    PERSIST_INTERMEDIATE_ASSIGNMENT,
+    TOPOLOGY,  // cluster topology definition, for example, "/zone/rack/host/instance"
     FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition.
     DELAY_REBALANCE_DISABLED,  // enabled the delayed rebalaning in case node goes offline.
     DELAY_REBALANCE_TIME,     // delayed time in ms that the delay time Helix should hold until rebalancing.
@@ -91,9 +92,28 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Whether to persist IntermediateAssignment in a resource's idealstate.
+   *
+   * @return
+   */
+  public Boolean isPersistIntermediateAssignment() {
+    return _record
+        .getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), false);
+  }
+
+  /**
+   * Enable/Disable persist IntermediateAssignment in a resource's idealstate.
    *
    * @return
    */
+  public void setPersistIntermediateAssignment(Boolean enable) {
+    if (enable == null) {
+      _record.getSimpleFields().remove(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString());
+    } else {
+      _record.setBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), enable);
+    }
+  }
+
   public Boolean isPipelineTriggersDisabled() {
     return _record
         .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index 03c79d2..179f89a 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@ -28,11 +28,14 @@ import org.apache.helix.manager.zk.ZkClient;
  */
 
 /**
- * This class is deprecated, please use BestPossibleExternalViewVerifier in tools.ClusterVerifiers instead.
+ * This class is deprecated, please use BestPossibleExternalViewVerifier in tools.ClusterVerifiers
+ * instead.
  */
 @Deprecated
-public class ClusterExternalViewVerifier extends org.apache.helix.tools.ClusterVerifiers.ClusterExternalViewVerifier {
-  public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName, List<String> expectLiveNodes) {
+public class ClusterExternalViewVerifier
+    extends org.apache.helix.tools.ClusterVerifiers.ClusterExternalViewVerifier {
+  public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
+      List<String> expectLiveNodes) {
     super(zkclient, clusterName, expectLiveNodes);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index fc87dca..576b2fe 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -24,5 +24,6 @@ package org.apache.helix.tools;
  * please use dedicated verifier classes, such as BestPossibleExternViewVerifier, etc, in tools.ClusterVerifiers
  */
 @Deprecated
-public class ClusterStateVerifier extends org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier{
+public class ClusterStateVerifier
+    extends org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier {
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 6c79bed..2b6d92c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -354,7 +354,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     runStage(event, new BestPossibleStateCalcStage());
 
     BestPossibleStateOutput output =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
 
     return output;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java
index fa697c4..933acc2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterExternalViewVerifier.java
@@ -101,7 +101,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
       runStage(event, stage);
     }
 
-    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java
index d2a2d09..eace66f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterStateVerifier.java
@@ -247,7 +247,7 @@ public class ClusterStateVerifier {
                 bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
               }
               bestPossStateMap.get(resourceName).get(partition)
-                  .put(instanceName, HelixDefinedState.ERROR.toString());
+                  .put(instanceName, HelixDefinedState.ERROR.name());
             }
           }
         }
@@ -281,7 +281,7 @@ public class ClusterStateVerifier {
               while (insIter.hasNext()) {
                 Map.Entry<String, String> insEntry = insIter.next();
                 String state = insEntry.getValue();
-                if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
+                if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.name())) {
                   insIter.remove();
                 }
               }
@@ -351,7 +351,7 @@ public class ClusterStateVerifier {
 
       // Filter resources if specified
       if (resources != null) {
-        Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+        Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
         resourceMap.keySet().retainAll(resources);
       }
 
@@ -359,7 +359,7 @@ public class ClusterStateVerifier {
       runStage(event, bpStage);
 
       BestPossibleStateOutput output =
-          event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+          event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
 
       // System.out.println("output:" + output);
       return output;

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 33570a0..4ea93ac 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -53,8 +53,8 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
 
     Map<String, Resource> resourceMap = getResourceMap();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-    event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     ReadClusterDataStage stage1 = new ReadClusterDataStage();
     runStage(event, stage1);
@@ -62,7 +62,7 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
     runStage(event, stage2);
 
     BestPossibleStateOutput output =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     for (int p = 0; p < 5; p++) {
       Partition resource = new Partition("testResourceName_" + p);
       AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
@@ -86,8 +86,8 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
 
     Map<String, Resource> resourceMap = getResourceMap();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-    event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     ReadClusterDataStage stage1 = new ReadClusterDataStage();
     runStage(event, stage1);
@@ -95,7 +95,7 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
     runStage(event, stage2);
 
     BestPossibleStateOutput output =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     for (int p = 0; p < 5; p++) {
       Partition resource = new Partition("testResourceName_" + p);
       AssertJUnit.assertNull(output.getInstanceStateMap("testResourceName", resource).get(

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 82c7b37..43e0e07 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -49,8 +49,8 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
 
     Map<String, Resource> resourceMap = getResourceMap();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-    event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
 
     ReadClusterDataStage stage1 = new ReadClusterDataStage();
     runStage(event, stage1);
@@ -58,7 +58,7 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
     runStage(event, stage2);
 
     BestPossibleStateOutput output =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     for (int p = 0; p < 5; p++) {
       Partition resource = new Partition("testResourceName_" + p);
       AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index c5f54a5..ac1f262 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -39,11 +39,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
   @Test
   public void testEmptyCS() {
     Map<String, Resource> resourceMap = getResourceMap();
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     CurrentStateComputationStage stage = new CurrentStateComputationStage();
     runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.name());
     AssertJUnit.assertEquals(
         output.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
         0);
@@ -56,11 +56,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
 
     setupLiveInstances(5);
 
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     CurrentStateComputationStage stage = new CurrentStateComputationStage();
     runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.name());
     AssertJUnit.assertEquals(
         output1.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
         0);
@@ -79,7 +79,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
 
     runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.name());
     String pendingState =
         output2.getPendingState("testResourceName", new Partition("testResourceName_1"),
             "localhost_3").getToState();
@@ -104,7 +104,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
         stateWithDeadSession);
     runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.name());
     String currentState =
         output3.getCurrentState("testResourceName", new Partition("testResourceName_1"),
             "localhost_3");

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 3a321cc..965e0de 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -114,12 +114,12 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     selectMessages.add(msg);
 
     msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
-    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), msgSelectOutput);
 
     runStage(event, throttleStage);
 
     MessageThrottleStageOutput msgThrottleOutput =
-        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
     Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(),
         1);
 
@@ -298,12 +298,12 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     selectMessages.add(msg6); // should be throttled
 
     msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
-    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), msgSelectOutput);
 
     runStage(event, throttleStage);
 
     MessageThrottleStageOutput msgThrottleOutput =
-        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
     List<Message> throttleMessages =
         msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
     Assert.assertEquals(throttleMessages.size(), 4);

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 18abf75..a6863ca 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -97,7 +97,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     MessageSelectionStageOutput msgSelOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     List<Message> messages =
         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
@@ -113,7 +113,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1");
 
@@ -249,7 +249,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     MessageSelectionStageOutput msgSelOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     List<Message> messages =
         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
@@ -267,7 +267,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1,
         "Should output only 1 message: OFFLINE->DROPPED for localhost_1");
@@ -284,7 +284,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0)));
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1,
         "Should output 1 message: OFFLINE->DROPPED for localhost_0");
@@ -345,7 +345,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     MessageSelectionStageOutput msgSelOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     List<Message> messages =
         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1, "Should output 1 message: SLAVE-MASTER for node1");
@@ -364,7 +364,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0");
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index dcb955c..87c0516 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -69,7 +69,7 @@ public class TestResourceComputationStage extends BaseStageTest {
     runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
 
-    Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.name());
     AssertJUnit.assertEquals(1, resource.size());
 
     AssertJUnit.assertEquals(resource.keySet().iterator().next(), resourceName);
@@ -91,7 +91,7 @@ public class TestResourceComputationStage extends BaseStageTest {
     runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
 
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     AssertJUnit.assertEquals(resources.length, resourceMap.size());
 
     for (int i = 0; i < resources.length; i++) {
@@ -157,7 +157,7 @@ public class TestResourceComputationStage extends BaseStageTest {
     runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
 
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     // +1 because it will have one for current state
     AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());
 

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
index 15d7fd8..9c86372 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
@@ -70,14 +70,14 @@ public class TestResourceValidationStage {
 
     // run resource computation
     new ResourceComputationStage().process(event);
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
     Assert.assertTrue(resourceMap.containsKey(onlineOfflineFullAutoResource));
     Assert.assertTrue(resourceMap.containsKey(masterSlaveSemiAutoInvalidResource));
 
     // run resource validation
     new ResourceValidationStage().process(event);
-    Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
     Assert.assertTrue(finalResourceMap.containsKey(onlineOfflineFullAutoResource));
     Assert.assertFalse(finalResourceMap.containsKey(masterSlaveSemiAutoInvalidResource));
@@ -102,12 +102,12 @@ public class TestResourceValidationStage {
 
     // run resource computation
     new ResourceComputationStage().process(event);
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
 
     // run resource validation
     new ResourceValidationStage().process(event);
-    Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
   }
 
@@ -132,13 +132,13 @@ public class TestResourceValidationStage {
 
     // run resource computation
     new ResourceComputationStage().process(event);
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     Assert.assertTrue(resourceMap.containsKey(masterSlaveCustomResource));
     Assert.assertTrue(resourceMap.containsKey(leaderStandbyCustomResource));
 
     // run resource validation
     new ResourceValidationStage().process(event);
-    Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<String, Resource> finalResourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     Assert.assertTrue(finalResourceMap.containsKey(masterSlaveCustomResource));
     Assert.assertFalse(finalResourceMap.containsKey(leaderStandbyCustomResource));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index f9bbc94..d21706c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -90,7 +90,6 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase {
           new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
       participant.syncStart();
       _participants[i] = participant;
-
     }
 
     // start controller
@@ -140,7 +139,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase {
     ClusterSetup.processCommandLineArgs(command.split(" "));
 
     TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 30 * 1000, CLUSTER_NAME, "MyDB2",
-        TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12920",
+        TestHelper.<String>setOf("localhost_12918", "localhost_12919", "localhost_12920",
             "localhost_12921", "localhost_12922"), ZK_ADDR);
   }