You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/20 19:41:59 UTC

[1/2] [HELIX-345] Speed up the controller pipeline

Repository: helix
Updated Branches:
  refs/heads/master c8a644f4b -> 51329f6f0


http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
NOTE(review): cluster.getStateModelMap() was evaluated before the "cluster == null" check,
so a missing "Cluster" attribute surfaced as a NullPointerException rather than the
StageException this method throws for missing attributes. That read is now null-guarded;
the first hunk gains one line, so the second hunk's new-file start moves from 67 to 68.
The post-image hash on the index line predates this revision and should be regenerated.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index ef5a5fd..a49feae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -56,7 +56,8 @@ public class MessageGenerationStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute("helixmanager");
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
+    Cluster cluster = event.getAttribute("Cluster");
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        (cluster != null) ? cluster.getStateModelMap() : null;
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
@@ -67,7 +68,7 @@ public class MessageGenerationStage extends AbstractBaseStage {
     if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
         || bestPossibleStateOutput == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+          + ". Requires HelixManager|Cluster|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
     }
 
     MessageOutput output = new MessageOutput();

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
NOTE(review): cluster.getStateModelMap() was evaluated before the "cluster == null" check,
so a missing "Cluster" attribute surfaced as a NullPointerException rather than the
StageException this method throws for missing attributes. That read is now null-guarded;
the first hunk gains one line, so the later hunks' new-file starts move from 101 to 102
and from 112 to 113. The post-image hash on the index line should be regenerated.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 966160c..b5ed39e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -89,7 +89,8 @@ public class MessageSelectionStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
+    Cluster cluster = event.getAttribute("Cluster");
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        (cluster != null) ? cluster.getStateModelMap() : null;
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
@@ -101,7 +102,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
     if (cluster == null || resourceMap == null || currentStateOutput == null
         || messageGenOutput == null || bestPossibleStateOutput == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE|MESSAGES_ALL");
+          + ". Requires Cluster|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE|MESSAGES_ALL");
     }
 
     MessageOutput output = new MessageOutput();
@@ -112,7 +113,8 @@ public class MessageSelectionStage extends AbstractBaseStage {
           stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
 
       if (stateModelDef == null) {
-        LOG.info("resource: " + resourceId
+        LOG.info("resource: "
+            + resourceId
             + " doesn't have state-model-def; e.g. we add a resource config but not add the resource in ideal-states");
         continue;
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
NOTE(review): this file's change only switches the event attribute key used to read the
Cluster from "ClusterDataCache" to "Cluster" (the key ReadClusterDataStage now publishes it
under) and updates the StageException message to name that key.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index 764b422..39bb228 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -116,7 +116,7 @@ public class MessageThrottleStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
     MessageOutput msgSelectionOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     Map<ResourceId, ResourceConfig> resourceMap =
@@ -127,7 +127,7 @@ public class MessageThrottleStage extends AbstractBaseStage {
     if (cluster == null || resourceMap == null || msgSelectionOutput == null
         || bestPossibleStateOutput == null) {
       throw new StageException("Missing attributes in event: " + event
-          + ". Requires ClusterDataCache|RESOURCES|BEST_POSSIBLE_STATE|MESSAGES_SELECTED");
+          + ". Requires Cluster|RESOURCES|BEST_POSSIBLE_STATE|MESSAGES_SELECTED");
     }
 
     MessageOutput output = new MessageOutput();

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
NOTE(review): process() now persists only assignments that are new or differ from the one
the "Cluster" attribute reports, in a single batched setChildren call, and only when the
"ClusterDataCache" attribute reports assignment writes enabled. Revised here so that a
missing "ClusterDataCache" or "Cluster" attribute no longer causes a NullPointerException;
the per-resource change test is also collapsed into one condition (hunk is now +19,62).
The post-image hash on the index line should be regenerated.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index b3252a8..a9b46e7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -19,29 +19,62 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.List;
+
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.Resource;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
 
 /**
  * Persist the ResourceAssignment of each resource that went through rebalancing
  */
 public class PersistAssignmentStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(PersistAssignmentStage.class);
+
   @Override
   public void process(ClusterEvent event) throws Exception {
-    HelixManager helixManager = event.getAttribute("helixmanager");
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
-    ResourceAccessor resourceAccessor = new ResourceAccessor(cluster.getId(), accessor);
-    BestPossibleStateOutput assignments =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-    for (ResourceId resourceId : assignments.getAssignedResources()) {
-      ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);
-      resourceAccessor.setResourceAssignment(resourceId, assignment);
+    LOG.info("START PersistAssignmentStage.process()");
+    long startTime = System.currentTimeMillis();
+
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    // Without the cache there is no way to tell whether assignment writes are enabled, so
+    // skip the write instead of failing with a NullPointerException.
+    if (cache != null && cache.assignmentWriteEnabled()) {
+      Cluster cluster = event.getAttribute("Cluster");
+      HelixManager helixManager = event.getAttribute("helixmanager");
+      HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      BestPossibleStateOutput assignments =
+          event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+      List<ResourceAssignment> changedAssignments = Lists.newArrayList();
+      List<PropertyKey> changedKeys = Lists.newArrayList();
+      for (ResourceId resourceId : assignments.getAssignedResources()) {
+        ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);
+        Resource resource = (cluster != null) ? cluster.getResource(resourceId) : null;
+        ResourceAssignment existAssignment =
+            (resource != null) ? resource.getResourceAssignment() : null;
+        // only write assignments that are new or differ from the one the Cluster reports
+        if (existAssignment == null || !existAssignment.equals(assignment)) {
+          changedAssignments.add(assignment);
+          changedKeys.add(keyBuilder.resourceAssignment(resourceId.toString()));
+        }
+      }
+
+      // update as a batch operation
+      if (!changedAssignments.isEmpty()) {
+        accessor.setChildren(changedKeys, changedAssignments);
+      }
     }
+
+    long endTime = System.currentTimeMillis();
+    LOG.info("END PersistAssignmentStage.process(), took " + (endTime - startTime) + " ms");
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
----------------------------------------------------------------------
NOTE(review): removals of marked contexts are now collected as property-key paths and issued
as a single getBaseDataAccessor().remove(paths, 0) call instead of one removeProperty call
per context, and setChildren is skipped when there is nothing to persist. The added lines
use List and Lists, but this file's diff has no import hunk, so it presumably relies on both
already being imported - confirm. The meaning of the 0 options argument is not visible here;
confirm it matches what removeProperty did.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
index 7c2cc0f..1a4aff3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
@@ -48,8 +48,12 @@ public class PersistContextStage extends AbstractBaseStage {
 
     // remove marked contexts
     Set<ContextId> removedContexts = contextProvider.getRemovedContexts();
+    List<String> removedPaths = Lists.newLinkedList();
     for (ContextId contextId : removedContexts) {
-      accessor.removeProperty(keyBuilder.controllerContext(contextId.stringify()));
+      removedPaths.add(keyBuilder.controllerContext(contextId.stringify()).getPath());
+    }
+    if (removedPaths.size() > 0) {
+      accessor.getBaseDataAccessor().remove(removedPaths, 0);
     }
 
     // persist pending contexts
@@ -63,6 +67,9 @@ public class PersistContextStage extends AbstractBaseStage {
         properties.add(holder);
       }
     }
-    accessor.setChildren(keys, properties);
+
+    if (keys.size() > 0) {
+      accessor.setChildren(keys, properties);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
NOTE(review): the stage now retains a ClusterDataCache in _cache and publishes the Cluster and
the cache under separate keys ("Cluster" and "ClusterDataCache"). As first written, a run
whose event carried no cache while _cache was already set assigned null to _cache, so
ClusterAccessor and the "ClusterDataCache" attribute both received null. The cache is now
the event's if present, else the retained _cache, else a new instance. The second hunk gains
a comment line (now +56,16) and the third hunk's new-file start moves from 96 to 97.
The post-image hash on the index line should be regenerated.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 85252a0..91505a0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -44,6 +44,8 @@ import com.google.common.collect.Sets;
 public class ReadClusterDataStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
 
+  private ClusterDataCache _cache = null;
+
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
@@ -54,8 +56,16 @@ public class ReadClusterDataStage extends AbstractBaseStage {
       throw new StageException("HelixManager attribute value is null");
     }
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+    // Use the event's cache if present, else the one retained in _cache, else a new one.
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    if (cache == null) {
+      cache = (_cache != null) ? _cache : new ClusterDataCache();
+    }
+    _cache = cache;
+
     ClusterId clusterId = ClusterId.from(manager.getClusterName());
-    ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
+    ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor, _cache);
 
     Cluster cluster = clusterAccessor.readCluster();
 
@@ -87,7 +97,8 @@ public class ReadClusterDataStage extends AbstractBaseStage {
           disabledInstanceSet, disabledPartitions, tags);
     }
 
-    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute("Cluster", cluster);
+    event.addAttribute("ClusterDataCache", _cache);
 
     // read contexts (if any)
     Map<ContextId, ControllerContext> persistedContexts = null;

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
NOTE(review): only the event attribute key changes, from "ClusterDataCache" to "Cluster";
the existing null check and its "Requires Cluster" message already match the new key.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 5b75535..16f15e1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -48,7 +48,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws StageException {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
     if (cluster == null) {
       throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
----------------------------------------------------------------------
NOTE(review): the event attribute key changes from "ClusterDataCache" to "Cluster", and the
StageException message changes from "Requires DataCache" to "Requires Cluster" to match.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index 470de2c..575a163 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -37,9 +37,9 @@ public class ResourceValidationStage extends AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
     if (cluster == null) {
-      throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+      throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
     }
     Map<ResourceId, ResourceConfig> resourceConfigMap =
         event.getAttribute(AttributeName.RESOURCES.toString());

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
NOTE(review): cluster.getLiveParticipantMap() was evaluated before the "cluster == null"
check, so a missing "Cluster" attribute produced a NullPointerException rather than the
StageException. That call is now null-guarded (first hunk is now +55,17, so the second
hunk's new-file start moves from 82 to 83), and the error message names the actual
"ClusterDataCache" attribute key instead of "DataCache".
The post-image hash on the index line should be regenerated.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index bc2ee50..aa47b4b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -55,15 +55,17 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     BestPossibleStateOutput bestPossibleStateOutput =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
+    Cluster cluster = event.getAttribute("Cluster");
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    Map<ParticipantId, Participant> liveParticipantMap =
+        (cluster != null) ? cluster.getLiveParticipantMap() : null;
 
     if (manager == null || resourceMap == null || messageOutput == null || cluster == null
-        || liveParticipantMap == null) {
+        || cache == null || liveParticipantMap == null) {
       throw new StageException(
           "Missing attributes in event:"
               + event
-              + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|BEST_POSSIBLE_STATE|DataCache|liveInstanceMap");
+              + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|BEST_POSSIBLE_STATE|Cluster|ClusterDataCache|liveInstanceMap");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
@@ -81,6 +83,11 @@ public class TaskAssignmentStage extends AbstractBaseStage {
             manager.getProperties());
     sendMessages(dataAccessor, outputMessages);
 
+    long cacheStart = System.currentTimeMillis();
+    cache.cacheMessages(outputMessages);
+    long cacheEnd = System.currentTimeMillis();
+    logger.debug("Caching messages took " + (cacheEnd - cacheStart) + " ms");
+
     long endTime = System.currentTimeMillis();
     logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
 

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index bff7e46..d428d02 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.strategy;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -41,6 +42,7 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.collect.Lists;
 
@@ -48,15 +50,15 @@ public class AutoRebalanceStrategy {
 
   private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
 
-  private final String _resourceName;
-  private final List<String> _partitions;
-  private final LinkedHashMap<String, Integer> _states;
+  private final ResourceId _resourceId;
+  private final List<PartitionId> _partitions;
+  private final LinkedHashMap<State, Integer> _states;
   private final int _maximumPerNode;
   private final ReplicaPlacementScheme _placementScheme;
 
-  private Map<String, Node> _nodeMap;
+  private Map<ParticipantId, Node> _nodeMap;
   private List<Node> _liveNodesList;
-  private Map<Integer, String> _stateMap;
+  private Map<Integer, State> _stateMap;
 
   private Map<Replica, Node> _preferredAssignment;
   private Map<Replica, Node> _existingPreferredAssignment;
@@ -75,9 +77,18 @@ public class AutoRebalanceStrategy {
   public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
       final LinkedHashMap<String, Integer> states, int maximumPerNode,
       ReplicaPlacementScheme placementScheme) {
-    _resourceName = resourceName;
-    _partitions = partitions;
-    _states = states;
+    _resourceId = ResourceId.from(resourceName);
+    _partitions =
+        Lists.newArrayList(Lists.transform(partitions, new Function<String, PartitionId>() {
+          @Override
+          public PartitionId apply(String input) {
+            return PartitionId.from(input);
+          }
+        }));
+    _states = new LinkedHashMap<State, Integer>();
+    for (String state : states.keySet()) {
+      _states.put(State.from(state), states.get(state));
+    }
     _maximumPerNode = maximumPerNode;
     if (placementScheme != null) {
       _placementScheme = placementScheme;
@@ -107,14 +118,9 @@ public class AutoRebalanceStrategy {
   public AutoRebalanceStrategy(ResourceId resourceId, final List<PartitionId> partitions,
       final LinkedHashMap<State, Integer> states, int maximumPerNode,
       ReplicaPlacementScheme placementScheme) {
-    LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
-    for (State state : states.keySet()) {
-      rawStateCountMap.put(state.toString(), states.get(state));
-    }
-    List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
-    _resourceName = resourceId.stringify();
-    _partitions = partitionNames;
-    _states = rawStateCountMap;
+    _resourceId = resourceId;
+    _partitions = partitions;
+    _states = states;
     _maximumPerNode = maximumPerNode;
     if (placementScheme != null) {
       _placementScheme = placementScheme;
@@ -134,37 +140,24 @@ public class AutoRebalanceStrategy {
   public ZNRecord typedComputePartitionAssignment(final List<ParticipantId> liveNodes,
       final Map<PartitionId, Map<ParticipantId, State>> currentMapping,
       final List<ParticipantId> allNodes) {
-    final List<String> rawLiveNodes = Lists.transform(liveNodes, Functions.toStringFunction());
-    final List<String> rawAllNodes = Lists.transform(allNodes, Functions.toStringFunction());
-    final Map<String, Map<String, String>> rawCurrentMapping =
-        ResourceAssignment.stringMapsFromReplicaMaps(currentMapping);
-    return computePartitionAssignment(rawLiveNodes, rawCurrentMapping, rawAllNodes);
-  }
-
-  /**
-   * Determine a preference list and mapping of partitions to nodes for all replicas
-   * @param liveNodes the current list of live participants
-   * @param currentMapping the current assignment of replicas to nodes
-   * @param allNodes the full list of known nodes in the system
-   * @return the preference list and replica mapping
-   */
-  public ZNRecord computePartitionAssignment(final List<String> liveNodes,
-      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
-    List<String> sortedLiveNodes = new ArrayList<String>(liveNodes);
-    Collections.sort(sortedLiveNodes);
-    List<String> sortedAllNodes = new ArrayList<String>(allNodes);
-    Collections.sort(sortedAllNodes);
+    Comparator<ParticipantId> nodeComparator = new NodeComparator();
+    List<ParticipantId> sortedLiveNodes = new ArrayList<ParticipantId>(liveNodes);
+    Collections.sort(sortedLiveNodes, nodeComparator);
+    List<ParticipantId> sortedAllNodes = new ArrayList<ParticipantId>(allNodes);
+    Collections.sort(sortedAllNodes, nodeComparator);
+    List<String> sortedNodeNames =
+        Lists.newArrayList(Lists.transform(sortedAllNodes, Functions.toStringFunction()));
     int numReplicas = countStateReplicas();
-    ZNRecord znRecord = new ZNRecord(_resourceName);
+    ZNRecord znRecord = new ZNRecord(_resourceId.stringify());
     if (sortedLiveNodes.size() == 0) {
       return znRecord;
     }
     int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
     int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
-    _nodeMap = new HashMap<String, Node>();
+    _nodeMap = new HashMap<ParticipantId, Node>();
     _liveNodesList = new ArrayList<Node>();
 
-    for (String id : sortedAllNodes) {
+    for (ParticipantId id : sortedAllNodes) {
       Node node = new Node(id);
       node.capacity = 0;
       node.hasCeilingCapacity = false;
@@ -189,7 +182,7 @@ public class AutoRebalanceStrategy {
     _stateMap = generateStateMap();
 
     // compute the preferred mapping if all nodes were up
-    _preferredAssignment = computePreferredPlacement(sortedAllNodes);
+    _preferredAssignment = computePreferredPlacement(sortedNodeNames);
 
     // logger.info("preferred mapping:"+ preferredAssignment);
     // from current mapping derive the ones in preferred location
@@ -216,6 +209,31 @@ public class AutoRebalanceStrategy {
   }
 
   /**
+   * Determine a preference list and mapping of partitions to nodes for all replicas
+   * @param liveNodes the current list of live participants
+   * @param currentMapping the current assignment of replicas to nodes
+   * @param allNodes the full list of known nodes in the system
+   * @return the preference list and replica mapping
+   */
+  public ZNRecord computePartitionAssignment(final List<String> liveNodes,
+      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
+
+    Function<String, ParticipantId> participantConverter = new Function<String, ParticipantId>() {
+      @Override
+      public ParticipantId apply(String participantId) {
+        return ParticipantId.from(participantId);
+      }
+    };
+    List<ParticipantId> typedLiveNodes =
+        Lists.newArrayList(Lists.transform(liveNodes, participantConverter));
+    List<ParticipantId> typedAllNodes =
+        Lists.newArrayList(Lists.transform(allNodes, participantConverter));
+    Map<PartitionId, Map<ParticipantId, State>> typedCurrentMapping =
+        ResourceAssignment.replicaMapsFromStringMaps(currentMapping);
+    return typedComputePartitionAssignment(typedLiveNodes, typedCurrentMapping, typedAllNodes);
+  }
+
+  /**
    * Move replicas assigned to non-preferred nodes if their current node is at capacity
    * and its preferred node is under capacity.
    */
@@ -355,10 +373,11 @@ public class AutoRebalanceStrategy {
     // This is useful to verify that there is no node serving multiple replicas of the same
     // partition.
     Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
-    for (String partition : _partitions) {
-      znRecord.setMapField(partition, new TreeMap<String, String>());
-      znRecord.setListField(partition, new ArrayList<String>());
-      newPreferences.put(partition, new ArrayList<String>());
+    for (PartitionId partition : _partitions) {
+      String partitionName = partition.stringify();
+      znRecord.setMapField(partitionName, new TreeMap<String, String>());
+      znRecord.setListField(partitionName, new ArrayList<String>());
+      newPreferences.put(partitionName, new ArrayList<String>());
     }
 
     // for preference lists, the rough priority that we want is:
@@ -367,29 +386,29 @@ public class AutoRebalanceStrategy {
     for (Node node : _liveNodesList) {
       for (Replica replica : node.preferred) {
         if (node.newReplicas.contains(replica)) {
-          newPreferences.get(replica.partition).add(node.id);
+          newPreferences.get(replica.partition.toString()).add(node.id.toString());
         } else {
-          znRecord.getListField(replica.partition).add(node.id);
+          znRecord.getListField(replica.partition.toString()).add(node.id.toString());
         }
       }
     }
     for (Node node : _liveNodesList) {
       for (Replica replica : node.nonPreferred) {
         if (node.newReplicas.contains(replica)) {
-          newPreferences.get(replica.partition).add(node.id);
+          newPreferences.get(replica.partition.toString()).add(node.id.toString());
         } else {
-          znRecord.getListField(replica.partition).add(node.id);
+          znRecord.getListField(replica.partition.toString()).add(node.id.toString());
         }
       }
     }
     normalizePreferenceLists(znRecord.getListFields(), newPreferences);
 
     // generate preference maps based on the preference lists
-    for (String partition : _partitions) {
-      List<String> preferenceList = znRecord.getListField(partition);
+    for (PartitionId partition : _partitions) {
+      List<String> preferenceList = znRecord.getListField(partition.toString());
       int i = 0;
       for (String participant : preferenceList) {
-        znRecord.getMapField(partition).put(participant, _stateMap.get(i));
+        znRecord.getMapField(partition.toString()).put(participant, _stateMap.get(i).toString());
         i++;
       }
     }
@@ -429,12 +448,12 @@ public class AutoRebalanceStrategy {
     List<String> newPreferenceList = new ArrayList<String>();
     int replicas = Math.min(countStateReplicas(), preferenceList.size());
     for (int i = 0; i < replicas; i++) {
-      String state = _stateMap.get(i);
+      State state = _stateMap.get(i);
       String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
       newPreferenceList.add(node);
       notAssigned.remove(node);
       Map<String, Integer> counts = nodeReplicaCounts.get(node);
-      counts.put(state, counts.get(state) + 1);
+      counts.put(state.toString(), counts.get(state.toString()) + 1);
     }
     preferenceList.clear();
     preferenceList.addAll(newPreferenceList);
@@ -447,7 +466,7 @@ public class AutoRebalanceStrategy {
    * @param nodeReplicaCounts current assignment of replicas
    * @return the node most willing to accept the replica
    */
-  private String getMinimumNodeForReplica(String state, Set<String> nodes,
+  private String getMinimumNodeForReplica(State state, Set<String> nodes,
       Map<String, Map<String, Integer>> nodeReplicaCounts) {
     String minimalNode = null;
     int minimalCount = Integer.MAX_VALUE;
@@ -468,17 +487,17 @@ public class AutoRebalanceStrategy {
    * @param nodeReplicaCounts a map of node to replica id and counts
    * @return the number of currently assigned replicas of the given id
    */
-  private int getReplicaCountForNode(String state, String node,
+  private int getReplicaCountForNode(State state, String node,
       Map<String, Map<String, Integer>> nodeReplicaCounts) {
     if (!nodeReplicaCounts.containsKey(node)) {
       Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
-      replicaCounts.put(state, 0);
+      replicaCounts.put(state.toString(), 0);
       nodeReplicaCounts.put(node, replicaCounts);
       return 0;
     }
     Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
-    if (!replicaCounts.containsKey(state)) {
-      replicaCounts.put(state, 0);
+    if (!replicaCounts.containsKey(state.toString())) {
+      replicaCounts.put(state.toString(), 0);
       return 0;
     }
-    return replicaCounts.get(state);
+    return replicaCounts.get(state.toString());
@@ -491,12 +510,12 @@ public class AutoRebalanceStrategy {
    * @return The current assignments that do not conform to the preferred assignment
    */
   private Map<Replica, Node> computeExistingNonPreferredPlacement(
-      Map<String, Map<String, String>> currentMapping) {
+      Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
     Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
     int count = countStateReplicas();
-    for (String partition : currentMapping.keySet()) {
-      Map<String, String> nodeStateMap = currentMapping.get(partition);
-      for (String nodeId : nodeStateMap.keySet()) {
+    for (PartitionId partition : currentMapping.keySet()) {
+      Map<ParticipantId, State> nodeStateMap = currentMapping.get(partition);
+      for (ParticipantId nodeId : nodeStateMap.keySet()) {
         Node node = _nodeMap.get(nodeId);
         boolean skip = false;
         for (Replica replica : node.preferred) {
@@ -560,12 +579,12 @@ public class AutoRebalanceStrategy {
    * @return Assignments that conform to the preferred placement
    */
   private Map<Replica, Node> computeExistingPreferredPlacement(
-      final Map<String, Map<String, String>> currentMapping) {
+      final Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
     Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
     int count = countStateReplicas();
-    for (String partition : currentMapping.keySet()) {
-      Map<String, String> nodeStateMap = currentMapping.get(partition);
-      for (String nodeId : nodeStateMap.keySet()) {
+    for (PartitionId partition : currentMapping.keySet()) {
+      Map<ParticipantId, State> nodeStateMap = currentMapping.get(partition);
+      for (ParticipantId nodeId : nodeStateMap.keySet()) {
         Node node = _nodeMap.get(nodeId);
         node.currentlyAssigned = node.currentlyAssigned + 1;
         // check if its in one of the preferred position
@@ -591,18 +610,18 @@ public class AutoRebalanceStrategy {
    * @param allNodes Identifiers to all nodes, live and non-live
    * @return Preferred assignment of replicas
    */
-  private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
+  private Map<Replica, Node> computePreferredPlacement(final List<String> nodeNames) {
     Map<Replica, Node> preferredMapping;
     preferredMapping = new HashMap<Replica, Node>();
     int partitionId = 0;
     int numReplicas = countStateReplicas();
     int count = countStateReplicas();
-    for (String partition : _partitions) {
+    for (PartitionId partition : _partitions) {
       for (int replicaId = 0; replicaId < count; replicaId++) {
         Replica replica = new Replica(partition, replicaId);
-        String nodeName =
-            _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
-                allNodes);
+        ParticipantId nodeName =
+            ParticipantId.from(_placementScheme.getLocation(partitionId, replicaId,
+                _partitions.size(), numReplicas, nodeNames));
         preferredMapping.put(replica, _nodeMap.get(nodeName));
       }
       partitionId = partitionId + 1;
@@ -627,10 +646,10 @@ public class AutoRebalanceStrategy {
    * Compute a map of replica ids to state names
    * @return Map: replica id -> state name
    */
-  private Map<Integer, String> generateStateMap() {
+  private Map<Integer, State> generateStateMap() {
     int replicaId = 0;
-    Map<Integer, String> stateMap = new HashMap<Integer, String>();
-    for (String state : _states.keySet()) {
+    Map<Integer, State> stateMap = new HashMap<Integer, State>();
+    for (State state : _states.keySet()) {
       Integer count = _states.get(state);
       for (int i = 0; i < count; i++) {
         stateMap.put(replicaId, state);
@@ -648,13 +667,13 @@ public class AutoRebalanceStrategy {
     public int currentlyAssigned;
     public int capacity;
     public boolean hasCeilingCapacity;
-    private String id;
+    private ParticipantId id;
     boolean isAlive;
     private List<Replica> preferred;
     private List<Replica> nonPreferred;
     private Set<Replica> newReplicas;
 
-    public Node(String id) {
+    public Node(ParticipantId id) {
       preferred = new ArrayList<Replica>();
       nonPreferred = new ArrayList<Replica>();
       newReplicas = new TreeSet<Replica>();
@@ -716,8 +735,8 @@ public class AutoRebalanceStrategy {
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
-      sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
-          .append("\nnonpreferred:").append(nonPreferred.size());
+      sb.append("##########\nname=").append(id.toString()).append("\npreferred:")
+          .append(preferred.size()).append("\nnonpreferred:").append(nonPreferred.size());
       return sb.toString();
     }
   }
@@ -727,14 +746,14 @@ public class AutoRebalanceStrategy {
    * and an identifier signifying a specific replica of a given partition and state.
    */
   class Replica implements Comparable<Replica> {
-    private String partition;
+    private PartitionId partition;
     private int replicaId; // this is a partition-relative id
     private String format;
 
-    public Replica(String partition, int replicaId) {
+    public Replica(PartitionId partition, int replicaId) {
       this.partition = partition;
       this.replicaId = replicaId;
-      this.format = this.partition + "|" + this.replicaId;
+      this.format = this.partition.toString() + "|" + this.replicaId;
     }
 
     @Override
@@ -816,4 +835,11 @@ public class AutoRebalanceStrategy {
       return nodeNames.get(index);
     }
   }
+
+  private static class NodeComparator implements Comparator<ParticipantId> {
+    @Override
+    public int compare(ParticipantId o1, ParticipantId o2) {
+      return o1.toString().compareTo(o2.toString());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 471530c..7527751 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -110,6 +110,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
 
     boolean success = false;
     switch (type) {
+    case RESOURCEASSIGNMENTS:
     case IDEALSTATES:
     case EXTERNALVIEW:
       // check if bucketized
@@ -200,6 +201,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
       // ZNRecord record = null;
 
       switch (type) {
+      case RESOURCEASSIGNMENTS:
       case CURRENTSTATES:
       case IDEALSTATES:
       case EXTERNALVIEW:
@@ -251,6 +253,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
     }
 
     switch (type) {
+    case RESOURCEASSIGNMENTS:
     case CURRENTSTATES:
     case IDEALSTATES:
     case EXTERNALVIEW:
@@ -313,6 +316,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
     if (children != null) {
       for (ZNRecord record : children) {
         switch (type) {
+        case RESOURCEASSIGNMENTS:
         case CURRENTSTATES:
         case IDEALSTATES:
         case EXTERNALVIEW:
@@ -419,6 +423,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
       HelixProperty value = children.get(i);
 
       switch (type) {
+      case RESOURCEASSIGNMENTS:
       case EXTERNALVIEW:
         if (value.getBucketSize() == 0) {
           records.add(value.getRecord());

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 173e251..9aff196 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -42,6 +42,12 @@ import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
 import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
+import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Enums;
@@ -72,7 +78,8 @@ public class IdealState extends HelixProperty {
     REBALANCE_TIMER_PERIOD,
     MAX_PARTITIONS_PER_INSTANCE,
     INSTANCE_GROUP_TAG,
-    REBALANCER_CLASS_NAME
+    REBALANCER_CLASS_NAME,
+    REBALANCER_CONFIG_NAME
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -215,6 +222,42 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Set the RebalancerConfig implementation class for this resource
+   * @param clazz the class object
+   */
+  public void setRebalancerConfigClass(Class<? extends RebalancerConfig> clazz) {
+    String className = clazz.getName();
+    _record.setSimpleField(IdealStateProperty.REBALANCER_CONFIG_NAME.toString(), className);
+  }
+
+  /**
+   * Get the class representing the rebalancer config of this resource
+   * @return The rebalancer config class
+   */
+  public Class<? extends RebalancerConfig> getRebalancerConfigClass() {
+    // try to extract the class from the persisted data
+    String className = _record.getSimpleField(IdealStateProperty.REBALANCER_CONFIG_NAME.toString());
+    if (className != null) {
+      try {
+        return HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerConfig.class);
+      } catch (ClassNotFoundException e) {
+        logger.error(className + " is not a valid class");
+      }
+    }
+    // the fallback is to use the mode
+    switch (getRebalanceMode()) {
+    case FULL_AUTO:
+      return FullAutoRebalancerConfig.class;
+    case SEMI_AUTO:
+      return SemiAutoRebalancerConfig.class;
+    case CUSTOMIZED:
+      return CustomRebalancerConfig.class;
+    default:
+      return PartitionedRebalancerConfig.class;
+    }
+  }
+
+  /**
    * Set the maximum number of partitions of this resource that an instance can serve
    * @param max the maximum number of partitions supported
    */
@@ -280,13 +323,21 @@ public class IdealState extends HelixProperty {
    * @return a set of partition names
    */
   public Set<String> getPartitionSet() {
-    if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
-        || getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+    switch (getRebalanceMode()) {
+    case SEMI_AUTO:
+    case FULL_AUTO:
       return _record.getListFields().keySet();
-    } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED
-        || getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+    case CUSTOMIZED:
       return _record.getMapFields().keySet();
-    } else {
+    case USER_DEFINED:
+      Class<? extends RebalancerConfig> configClass = getRebalancerConfigClass();
+      if (configClass.equals(SemiAutoRebalancerConfig.class)
+          || configClass.equals(FullAutoRebalancerConfig.class)) {
+        return _record.getListFields().keySet();
+      } else {
+        return _record.getMapFields().keySet();
+      }
+    default:
       logger.error("Invalid ideal state mode:" + getResourceName());
       return Collections.emptySet();
     }
@@ -360,8 +411,19 @@ public class IdealState extends HelixProperty {
    * @return set of instance names
    */
   public Set<String> getInstanceSet(String partitionName) {
-    if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
-        || getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+    boolean useListFields = false;
+    RebalanceMode rebalanceMode = getRebalanceMode();
+    if (rebalanceMode == RebalanceMode.USER_DEFINED) {
+      Class<? extends RebalancerConfig> configClass = getRebalancerConfigClass();
+      if (configClass.equals(SemiAutoRebalancerConfig.class)
+          || configClass.equals(FullAutoRebalancerConfig.class)) {
+        // override: if the user defined rebalancer expects auto-type inputs, use the list fields
+        useListFields = true;
+      }
+    }
+    if (useListFields || rebalanceMode == RebalanceMode.SEMI_AUTO
+        || rebalanceMode == RebalanceMode.FULL_AUTO) {
+      // get instances from list fields
       List<String> prefList = _record.getListField(partitionName);
       if (prefList != null) {
         return new TreeSet<String>(prefList);
@@ -369,8 +431,9 @@ public class IdealState extends HelixProperty {
         logger.warn(partitionName + " does NOT exist");
         return Collections.emptySet();
       }
-    } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED
-        || getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+    } else if (rebalanceMode == RebalanceMode.CUSTOMIZED
+        || rebalanceMode == RebalanceMode.USER_DEFINED) {
+      // get instances from map fields
       Map<String, String> stateMap = _record.getMapField(partitionName);
       if (stateMap != null) {
         return new TreeSet<String>(stateMap.keySet());

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 6da3dc2..c9a07ef 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -141,7 +141,7 @@ public class ResourceAssignment extends HelixProperty {
    * @param rawMaps the map of partition name to participant name and state
    * @return converted maps
    */
-  public static Map<? extends PartitionId, Map<ParticipantId, State>> replicaMapsFromStringMaps(
+  public static Map<PartitionId, Map<ParticipantId, State>> replicaMapsFromStringMaps(
       Map<String, Map<String, String>> rawMaps) {
     if (rawMaps == null) {
       return Collections.emptyMap();

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 8c5b863..aae58a8 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -112,4 +112,14 @@ public class ResourceConfiguration extends HelixProperty {
     RebalancerConfigHolder config = new RebalancerConfigHolder(this);
     return config.getRebalancerConfig(clazz);
   }
+
+  /**
+   * Check if this resource config has a rebalancer config
+   * @return true if a rebalancer config is attached, false otherwise
+   */
+  public boolean hasRebalancerConfig() {
+    return _record.getSimpleFields().containsKey(
+        RebalancerConfigHolder.class.getSimpleName() + NamespacedConfig.PREFIX_CHAR
+            + RebalancerConfigHolder.Fields.REBALANCER_CONFIG);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index d1bce56..f52ef0a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -19,10 +19,6 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -54,6 +50,10 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
 /**
  * Custom rebalancer implementation for the {@code Task} state model.
  */
@@ -74,6 +74,9 @@ public class TaskRebalancer implements HelixRebalancer {
 
     // Fetch task configuration
     TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
+    if (taskCfg == null) {
+      return emptyAssignment(resourceId);
+    }
     String workflowResource = taskCfg.getWorkflow();
 
     // Fetch workflow configuration and context
@@ -333,7 +336,6 @@ public class TaskRebalancer implements HelixRebalancer {
       // Remove the set of task partitions that are completed or in one of the error states.
       pSet.removeAll(donePartitions);
     }
-
     if (isTaskComplete(taskCtx, allPartitions)) {
       workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
       if (isWorkflowComplete(workflowCtx, workflowConfig)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index a9428c6..df7288a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -19,10 +19,10 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
@@ -37,6 +37,8 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Joiner;
+
 /**
  * Static utility methods.
  */
@@ -67,8 +69,10 @@ public class TaskUtil {
    */
   public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
     Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
+    if (taskCfg == null) {
+      return null;
+    }
     TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
-
     return b.build();
   }
 
@@ -107,8 +111,7 @@ public class TaskUtil {
     ZNRecord r =
         manager.getHelixPropertyStore().get(
             Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
-                TaskUtilEnum.PREV_RA_NODE.value()),
-            null, AccessOption.PERSISTENT);
+                TaskUtilEnum.PREV_RA_NODE.value()), null, AccessOption.PERSISTENT);
     return r != null ? new ResourceAssignment(r) : null;
   }
 
@@ -116,24 +119,21 @@ public class TaskUtil {
       ResourceAssignment ra) {
     manager.getHelixPropertyStore().set(
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
-            TaskUtilEnum.PREV_RA_NODE.value()),
-        ra.getRecord(), AccessOption.PERSISTENT);
+            TaskUtilEnum.PREV_RA_NODE.value()), ra.getRecord(), AccessOption.PERSISTENT);
   }
 
   public static TaskContext getTaskContext(HelixManager manager, String taskResource) {
     ZNRecord r =
         manager.getHelixPropertyStore().get(
             Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource,
-                TaskUtilEnum.CONTEXT_NODE.value()),
-            null, AccessOption.PERSISTENT);
+                TaskUtilEnum.CONTEXT_NODE.value()), null, AccessOption.PERSISTENT);
     return r != null ? new TaskContext(r) : null;
   }
 
   public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx) {
     manager.getHelixPropertyStore().set(
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource,
-            TaskUtilEnum.CONTEXT_NODE.value()),
-        ctx.getRecord(), AccessOption.PERSISTENT);
+            TaskUtilEnum.CONTEXT_NODE.value()), ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
   public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
@@ -148,8 +148,7 @@ public class TaskUtil {
       WorkflowContext ctx) {
     manager.getHelixPropertyStore().set(
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
-            TaskUtilEnum.CONTEXT_NODE.value()),
-        ctx.getRecord(), AccessOption.PERSISTENT);
+            TaskUtilEnum.CONTEXT_NODE.value()), ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
   public static String getNamespacedTaskName(String singleTaskWorkflow) {

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index a0959cc..0cc4d4c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@ -36,13 +36,11 @@ import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.log4j.Logger;
 
@@ -68,7 +66,8 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
 
   boolean verifyLiveNodes(List<ParticipantId> actualLiveNodes) {
     Collections.sort(actualLiveNodes);
-    List<String> rawActualLiveNodes = Lists.transform(actualLiveNodes, Functions.toStringFunction());
+    List<String> rawActualLiveNodes =
+        Lists.transform(actualLiveNodes, Functions.toStringFunction());
     return _expectSortedLiveNodes.equals(rawActualLiveNodes);
   }
 
@@ -97,7 +96,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
 
   BestPossibleStateOutput calculateBestPossibleState(Cluster cluster) throws Exception {
     ClusterEvent event = new ClusterEvent("event");
-    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute("Cluster", cluster);
 
     List<Stage> stages = new ArrayList<Stage>();
     stages.add(new ResourceComputationStage());

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index da9d76e..973736b 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -440,7 +440,7 @@ public class ClusterStateVerifier {
 
   static BestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
     ClusterEvent event = new ClusterEvent("sampleEvent");
-    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute("Cluster", cluster);
 
     ResourceComputationStage rcState = new ResourceComputationStage();
     CurrentStateComputationStage csStage = new CurrentStateComputationStage();

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
index 9ec1ef9..2d577ca 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
@@ -140,6 +140,14 @@ public class YAMLClusterSetup {
         }
         helixAdmin.addResource(cfg.clusterName, resource.name, partitions,
             resource.stateModel.name, resource.rebalancer.get("mode"));
+
+        // batch message mode
+        if (resource.batchMessageMode != null && resource.batchMessageMode) {
+          IdealState idealState = helixAdmin.getResourceIdealState(cfg.clusterName, resource.name);
+          idealState.setBatchMessageMode(true);
+          helixAdmin.setResourceIdealState(cfg.clusterName, resource.name, idealState);
+        }
+
         // user-defined rebalancer
         if (resource.rebalancer.containsKey("class")
             && resource.rebalancer.get("mode").equals(RebalanceMode.USER_DEFINED.toString())) {
@@ -268,6 +276,7 @@ public class YAMLClusterSetup {
       public Map<String, Integer> partitions;
       public StateModelConfig stateModel;
       public ConstraintsConfig constraints;
+      public Boolean batchMessageMode;
 
       public static class StateModelConfig {
         public String name;

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 919eb00..a89d27f 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -602,7 +602,7 @@ public class Mocks {
           String[] keySplit = key.split("\\/");
           String[] pathSplit = path.split("\\/");
           if (keySplit.length > pathSplit.length) {
-            child.add(keySplit[pathSplit.length + 1]);
+            child.add(keySplit[pathSplit.length]);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index ab4ead0..4f81729 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -127,7 +127,7 @@ public class TestNewStages extends ZkUnitTestBase {
           }
         });
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceConfigMap);
-    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute("Cluster", cluster);
 
     // Run the stage
     try {

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 2e17ad3..e55c682 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -35,22 +35,13 @@ import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.MessageSelectionStage;
-import org.apache.helix.controller.stages.MessageThrottleStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
@@ -174,7 +165,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
     clusterAccessor.initClusterStructure();
 
-
     ClusterControllerManager controller =
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
@@ -232,6 +222,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
 
+    ClusterDataCache cache = new ClusterDataCache();
+    event.addAttribute("ClusterDataCache", cache);
+
     final String resourceName = "testResource_pending";
     String[] resourceGroups = new String[] {
       resourceName
@@ -288,6 +281,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     // message, make sure controller should not send O->DROPPEDN until O->S is done
     HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
     admin.dropResource(clusterName, resourceName);
+    List<IdealState> idealStates = accessor.getChildValues(accessor.keyBuilder().idealStates());
+    cache.setIdealStates(idealStates);
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
index ece46ff..b275e9c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
@@ -76,7 +76,7 @@ public class TestResourceValidationStage {
     ClusterId clusterId = new ClusterId("sampleClusterId");
     ClusterAccessor clusterAccessor = new MockClusterAccessor(clusterId, accessor);
     Cluster cluster = clusterAccessor.readCluster();
-    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute("Cluster", cluster);
     event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(),
         clusterConfiguration.getIdealStateRules());
 
@@ -113,7 +113,7 @@ public class TestResourceValidationStage {
     ClusterId clusterId = new ClusterId("sampleClusterId");
     ClusterAccessor clusterAccessor = new MockClusterAccessor(clusterId, accessor);
     Cluster cluster = clusterAccessor.readCluster();
-    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute("Cluster", cluster);
     Map<String, Map<String, String>> emptyMap = Maps.newHashMap();
     event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(), emptyMap);
 
@@ -148,7 +148,7 @@ public class TestResourceValidationStage {
     ClusterId clusterId = new ClusterId("sampleClusterId");
     ClusterAccessor clusterAccessor = new MockClusterAccessor(clusterId, accessor);
     Cluster cluster = clusterAccessor.readCluster();
-    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute("Cluster", cluster);
     Map<String, Map<String, String>> emptyMap = Maps.newHashMap();
     event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(), emptyMap);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java b/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java
new file mode 100644
index 0000000..78927f9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java
@@ -0,0 +1,151 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * The controller pipeline will only update ideal states, live instances, and instance configs
+ * when they change. However, if a controller loses leadership and subsequently regains it, we need
+ * to ensure that the controller can verify its cache. That's what this test is for.
+ */
+public class TestReelectedPipelineCorrectness extends ZkUnitTestBase {
+  @Test
+  public void testReelection() throws Exception {
+    final int NUM_CONTROLLERS = 2;
+    final int NUM_PARTICIPANTS = 4;
+    final int NUM_PARTITIONS = 8;
+    final int NUM_REPLICAS = 2;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+    // configure distributed controllers
+    String controllerCluster = clusterName + "_controllers";
+    setupTool.addCluster(controllerCluster, true);
+    for (int i = 0; i < NUM_CONTROLLERS; i++) {
+      setupTool.addInstanceToCluster(controllerCluster, "controller_" + i);
+    }
+    setupTool.activateCluster(clusterName, controllerCluster, true);
+
+    // start participants
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      final String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    // start controllers
+    ClusterDistributedController[] controllers = new ClusterDistributedController[NUM_CONTROLLERS];
+    for (int i = 0; i < NUM_CONTROLLERS; i++) {
+      controllers[i] =
+          new ClusterDistributedController(ZK_ADDR, controllerCluster, "controller_" + i);
+      controllers[i].syncStart();
+    }
+    Thread.sleep(1000);
+
+    // Ensure a balanced cluster
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Disable the leader, resulting in a leader election
+    HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
+    LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
+    String leaderId = leader.getId();
+    String standbyId = (leaderId.equals("controller_0")) ? "controller_1" : "controller_0";
+    HelixAdmin admin = setupTool.getClusterManagementTool();
+    admin.enableInstance(controllerCluster, leaderId, false);
+
+    // Stop a participant to make sure that the leader election worked
+    Thread.sleep(500);
+    participants[0].syncStop();
+    Thread.sleep(500);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Disable the original standby (leaving 0 active controllers) and kill another participant
+    admin.enableInstance(controllerCluster, standbyId, false);
+    Thread.sleep(500);
+    participants[1].syncStop();
+
+    // Also change the ideal state
+    IdealState idealState = admin.getResourceIdealState(clusterName, "TestDB0");
+    idealState.setMaxPartitionsPerInstance(1);
+    admin.setResourceIdealState(clusterName, "TestDB0", idealState);
+    Thread.sleep(500);
+
+    // Also disable an instance in the main cluster
+    admin.enableInstance(clusterName, "localhost_12920", false);
+
+    // Re-enable the original leader
+    admin.enableInstance(controllerCluster, leaderId, true);
+
+    // Now check that both the ideal state and the live instances are adhered to by the rebalance
+    Thread.sleep(500);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // cleanup
+    for (int i = 0; i < NUM_CONTROLLERS; i++) {
+      controllers[i].syncStop();
+    }
+    for (int i = 2; i < NUM_PARTICIPANTS; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("STOP " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
index 27004fe..2a026c2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
@@ -79,26 +79,30 @@ public class TestUserDefRebalancerCompatibility extends
 
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 3);
 
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new TestCustomizedIdealStateRebalancer.ExternalViewBalancedVerifier(
-                _gZkClient, CLUSTER_NAME, db2));
-    Assert.assertTrue(result);
-    Thread.sleep(1000);
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
-    ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
-    Assert.assertEquals(ev.getPartitionSet().size(), 60);
-    for (String partition : ev.getPartitionSet()) {
-      Assert.assertEquals(ev.getStateMap(partition).size(), 1);
-    }
-    IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
-    for (PartitionId partition : is.getPartitionIdSet()) {
-      Assert.assertEquals(is.getPreferenceList(partition).size(), 3);
-      Assert.assertEquals(is.getParticipantStateMap(partition).size(), 3);
+    try {
+      boolean result =
+          ClusterStateVerifier
+              .verifyByZkCallback(new TestCustomizedIdealStateRebalancer.ExternalViewBalancedVerifier(
+                  _gZkClient, CLUSTER_NAME, db2));
+      Assert.assertTrue(result);
+      Thread.sleep(1000);
+      HelixDataAccessor accessor =
+          new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+      Builder keyBuilder = accessor.keyBuilder();
+      ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
+      Assert.assertEquals(ev.getPartitionSet().size(), 60);
+      for (String partition : ev.getPartitionSet()) {
+        Assert.assertEquals(ev.getStateMap(partition).size(), 1);
+      }
+      IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
+      for (PartitionId partition : is.getPartitionIdSet()) {
+        Assert.assertEquals(is.getPreferenceList(partition).size(), 3);
+        Assert.assertEquals(is.getParticipantStateMap(partition).size(), 3);
+      }
+      Assert.assertTrue(testRebalancerCreated);
+      Assert.assertTrue(testRebalancerInvoked);
+    } finally {
+      _setupTool.dropResourceFromCluster(CLUSTER_NAME, db2);
     }
-    Assert.assertTrue(testRebalancerCreated);
-    Assert.assertTrue(testRebalancerInvoked);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 46af6c4..a00db67 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -198,8 +198,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
 
     // participant goes away. should be no change in number of beans as config is still present
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered);
 
     HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
     String firstControllerName =
@@ -215,8 +215,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     Thread.sleep(1000);
 
     // 1 cluster status monitor, 1 resource monitor, 5 instances
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 7);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
 
     String instanceName = "localhost0_" + (12918 + 0);
     _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
@@ -224,8 +224,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
 
     // participant goes back. should be no change
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 7);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
 
     // Add a resource, one more mbean registered
     ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
@@ -237,14 +237,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
         Integer.parseInt(idealState.getReplicas()));
 
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 8);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
 
     // remove resource, no change
     setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 8);
+    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
+    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
 
   }
 }


[2/2] git commit: [HELIX-345] Speed up the controller pipeline

Posted by ka...@apache.org.
[HELIX-345] Speed up the controller pipeline


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/51329f6f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/51329f6f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/51329f6f

Branch: refs/heads/master
Commit: 51329f6f0bbb8a43cdfd06ffd3bf7ac5c8a93c68
Parents: c8a644f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Feb 4 10:50:11 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Feb 20 10:41:46 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/ConfigAccessor.java   |   9 +-
 .../main/java/org/apache/helix/api/Cluster.java |  27 +-
 .../helix/api/accessor/ClusterAccessor.java     | 268 ++++++++++++------
 .../helix/api/accessor/ResourceAccessor.java    |  42 ++-
 .../apache/helix/api/config/ClusterConfig.java  | 185 +-----------
 .../controller/GenericHelixController.java      |  40 +++
 .../rebalancer/FullAutoRebalancer.java          |  16 +-
 .../controller/rebalancer/RebalancerRef.java    |  27 +-
 .../config/CustomRebalancerConfig.java          |   7 +-
 .../config/FullAutoRebalancerConfig.java        |   7 +-
 .../config/PartitionedRebalancerConfig.java     |  57 +++-
 .../config/RebalancerConfigHolder.java          |   2 +-
 .../config/SemiAutoRebalancerConfig.java        |   7 +-
 .../stages/BestPossibleStateCalcStage.java      |  46 ++-
 .../controller/stages/ClusterDataCache.java     | 282 +++++++++++++++++--
 .../stages/CompatibilityCheckStage.java         |   4 +-
 .../stages/CurrentStateComputationStage.java    |   4 +-
 .../stages/ExternalViewComputeStage.java        |   4 +-
 .../stages/MessageGenerationStage.java          |   4 +-
 .../stages/MessageSelectionStage.java           |   7 +-
 .../controller/stages/MessageThrottleStage.java |   4 +-
 .../stages/PersistAssignmentStage.java          |  57 +++-
 .../controller/stages/PersistContextStage.java  |  11 +-
 .../controller/stages/ReadClusterDataStage.java |  14 +-
 .../stages/ResourceComputationStage.java        |   2 +-
 .../stages/ResourceValidationStage.java         |   4 +-
 .../controller/stages/TaskAssignmentStage.java  |  12 +-
 .../strategy/AutoRebalanceStrategy.java         | 186 ++++++------
 .../helix/manager/zk/ZKHelixDataAccessor.java   |   5 +
 .../java/org/apache/helix/model/IdealState.java |  83 +++++-
 .../apache/helix/model/ResourceAssignment.java  |   2 +-
 .../helix/model/ResourceConfiguration.java      |  10 +
 .../org/apache/helix/task/TaskRebalancer.java   |  12 +-
 .../java/org/apache/helix/task/TaskUtil.java    |  23 +-
 .../tools/ClusterExternalViewVerifier.java      |   7 +-
 .../helix/tools/ClusterStateVerifier.java       |   2 +-
 .../apache/helix/tools/YAMLClusterSetup.java    |   9 +
 .../src/test/java/org/apache/helix/Mocks.java   |   2 +-
 .../org/apache/helix/api/TestNewStages.java     |   2 +-
 .../stages/TestRebalancePipeline.java           |  17 +-
 .../stages/TestResourceValidationStage.java     |   6 +-
 .../TestReelectedPipelineCorrectness.java       | 151 ++++++++++
 .../TestUserDefRebalancerCompatibility.java     |  44 +--
 .../TestClusterStatusMonitorLifecycle.java      |  20 +-
 44 files changed, 1188 insertions(+), 542 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index f46e537..3589165 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ConfigScope;
@@ -489,7 +490,13 @@ public class ConfigAccessor {
     List<String> retKeys = null;
 
     if (scope.isFullKey()) {
-      ZNRecord record = zkClient.readData(zkPath);
+      ZNRecord record;
+      try {
+        record = zkClient.readData(zkPath);
+      } catch (ZkNoNodeException e) {
+        LOG.warn(zkPath + " no longer exists");
+        return Collections.emptyList();
+      }
       if (mapKey == null) {
         retKeys = new ArrayList<String>(record.getSimpleFields().keySet());
       } else {

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 98072d1..adaf200 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -34,10 +34,8 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SpectatorId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.context.ControllerContext;
-import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Transition;
 
@@ -91,8 +89,6 @@ public class Cluster {
    * @param constraintMap
    * @param stateModelMap
    * @param contextMap
-   * @param stats
-   * @param alerts
    * @param userConfig
    * @param isPaused
    * @param autoJoinAllowed
@@ -101,8 +97,8 @@ public class Cluster {
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
       ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
       Map<StateModelDefId, StateModelDefinition> stateModelMap,
-      Map<ContextId, ControllerContext> contextMap, PersistentStats stats, Alerts alerts,
-      UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+      Map<ContextId, ControllerContext> contextMap, UserConfig userConfig, boolean isPaused,
+      boolean autoJoinAllowed) {
 
     // build the config
     // Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -124,8 +120,7 @@ public class Cluster {
         new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
             .addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
             .addStateModelDefinitions(stateModelMap.values()).pausedStatus(isPaused)
-            .userConfig(userConfig).autoJoin(autoJoinAllowed).addStats(stats).addAlerts(alerts)
-            .build();
+            .userConfig(userConfig).autoJoin(autoJoinAllowed).build();
 
     _resourceMap = ImmutableMap.copyOf(resourceMap);
 
@@ -240,22 +235,6 @@ public class Cluster {
   }
 
   /**
-   * Get all the persisted stats for the cluster
-   * @return PersistentStats instance
-   */
-  public PersistentStats getStats() {
-    return _config.getStats();
-  }
-
-  /**
-   * Get all the persisted alerts for the cluster
-   * @return Alerts instance
-   */
-  public Alerts getAlerts() {
-    return _config.getAlerts();
-  }
-
-  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index abb3e49..5ecc210 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.api.accessor;
  * under the License.
  */
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,9 +55,11 @@ import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.context.ControllerContext;
 import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConfiguration;
@@ -86,15 +89,27 @@ public class ClusterAccessor {
   private final PropertyKey.Builder _keyBuilder;
   private final ClusterId _clusterId;
 
+  private final ClusterDataCache _cache;
+
   /**
    * Instantiate a cluster accessor
    * @param clusterId the cluster to access
    * @param accessor HelixDataAccessor for the physical store
    */
   public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+    this(clusterId, accessor, new ClusterDataCache());
+  }
+
+  /**
+   * Instantiate a cluster accessor
+   * @param clusterId the cluster to access
+   * @param accessor HelixDataAccessor for the physical store
+   */
+  public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor, ClusterDataCache cache) {
     _accessor = accessor;
     _keyBuilder = accessor.keyBuilder();
     _clusterId = clusterId;
+    _cache = cache;
   }
 
   /**
@@ -129,9 +144,6 @@ public class ClusterAccessor {
     if (cluster.autoJoinAllowed()) {
       clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
     }
-    if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
-      _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
-    }
     if (cluster.isPaused()) {
       pauseCluster();
     }
@@ -173,16 +185,6 @@ public class ClusterAccessor {
       ClusterConstraints constraint = constraints.get(type);
       _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraint);
     }
-    if (config.getStats() == null || config.getStats().getMapFields().isEmpty()) {
-      _accessor.removeProperty(_keyBuilder.persistantStat());
-    } else {
-      _accessor.setProperty(_keyBuilder.persistantStat(), config.getStats());
-    }
-    if (config.getAlerts() == null || config.getAlerts().getMapFields().isEmpty()) {
-      _accessor.removeProperty(_keyBuilder.alerts());
-    } else {
-      _accessor.setProperty(_keyBuilder.alerts(), config.getAlerts());
-    }
     return true;
   }
 
@@ -218,19 +220,22 @@ public class ClusterAccessor {
       LOG.error("Cluster is not fully set up");
       return null;
     }
-    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+
+    // refresh the cache
+    _cache.refresh(_accessor);
+
+    LiveInstance leader = _cache.getLeader();
 
     /**
      * map of constraint-type to constraints
      */
-    Map<String, ClusterConstraints> constraintMap =
-        _accessor.getChildValuesMap(_keyBuilder.constraints());
+    Map<String, ClusterConstraints> constraintMap = _cache.getConstraintMap();
 
     // read all the resources
-    Map<ResourceId, Resource> resourceMap = readResources();
+    Map<ResourceId, Resource> resourceMap = readResources(true);
 
     // read all the participants
-    Map<ParticipantId, Participant> participantMap = readParticipants();
+    Map<ParticipantId, Participant> participantMap = readParticipants(true);
 
     // read the controllers
     Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
@@ -249,10 +254,10 @@ public class ClusterAccessor {
     }
 
     // read the pause status
-    PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+    PauseSignal pauseSignal = _cache.getPauseSignal();
     boolean isPaused = pauseSignal != null;
 
-    ClusterConfiguration clusterConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+    ClusterConfiguration clusterConfig = _cache.getClusterConfig();
     boolean autoJoinAllowed = false;
     UserConfig userConfig;
     if (clusterConfig != null) {
@@ -263,21 +268,14 @@ public class ClusterAccessor {
     }
 
     // read the state model definitions
-    Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions();
-
-    // read the stats
-    PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
-
-    // read the alerts
-    Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
+    Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions(true);
 
     // read controller context
-    Map<ContextId, ControllerContext> contextMap = readControllerContext();
+    Map<ContextId, ControllerContext> contextMap = readControllerContext(true);
 
     // create the cluster snapshot object
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap, stateModelMap, contextMap, stats, alerts, userConfig, isPaused,
-        autoJoinAllowed);
+        clusterConstraintMap, stateModelMap, contextMap, userConfig, isPaused, autoJoinAllowed);
   }
 
   /**
@@ -285,9 +283,22 @@ public class ClusterAccessor {
    * @return map of state model def id to state model definition
    */
   public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+    return readStateModelDefinitions(false);
+  }
+
+  /**
+   * Get all the state model definitions for this cluster
+   * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+   * @return map of state model def id to state model definition
+   */
+  private Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions(boolean useCache) {
     Map<StateModelDefId, StateModelDefinition> stateModelDefs = Maps.newHashMap();
-    List<StateModelDefinition> stateModelList =
-        _accessor.getChildValues(_keyBuilder.stateModelDefs());
+    Collection<StateModelDefinition> stateModelList;
+    if (useCache) {
+      stateModelList = _cache.getStateModelDefMap().values();
+    } else {
+      stateModelList = _accessor.getChildValues(_keyBuilder.stateModelDefs());
+    }
     for (StateModelDefinition stateModelDef : stateModelList) {
       stateModelDefs.put(stateModelDef.getStateModelDefId(), stateModelDef);
     }
@@ -299,35 +310,70 @@ public class ClusterAccessor {
    * @return map of resource id to resource
    */
   public Map<ResourceId, Resource> readResources() {
-    if (!isClusterStructureValid()) {
+    return readResources(false);
+  }
+
+  /**
+   * Read all resources in the cluster
+   * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+   * @return map of resource id to resource
+   */
+  private Map<ResourceId, Resource> readResources(boolean useCache) {
+    if (!useCache && !isClusterStructureValid()) {
       LOG.error("Cluster is not fully set up yet!");
       return Collections.emptyMap();
     }
 
-    /**
-     * map of resource-id to ideal-state
-     */
-    Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
-
-    /**
-     * Map of resource id to external view
-     */
-    Map<String, ExternalView> externalViewMap =
-        _accessor.getChildValuesMap(_keyBuilder.externalViews());
+    Map<String, IdealState> idealStateMap;
+    Map<String, ResourceConfiguration> resourceConfigMap;
+    Map<String, ExternalView> externalViewMap;
+    Map<String, ResourceAssignment> resourceAssignmentMap;
+    if (useCache) {
+      idealStateMap = _cache.getIdealStates();
+      resourceConfigMap = _cache.getResourceConfigs();
+    } else {
+      idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+      resourceConfigMap = _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+    }
 
-    /**
-     * Map of resource id to user configuration
-     */
-    Map<String, ResourceConfiguration> resourceConfigMap =
-        _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+    // check if external view and resource assignment reads are required
+    boolean extraReadsRequired = false;
+    for (String resourceName : idealStateMap.keySet()) {
+      if (extraReadsRequired) {
+        break;
+      }
+      // a rebalancer can be user defined if it has that mode set, or has a different rebalancer
+      // class
+      IdealState idealState = idealStateMap.get(resourceName);
+      extraReadsRequired =
+          extraReadsRequired || (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED);
+      RebalancerRef ref = idealState.getRebalancerRef();
+      if (ref != null) {
+        extraReadsRequired =
+            extraReadsRequired
+                || !PartitionedRebalancerConfig.isBuiltinRebalancer(ref.getRebalancerClass());
+      }
+    }
+    for (String resourceName : resourceConfigMap.keySet()) {
+      if (extraReadsRequired) {
+        break;
+      }
+      extraReadsRequired =
+          extraReadsRequired || resourceConfigMap.get(resourceName).hasRebalancerConfig();
+    }
 
-    /**
-     * Map of resource id to resource assignment
-     */
-    Map<String, ResourceAssignment> resourceAssignmentMap =
-        _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+    // now read external view and resource assignments if needed
+    if (!useCache || extraReadsRequired) {
+      externalViewMap = _accessor.getChildValuesMap(_keyBuilder.externalViews());
+      resourceAssignmentMap = _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+      _cache.setAssignmentWritePolicy(true);
+    } else {
+      externalViewMap = Maps.newHashMap();
+      resourceAssignmentMap = Maps.newHashMap();
+      _cache.setAssignmentWritePolicy(false);
+    }
 
-    // read all the resources
+    // populate all the resources
     Set<String> allResources = Sets.newHashSet();
     allResources.addAll(idealStateMap.keySet());
     allResources.addAll(resourceConfigMap.keySet());
@@ -347,45 +393,58 @@ public class ClusterAccessor {
    * @return map of participant id to participant, or empty map
    */
   public Map<ParticipantId, Participant> readParticipants() {
-    if (!isClusterStructureValid()) {
+    return readParticipants(false);
+  }
+
+  /**
+   * Read all participants in the cluster
+   * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+   * @return map of participant id to participant, or empty map
+   */
+  private Map<ParticipantId, Participant> readParticipants(boolean useCache) {
+    if (!useCache && !isClusterStructureValid()) {
       LOG.error("Cluster is not fully set up yet!");
       return Collections.emptyMap();
     }
-
-    /**
-     * map of instance-id to instance-config
-     */
-    Map<String, InstanceConfig> instanceConfigMap =
-        _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
-
-    /**
-     * map of instance-id to live-instance
-     */
-    Map<String, LiveInstance> liveInstanceMap =
-        _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+    Map<String, InstanceConfig> instanceConfigMap;
+    Map<String, LiveInstance> liveInstanceMap;
+    if (useCache) {
+      instanceConfigMap = _cache.getInstanceConfigMap();
+      liveInstanceMap = _cache.getLiveInstances();
+    } else {
+      instanceConfigMap = _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+      liveInstanceMap = _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+    }
 
     /**
      * map of participant-id to map of message-id to message
      */
-    Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
-    for (String instanceName : liveInstanceMap.keySet()) {
-      Map<String, Message> instanceMsgMap =
-          _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
-      messageMap.put(instanceName, instanceMsgMap);
+    Map<String, Map<String, Message>> messageMap = Maps.newHashMap();
+    for (String participantName : liveInstanceMap.keySet()) {
+      Map<String, Message> instanceMsgMap;
+      if (useCache) {
+        instanceMsgMap = _cache.getMessages(participantName);
+      } else {
+        instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
+      }
+      messageMap.put(participantName, instanceMsgMap);
     }
 
     /**
      * map of participant-id to map of resource-id to current-state
      */
-    Map<String, Map<String, CurrentState>> currentStateMap =
-        new HashMap<String, Map<String, CurrentState>>();
+    Map<String, Map<String, CurrentState>> currentStateMap = Maps.newHashMap();
     for (String participantName : liveInstanceMap.keySet()) {
       LiveInstance liveInstance = liveInstanceMap.get(participantName);
       SessionId sessionId = liveInstance.getTypedSessionId();
-      Map<String, CurrentState> instanceCurStateMap =
-          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
-              sessionId.stringify()));
-
+      Map<String, CurrentState> instanceCurStateMap;
+      if (useCache) {
+        instanceCurStateMap = _cache.getCurrentState(participantName, sessionId.stringify());
+      } else {
+        instanceCurStateMap =
+            _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+                sessionId.stringify()));
+      }
       currentStateMap.put(participantName, instanceCurStateMap);
     }
 
@@ -472,8 +531,21 @@ public class ClusterAccessor {
    * @return map of context id to controller context
    */
   public Map<ContextId, ControllerContext> readControllerContext() {
-    Map<String, ControllerContextHolder> contextHolders =
-        _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+    return readControllerContext(false);
+  }
+
+  /**
+   * Read the persisted controller contexts
+   * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+   * @return map of context id to controller context
+   */
+  private Map<ContextId, ControllerContext> readControllerContext(boolean useCache) {
+    Map<String, ControllerContextHolder> contextHolders;
+    if (useCache) {
+      contextHolders = _cache.getContextMap();
+    } else {
+      contextHolders = _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+    }
     Map<ContextId, ControllerContext> contexts = Maps.newHashMap();
     for (String contextName : contextHolders.keySet()) {
       contexts.put(ContextId.from(contextName), contextHolders.get(contextName).getContext());
@@ -482,6 +554,22 @@ public class ClusterAccessor {
   }
 
   /**
+   * Get the current cluster stats
+   * @return PersistentStats
+   */
+  public PersistentStats getStats() {
+    return _accessor.getProperty(_keyBuilder.persistantStat());
+  }
+
+  /**
+   * Get the current cluster alerts
+   * @return Alerts
+   */
+  public Alerts getAlerts() {
+    return _accessor.getProperty(_keyBuilder.alerts());
+  }
+
+  /**
    * Add a statistic specification to the cluster. Existing stat specifications will not be
    * overwritten
    * @param statName string representing a stat specification
@@ -673,14 +761,22 @@ public class ClusterAccessor {
       return false;
     }
 
+    // Create an IdealState from a RebalancerConfig (if the resource supports it)
+    IdealState idealState =
+        ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
+            resource.getBucketSize(), resource.getBatchMessageMode());
+    if (idealState != null) {
+      _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+    }
+
     // Add resource user config
     if (resource.getUserConfig() != null) {
       ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
       configuration.setType(resource.getType());
       configuration.addNamespacedConfig(resource.getUserConfig());
       PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
-      if (partitionedConfig == null
-          || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+      if (idealState == null
+          && (partitionedConfig == null || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED)) {
         // only persist if this is not easily convertible to an ideal state
         configuration
             .addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
@@ -688,14 +784,6 @@ public class ClusterAccessor {
       }
       _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     }
-
-    // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
-    IdealState idealState =
-        ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
-            resource.getBucketSize(), resource.getBatchMessageMode());
-    if (idealState != null) {
-      _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
-    }
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 73d43b0..a1d6580 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -39,6 +39,7 @@ import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
@@ -137,14 +138,19 @@ public class ResourceAccessor {
    */
   private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration,
       RebalancerConfig rebalancerConfig) {
-    boolean status =
-        _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+    boolean status = true;
+    if (configuration != null) {
+      status =
+          _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+    }
     // set an ideal state if the resource supports it
     IdealState idealState =
         rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
             configuration.getBatchMessageMode());
     if (idealState != null) {
-      _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+      status =
+          status
+              && _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
     }
     return status;
   }
@@ -253,7 +259,14 @@ public class ResourceAccessor {
     }
     ResourceId resourceId = resourceConfig.getId();
     ResourceConfiguration config = new ResourceConfiguration(resourceId);
-    config.addNamespacedConfig(resourceConfig.getUserConfig());
+    UserConfig userConfig = resourceConfig.getUserConfig();
+    if (userConfig != null
+        && (!userConfig.getSimpleFields().isEmpty() || !userConfig.getListFields().isEmpty() || !userConfig
+            .getMapFields().isEmpty())) {
+      config.addNamespacedConfig(userConfig);
+    } else {
+      userConfig = null;
+    }
     PartitionedRebalancerConfig partitionedConfig =
         PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
     if (partitionedConfig == null
@@ -261,9 +274,11 @@ public class ResourceAccessor {
       // only persist if this is not easily convertible to an ideal state
       config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
           .toNamespacedConfig());
+      config.setBucketSize(resourceConfig.getBucketSize());
+      config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+    } else if (userConfig == null) {
+      config = null;
     }
-    config.setBucketSize(resourceConfig.getBucketSize());
-    config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
     setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
     return true;
   }
@@ -387,9 +402,17 @@ public class ResourceAccessor {
       boolean batchMessageMode) {
     PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
     if (partitionedConfig != null) {
+      if (!PartitionedRebalancerConfig.isBuiltinConfig(partitionedConfig.getClass())) {
+        // don't proceed if this resource cannot be described by an ideal state
+        return null;
+      }
       IdealState idealState = new IdealState(partitionedConfig.getResourceId());
       idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
-      idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
+
+      RebalancerRef ref = partitionedConfig.getRebalancerRef();
+      if (ref != null) {
+        idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
+      }
       String replicas = null;
       if (partitionedConfig.anyLiveParticipant()) {
         replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
@@ -404,13 +427,14 @@ public class ResourceAccessor {
       idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
       idealState.setBucketSize(bucketSize);
       idealState.setBatchMessageMode(batchMessageMode);
-      if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+      idealState.setRebalancerConfigClass(config.getClass());
+      if (SemiAutoRebalancerConfig.class.equals(config.getClass())) {
         SemiAutoRebalancerConfig semiAutoConfig =
             BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
         for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
           idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
         }
-      } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+      } else if (CustomRebalancerConfig.class.equals(config.getClass())) {
         CustomRebalancerConfig customConfig =
             BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
         for (PartitionId partitionId : customConfig.getPartitionSet()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index 22a1528..ddc98fa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -5,8 +5,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ClusterId;
@@ -14,14 +12,12 @@ import org.apache.helix.api.id.ConstraintId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterConstraints.ConstraintValue;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Transition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -61,8 +57,6 @@ public class ClusterConfig {
   private final Map<ParticipantId, ParticipantConfig> _participantMap;
   private final Map<ConstraintType, ClusterConstraints> _constraintMap;
   private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
-  private final PersistentStats _stats;
-  private final Alerts _alerts;
   private final UserConfig _userConfig;
   private final boolean _isPaused;
   private final boolean _autoJoin;
@@ -74,8 +68,6 @@ public class ClusterConfig {
    * @param participantMap map of participant id to participant config
    * @param constraintMap map of constraint type to all constraints of that type
    * @param stateModelMap map of state model id to state model definition
-   * @param stats statistics to watch on the cluster
-   * @param alerts alerts that the cluster can trigger
    * @param userConfig user-defined cluster properties
    * @param isPaused true if paused, false if active
    * @param allowAutoJoin true if participants can join automatically, false otherwise
@@ -83,15 +75,13 @@ public class ClusterConfig {
   private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
       Map<ParticipantId, ParticipantConfig> participantMap,
       Map<ConstraintType, ClusterConstraints> constraintMap,
-      Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
-      Alerts alerts, UserConfig userConfig, boolean isPaused, boolean allowAutoJoin) {
+      Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+      boolean isPaused, boolean allowAutoJoin) {
     _id = id;
     _resourceMap = ImmutableMap.copyOf(resourceMap);
     _participantMap = ImmutableMap.copyOf(participantMap);
     _constraintMap = ImmutableMap.copyOf(constraintMap);
     _stateModelMap = ImmutableMap.copyOf(stateModelMap);
-    _stats = stats;
-    _alerts = alerts;
     _userConfig = userConfig;
     _isPaused = isPaused;
     _autoJoin = allowAutoJoin;
@@ -237,22 +227,6 @@ public class ClusterConfig {
   }
 
   /**
-   * Get all the statistics persisted on the cluster
-   * @return PersistentStats instance
-   */
-  public PersistentStats getStats() {
-    return _stats;
-  }
-
-  /**
-   * Get all the alerts persisted on the cluster
-   * @return Alerts instance
-   */
-  public Alerts getAlerts() {
-    return _alerts;
-  }
-
-  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */
@@ -287,8 +261,6 @@ public class ClusterConfig {
 
     private Set<Fields> _updateFields;
     private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
-    private PersistentStats _removedStats;
-    private Alerts _removedAlerts;
     private Builder _builder;
 
     /**
@@ -302,8 +274,6 @@ public class ClusterConfig {
         Set<ConstraintId> constraints = Sets.newHashSet();
         _removedConstraints.put(type, constraints);
       }
-      _removedStats = new PersistentStats(PersistentStats.nodeName);
-      _removedAlerts = new Alerts(Alerts.nodeName);
       _builder = new Builder(clusterId);
     }
 
@@ -431,57 +401,6 @@ public class ClusterConfig {
     }
 
     /**
-     * Add a statistic specification to the cluster. Existing specifications will not be overwritten
-     * @param stat string specifying the stat specification
-     * @return Delta
-     */
-    public Delta addStat(String stat) {
-      _builder.addStat(stat);
-      return this;
-    }
-
-    /**
-     * Add an alert specification for the cluster. Existing specifications will not be overwritten
-     * @param alert string specifying the alert specification
-     * @return Delta
-     */
-    public Delta addAlert(String alert) {
-      _builder.addAlert(alert);
-      return this;
-    }
-
-    /**
-     * Remove a statistic specification from the cluster
-     * @param stat statistic specification
-     * @return Delta
-     */
-    public Delta removeStat(String stat) {
-      Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
-      Map<String, Map<String, String>> currentStats = _removedStats.getMapFields();
-      for (String statName : parsedStat.keySet()) {
-        currentStats.put(statName, parsedStat.get(statName));
-      }
-      return this;
-    }
-
-    /**
-     * Remove an alert specification for the cluster
-     * @param alert alert specification
-     * @return Delta
-     */
-    public Delta removeAlert(String alert) {
-      Map<String, Map<String, String>> currAlertMap = _removedAlerts.getMapFields();
-      if (!currAlertMap.containsKey(alert)) {
-        Map<String, String> parsedAlert = Maps.newHashMap();
-        StringBuilder statsName = new StringBuilder();
-        AlertsHolder.parseAlert(alert, statsName, parsedAlert);
-        removeStat(statsName.toString());
-        currAlertMap.put(alert, parsedAlert);
-      }
-      return this;
-    }
-
-    /**
      * Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
      * @param orig the original ClusterConfig
      * @return updated ClusterConfig
@@ -494,8 +413,7 @@ public class ClusterConfig {
               .addParticipants(orig.getParticipantMap().values())
               .addStateModelDefinitions(orig.getStateModelMap().values())
               .userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused())
-              .autoJoin(orig.autoJoinAllowed()).addStats(orig.getStats())
-              .addAlerts(orig.getAlerts());
+              .autoJoin(orig.autoJoinAllowed());
       for (Fields field : _updateFields) {
         switch (field) {
         case USER_CONFIG:
@@ -529,29 +447,8 @@ public class ClusterConfig {
         builder.addConstraint(constraints);
       }
 
-      // add stats and alerts
-      builder.addStats(deltaConfig.getStats());
-      builder.addAlerts(deltaConfig.getAlerts());
-
       // get the result
-      ClusterConfig result = builder.build();
-
-      // remove stats
-      PersistentStats stats = result.getStats();
-      for (String removedStat : _removedStats.getMapFields().keySet()) {
-        if (stats.getMapFields().containsKey(removedStat)) {
-          stats.getMapFields().remove(removedStat);
-        }
-      }
-
-      // remove alerts
-      Alerts alerts = result.getAlerts();
-      for (String removedAlert : _removedAlerts.getMapFields().keySet()) {
-        if (alerts.getMapFields().containsKey(removedAlert)) {
-          alerts.getMapFields().remove(removedAlert);
-        }
-      }
-      return result;
+      return builder.build();
     }
   }
 
@@ -565,8 +462,6 @@ public class ClusterConfig {
     private final Map<ConstraintType, ClusterConstraints> _constraintMap;
     private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
     private UserConfig _userConfig;
-    private PersistentStats _stats;
-    private Alerts _alerts;
     private boolean _isPaused;
     private boolean _autoJoin;
 
@@ -583,8 +478,6 @@ public class ClusterConfig {
       _isPaused = false;
       _autoJoin = false;
       _userConfig = new UserConfig(Scope.cluster(id));
-      _stats = new PersistentStats(PersistentStats.nodeName);
-      _alerts = new Alerts(Alerts.nodeName);
     }
 
     /**
@@ -789,74 +682,6 @@ public class ClusterConfig {
     }
 
     /**
-     * Add a statistic specification to the cluster. Existing specifications will not be overwritten
-     * @param stat String specifying the stat specification
-     * @return Builder
-     */
-    public Builder addStat(String stat) {
-      Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
-      Map<String, Map<String, String>> currentStats = _stats.getMapFields();
-      for (String statName : parsedStat.keySet()) {
-        if (!currentStats.containsKey(statName)) {
-          currentStats.put(statName, parsedStat.get(statName));
-        }
-      }
-      return this;
-    }
-
-    /**
-     * Add statistic specifications to the cluster. Existing specifications will not be overwritten
-     * @param stats PersistentStats specifying the stat specification
-     * @return Builder
-     */
-    public Builder addStats(PersistentStats stats) {
-      if (stats == null) {
-        return this;
-      }
-      Map<String, Map<String, String>> parsedStat = stats.getMapFields();
-      Map<String, Map<String, String>> currentStats = _stats.getMapFields();
-      for (String statName : parsedStat.keySet()) {
-        if (!currentStats.containsKey(statName)) {
-          currentStats.put(statName, parsedStat.get(statName));
-        }
-      }
-      return this;
-    }
-
-    /**
-     * Add alert specifications to the cluster. Existing specifications will not be overwritten
-     * @param alert string representing alert specifications
-     * @return Builder
-     */
-    public Builder addAlert(String alert) {
-      Map<String, Map<String, String>> currAlertMap = _alerts.getMapFields();
-      if (!currAlertMap.containsKey(alert)) {
-        Map<String, String> parsedAlert = Maps.newHashMap();
-        StringBuilder statsName = new StringBuilder();
-        AlertsHolder.parseAlert(alert, statsName, parsedAlert);
-        addStat(statsName.toString());
-        currAlertMap.put(alert, parsedAlert);
-      }
-      return this;
-    }
-
-    /**
-     * Add alert specifications to the cluster. Existing specifications will not be overwritten
-     * @param alerts Alerts instance
-     * @return Builder
-     */
-    public Builder addAlerts(Alerts alerts) {
-      if (alerts == null) {
-        return this;
-      }
-      Map<String, Map<String, String>> alertMap = alerts.getMapFields();
-      for (String alert : alertMap.keySet()) {
-        addAlert(alert);
-      }
-      return this;
-    }
-
-    /**
      * Set the paused status of the cluster
      * @param isPaused true if paused, false otherwise
      * @return Builder
@@ -892,7 +717,7 @@ public class ClusterConfig {
      */
     public ClusterConfig build() {
       return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
-          _stats, _alerts, _userConfig, _isPaused, _autoJoin);
+          _userConfig, _isPaused, _autoJoin);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 2b2a71e..b11ab9c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -48,6 +48,7 @@ import org.apache.helix.api.id.SessionId;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
@@ -121,6 +122,11 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   int _timerPeriod = Integer.MAX_VALUE;
 
   /**
+   * A cache maintained across pipelines
+   */
+  private ClusterDataCache _cache;
+
+  /**
    * Default constructor that creates a default pipeline registry. This is sufficient in
    * most cases, but if there is a some thing specific needed use another constructor
    * where in you can pass a pipeline registry
@@ -138,6 +144,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
 
     @Override
     public void run() {
+      _cache.requireFullRefresh();
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.CALLBACK);
       ClusterEvent event = new ClusterEvent("periodicalRebalance");
@@ -228,6 +235,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     _registry = registry;
     _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
     _lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
+    _cache = new ClusterDataCache();
     _eventQueue = new ClusterEventBlockingQueue();
     _eventThread = new ClusterEventProcessor();
     _eventThread.start();
@@ -276,6 +284,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       }
     }
 
+    // add the cache
+    event.addAttribute("ClusterDataCache", _cache);
+
     List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
     if (pipelines == null || pipelines.size() == 0) {
       logger.info("No pipeline to run for event:" + event.getName());
@@ -318,6 +329,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   public void onStateChange(String instanceName, List<CurrentState> statesInfo,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onStateChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
     ClusterEvent event = new ClusterEvent("currentStateChange");
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("instanceName", instanceName);
@@ -341,6 +355,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onMessage()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
 
     ClusterEvent event = new ClusterEvent("messageChange");
     event.addAttribute("helixmanager", changeContext.getManager());
@@ -360,10 +377,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
       NotificationContext changeContext) {
     logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
 
     if (liveInstances == null) {
       liveInstances = Collections.emptyList();
     }
+    _cache.setLiveInstances(liveInstances);
+
     // Go though the live instance list and make sure that we are observing them
     // accordingly. The action is done regardless of the paused flag.
     if (changeContext.getType() == NotificationContext.Type.INIT
@@ -403,6 +425,14 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   @Override
   public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) {
     logger.info("START: Generic GenericClusterController.onIdealStateChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
+
+    if (idealStates == null) {
+      idealStates = Collections.emptyList();
+    }
+    _cache.setIdealStates(idealStates);
     ClusterEvent event = new ClusterEvent("idealStateChange");
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("changeContext", changeContext);
@@ -419,6 +449,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   @Override
   public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onConfigChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
+
+    if (configs == null) {
+      configs = Collections.emptyList();
+    }
+    _cache.setInstanceConfigs(configs);
+
     ClusterEvent event = new ClusterEvent("configChange");
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("helixmanager", changeContext.getManager());
@@ -438,6 +477,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   @Override
   public void onControllerChange(NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onControllerChange()");
+    _cache.requireFullRefresh();
     if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
       logger.info("GenericClusterController.onControllerChange() FINALIZE");
       return;

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 0c55d45..13616b3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -113,8 +113,8 @@ public class FullAutoRebalancer implements HelixRebalancer {
       }
       if (!taggedLiveNodes.isEmpty()) {
         // live nodes exist that have this tag
-        if (LOG.isInfoEnabled()) {
-          LOG.info("found the following participants with tag " + config.getParticipantGroupTag()
+        if (LOG.isDebugEnabled()) { // guard skips the string concatenation unless debug logging is on
+          LOG.debug("found the following participants with tag " + config.getParticipantGroupTag()
               + " for " + config.getResourceId() + ": " + taggedLiveNodes);
         }
       } else if (taggedNodes.isEmpty()) {
@@ -132,12 +132,12 @@ public class FullAutoRebalancer implements HelixRebalancer {
 
     // determine which nodes the replicas should live on
     int maxPartition = config.getMaxPartitionsPerParticipant();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("currentMapping: " + currentMapping);
-      LOG.info("stateCountMap: " + stateCountMap);
-      LOG.info("liveNodes: " + liveParticipantList);
-      LOG.info("allNodes: " + allParticipantList);
-      LOG.info("maxPartition: " + maxPartition);
+    if (LOG.isDebugEnabled()) { // per-run rebalancer inputs are only built and logged at debug level
+      LOG.debug("currentMapping: " + currentMapping);
+      LOG.debug("stateCountMap: " + stateCountMap);
+      LOG.debug("liveNodes: " + liveParticipantList);
+      LOG.debug("allNodes: " + allParticipantList);
+      LOG.debug("maxPartition: " + maxPartition);
     }
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     _algorithm =

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
index 974222d..8439583 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
@@ -34,25 +34,46 @@ public class RebalancerRef {
   @JsonProperty("rebalancerClassName")
   private final String _rebalancerClassName;
 
+  @JsonIgnore
+  private Class<? extends HelixRebalancer> _class; // resolved lazily by getRebalancerClass(); stays null if loading fails
+
   @JsonCreator
   private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
     _rebalancerClassName = rebalancerClassName;
+    _class = null;
   }
 
   /**
-   * Get an instantiated Rebalancer
-   * @return Rebalancer or null if instantiation failed
+   * Get an instantiated HelixRebalancer
+   * @return HelixRebalancer or null if instantiation failed
    */
   @JsonIgnore
   public HelixRebalancer getRebalancer() {
     try {
-      return (HelixRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+      return (HelixRebalancer) (getRebalancerClass().newInstance()); // a null class throws here and is logged below
     } catch (Exception e) {
       LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
     }
     return null;
   }
 
+  /**
+   * Get the class object of this rebalancer ref, loading and caching it on first use
+   * @return the rebalancer class, or null if it could not be loaded
+   */
+  @JsonIgnore
+  public Class<? extends HelixRebalancer> getRebalancerClass() {
+    try {
+      if (_class == null) {
+        _class =
+            HelixUtil.loadClass(getClass(), _rebalancerClassName).asSubclass(HelixRebalancer.class);
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception while loading rebalancer class:" + _rebalancerClassName, e);
+    }
+    return _class;
+  }
+
   @Override
   public String toString() {
     return _rebalancerClassName;

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
index 73c3ccc..a44b230 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
@@ -53,7 +53,12 @@ public class CustomRebalancerConfig extends PartitionedRebalancerConfig {
    * Instantiate a CustomRebalancerConfig
    */
   public CustomRebalancerConfig() {
-    setRebalanceMode(RebalanceMode.CUSTOMIZED);
+    if (getClass().equals(CustomRebalancerConfig.class)) {
+      // only mark this as customized mode if this specific config is used
+      setRebalanceMode(RebalanceMode.CUSTOMIZED);
+    } else {
+      setRebalanceMode(RebalanceMode.USER_DEFINED); // subclasses are treated as user-defined configs
+    }
     setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
     _preferenceMaps = Maps.newHashMap();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
index 828d509..16bb4cb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
@@ -30,7 +30,12 @@ import org.apache.helix.model.IdealState.RebalanceMode;
  */
 public class FullAutoRebalancerConfig extends PartitionedRebalancerConfig {
   public FullAutoRebalancerConfig() {
-    setRebalanceMode(RebalanceMode.FULL_AUTO);
+    if (getClass().equals(FullAutoRebalancerConfig.class)) {
+      // only mark this as full auto mode if this specific config is used
+      setRebalanceMode(RebalanceMode.FULL_AUTO);
+    } else {
+      setRebalanceMode(RebalanceMode.USER_DEFINED); // subclasses are treated as user-defined configs
+    }
     setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
index 2c9769d..dd661d9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
@@ -10,14 +10,20 @@ import org.apache.helix.api.Partition;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
 import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskRebalancer;
 import org.codehaus.jackson.annotate.JsonIgnore;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -51,6 +57,23 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
   private int _maxPartitionsPerParticipant;
   private RebalanceMode _rebalanceMode;
 
+  @JsonIgnore
+  private static final Set<Class<? extends RebalancerConfig>> BUILTIN_CONFIG_CLASSES = Sets
+      .newHashSet(); // config classes recognized by isBuiltinConfig()
+  @JsonIgnore
+  private static final Set<Class<? extends HelixRebalancer>> BUILTIN_REBALANCER_CLASSES = Sets
+      .newHashSet(); // rebalancer classes recognized by isBuiltinRebalancer()
+  static { // populated once, when this class is initialized
+    BUILTIN_CONFIG_CLASSES.add(PartitionedRebalancerConfig.class);
+    BUILTIN_CONFIG_CLASSES.add(FullAutoRebalancerConfig.class);
+    BUILTIN_CONFIG_CLASSES.add(SemiAutoRebalancerConfig.class);
+    BUILTIN_CONFIG_CLASSES.add(CustomRebalancerConfig.class);
+    BUILTIN_REBALANCER_CLASSES.add(FullAutoRebalancer.class);
+    BUILTIN_REBALANCER_CLASSES.add(SemiAutoRebalancer.class);
+    BUILTIN_REBALANCER_CLASSES.add(CustomRebalancer.class);
+    BUILTIN_REBALANCER_CLASSES.add(TaskRebalancer.class);
+  }
+
   /**
    * Instantiate a PartitionedRebalancerConfig
    */
@@ -186,13 +209,42 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
   }
 
   /**
+   * Check if the given class is compatible with an {@link IdealState}
+   * @param clazz the RebalancerConfig class to check
+   * @return true if IdealState can be used to describe this config, false otherwise
+   */
+  public static boolean isBuiltinConfig(Class<? extends RebalancerConfig> clazz) {
+    return BUILTIN_CONFIG_CLASSES.contains(clazz);
+  }
+
+  /**
+   * Check if the given class is a built-in rebalancer class
+   * @param clazz the HelixRebalancer subclass
+   * @return true if the rebalancer class is built in, false otherwise
+   */
+  public static boolean isBuiltinRebalancer(Class<? extends HelixRebalancer> clazz) {
+    return BUILTIN_REBALANCER_CLASSES.contains(clazz);
+  }
+
+  /**
    * Convert a physically-stored IdealState into a rebalancer config for a partitioned resource
    * @param idealState populated IdealState
    * @return PartitionedRebalancerConfig
    */
   public static PartitionedRebalancerConfig from(IdealState idealState) {
     PartitionedRebalancerConfig config;
-    switch (idealState.getRebalanceMode()) {
+    RebalanceMode mode = idealState.getRebalanceMode();
+    if (mode == RebalanceMode.USER_DEFINED) { // a config of exactly a built-in class maps back to its native mode
+      Class<? extends RebalancerConfig> configClass = idealState.getRebalancerConfigClass(); // may be null
+      if (FullAutoRebalancerConfig.class.equals(configClass)) {
+        mode = RebalanceMode.FULL_AUTO;
+      } else if (SemiAutoRebalancerConfig.class.equals(configClass)) {
+        mode = RebalanceMode.SEMI_AUTO;
+      } else if (CustomRebalancerConfig.class.equals(configClass)) {
+        mode = RebalanceMode.CUSTOMIZED;
+      }
+    }
+    switch (mode) {
     case FULL_AUTO:
       FullAutoRebalancerConfig.Builder fullAutoBuilder =
           new FullAutoRebalancerConfig.Builder(idealState.getResourceId());
@@ -252,7 +304,8 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
         .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
         .participantGroupTag(idealState.getInstanceGroupTag())
         .stateModelDefId(idealState.getStateModelDefId())
-        .stateModelFactoryId(idealState.getStateModelFactoryId());
+        .stateModelFactoryId(idealState.getStateModelFactoryId())
+        .rebalanceMode(idealState.getRebalanceMode()); // carry over the mode stored in the IdealState as-is
     RebalancerRef rebalancerRef = idealState.getRebalancerRef();
     if (rebalancerRef != null) {
       builder.rebalancerRef(rebalancerRef);

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
index 8581732..d6ddb50 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
@@ -32,7 +32,7 @@ import org.apache.log4j.Logger;
  * information specific to each rebalancer.
  */
 public final class RebalancerConfigHolder {
-  private enum Fields {
+  public enum Fields { // NOTE(review): widened from private, presumably so other classes can reference these keys
     SERIALIZER_CLASS,
     REBALANCER_CONFIG,
     REBALANCER_CONFIG_CLASS

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
index bfc3309..727c3df 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -56,7 +56,12 @@ public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig
    * Instantiate a SemiAutoRebalancerConfig
    */
   public SemiAutoRebalancerConfig() {
-    setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    if (getClass().equals(SemiAutoRebalancerConfig.class)) {
+      // only mark this as semi auto mode if this specific config is used
+      setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    } else {
+      setRebalanceMode(RebalanceMode.USER_DEFINED); // NOTE(review): unreachable while this class is declared final
+    }
     setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
     _preferenceLists = Maps.newHashMap();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index ec812b2..644b9f6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -37,12 +37,14 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.FallbackRebalancer;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -52,6 +54,9 @@ import com.google.common.collect.Sets;
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
 
+  // initialized rebalancers keyed by resource; reused while the configured class name is unchanged
+  private Map<ResourceId, HelixRebalancer> _rebalancerMap = Maps.newHashMap();
+
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
@@ -63,11 +68,11 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
 
     if (currentStateOutput == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+          + ". Requires CURRENT_STATE|RESOURCES|Cluster");
     }
 
     BestPossibleStateOutput bestPossibleStateOutput =
@@ -178,25 +183,42 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
           stateModelDefs.get(rebalancerConfig.getStateModelDefId());
       ResourceAssignment resourceAssignment = null;
       if (rebalancerConfig != null) {
+        // use a cached rebalancer if its class name matches the configured one (ref.toString())
+        RebalancerRef ref = rebalancerConfig.getRebalancerRef();
         HelixRebalancer rebalancer = null;
-        if (rebalancerConfig != null && rebalancerConfig.getRebalancerRef() != null) {
-          rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
+        if (_rebalancerMap.containsKey(resourceId)) {
+          HelixRebalancer candidateRebalancer = _rebalancerMap.get(resourceId);
+          if (ref != null && candidateRebalancer.getClass().getName().equals(ref.toString())) {
+            rebalancer = candidateRebalancer;
+          }
         }
-        HelixManager manager = event.getAttribute("helixmanager");
-        ControllerContextProvider provider =
-            event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+
+        // otherwise instantiate a new one
         if (rebalancer == null) {
-          rebalancer = new FallbackRebalancer();
+          if (ref != null) {
+            rebalancer = ref.getRebalancer();
+          }
+          HelixManager manager = event.getAttribute("helixmanager");
+          ControllerContextProvider provider =
+              event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+          if (rebalancer == null) {
+            rebalancer = new FallbackRebalancer();
+          }
+          rebalancer.init(manager, provider);
+          _rebalancerMap.put(resourceId, rebalancer);
         }
-        rebalancer.init(manager, provider);
         ResourceAssignment currentAssignment = null;
         Resource resourceSnapshot = cluster.getResource(resourceId);
         if (resourceSnapshot != null) {
           currentAssignment = resourceSnapshot.getResourceAssignment();
         }
-        resourceAssignment =
-            rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
-                currentStateOutput);
+        try {
+          resourceAssignment =
+              rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
+                  currentStateOutput);
+        } catch (Exception e) {
+          LOG.error("Rebalancer for resource " + resourceId + " failed.", e);
+        }
       }
       if (resourceAssignment == null) {
         resourceAssignment =

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 6f09d26..0c28bdf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -22,12 +22,18 @@ package org.apache.helix.controller.stages;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
@@ -35,22 +41,43 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 /**
  * Reads the data from the cluster using data accessor. This output ClusterData which
  * provides useful methods to search/lookup properties
  */
-@Deprecated
 public class ClusterDataCache {
   Map<String, LiveInstance> _liveInstanceMap;
+  Map<String, LiveInstance> _liveInstanceCacheMap; // source for _liveInstanceMap; set on full refresh or by setLiveInstances()
   Map<String, IdealState> _idealStateMap;
+  Map<String, IdealState> _idealStateCacheMap; // source for _idealStateMap; set on full refresh or by setIdealStates()
   Map<String, StateModelDefinition> _stateModelDefMap;
   Map<String, InstanceConfig> _instanceConfigMap;
+  Map<String, InstanceConfig> _instanceConfigCacheMap; // source for _instanceConfigMap; set on full refresh or by setInstanceConfigs()
   Map<String, ClusterConstraints> _constraintMap;
   Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
   Map<String, Map<String, Message>> _messageMap;
+  Map<String, Map<String, String>> _idealStateRuleMap;
+  Map<String, ResourceConfiguration> _resourceConfigMap;
+  Map<String, ControllerContextHolder> _controllerContextMap;
+  PauseSignal _pause;
+  LiveInstance _leader;
+  ClusterConfiguration _clusterConfig;
+  boolean _writeAssignments;
+
+  // cache of participant messages (instance name -> message name -> message), kept across pipeline runs
+  Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+
+  boolean _init = true; // while true, refresh() re-reads ideal states, live instances and instance configs
 
   // Map<String, Map<String, HealthStat>> _healthStatMap;
   // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
@@ -66,39 +93,111 @@ public class ClusterDataCache {
    * @param accessor
    * @return
    */
-  public boolean refresh(HelixDataAccessor accessor) {
+  public synchronized boolean refresh(HelixDataAccessor accessor) {
+    LOG.info("START: ClusterDataCache.refresh()");
+    long startTime = System.currentTimeMillis();
+
     Builder keyBuilder = accessor.keyBuilder();
-    _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
-    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+
+    if (_init) {
+      _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
+      _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+      _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+    }
+    _idealStateMap = Maps.newHashMap(_idealStateCacheMap);
+    _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
+    _instanceConfigMap = Maps.newHashMap(_instanceConfigCacheMap);
 
     for (LiveInstance instance : _liveInstanceMap.values()) {
-      LOG.trace("live instance: " + instance.getParticipantId() + " "
-          + instance.getTypedSessionId());
+      LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
     }
 
     _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
-    _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
     _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
 
     Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>();
+    List<PropertyKey> newMessageKeys = Lists.newLinkedList();
+    long purgeSum = 0;
     for (String instanceName : _liveInstanceMap.keySet()) {
-      Map<String, Message> map = accessor.getChildValuesMap(keyBuilder.messages(instanceName));
-      msgMap.put(instanceName, map);
+      // get the cache
+      Map<String, Message> cachedMap = _messageCache.get(instanceName);
+      if (cachedMap == null) {
+        cachedMap = Maps.newHashMap();
+        _messageCache.put(instanceName, cachedMap);
+      }
+      msgMap.put(instanceName, cachedMap);
+
+      // get the current names
+      Set<String> messageNames =
+          Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName)));
+
+      long purgeStart = System.currentTimeMillis();
+      // clear stale names
+      Iterator<String> cachedNamesIter = cachedMap.keySet().iterator();
+      while (cachedNamesIter.hasNext()) {
+        String messageName = cachedNamesIter.next();
+        if (!messageNames.contains(messageName)) {
+          cachedNamesIter.remove();
+        }
+      }
+      long purgeEnd = System.currentTimeMillis();
+      purgeSum += purgeEnd - purgeStart;
+
+      // get the keys for the new messages
+      for (String messageName : messageNames) {
+        if (!cachedMap.containsKey(messageName)) {
+          newMessageKeys.add(keyBuilder.message(instanceName, messageName));
+        }
+      }
+    }
+
+    // get the new messages
+    if (newMessageKeys.size() > 0) {
+      List<Message> newMessages = accessor.getProperty(newMessageKeys);
+      for (Message message : newMessages) {
+        if (message != null && _messageCache.containsKey(message.getTgtName())) { // skip unknown targets instead of NPE-ing the refresh
+          Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
+          cachedMap.put(message.getId(), message);
+        }
+      }
     }
     _messageMap = Collections.unmodifiableMap(msgMap);
+    LOG.debug("Purge took: " + purgeSum);
 
+    List<PropertyKey> currentStateKeys = Lists.newLinkedList();
     Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap =
         new HashMap<String, Map<String, Map<String, CurrentState>>>();
     for (String instanceName : _liveInstanceMap.keySet()) {
       LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
-      String sessionId = liveInstance.getTypedSessionId().stringify();
-      if (!allCurStateMap.containsKey(instanceName)) {
-        allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>());
+      String sessionId = liveInstance.getSessionId();
+      List<String> currentStateNames =
+          accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+      for (String currentStateName : currentStateNames) {
+        currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
+      }
+
+      // ensure an empty current state map for all live instances and sessions
+      Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(instanceName);
+      if (instanceCurStateMap == null) {
+        instanceCurStateMap = Maps.newHashMap();
+        allCurStateMap.put(instanceName, instanceCurStateMap);
+      }
+      Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
+      if (sessionCurStateMap == null) {
+        sessionCurStateMap = Maps.newHashMap();
+        instanceCurStateMap.put(sessionId, sessionCurStateMap);
+      }
+    }
+    List<CurrentState> currentStates = accessor.getProperty(currentStateKeys);
+    Iterator<PropertyKey> csKeyIter = currentStateKeys.iterator();
+    for (CurrentState currentState : currentStates) {
+      PropertyKey key = csKeyIter.next();
+      String[] params = key.getParams();
+      if (currentState != null && params.length >= 4) {
+        Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(params[1]);
+        Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(params[2]);
+        sessionCurStateMap.put(params[3], currentState);
       }
-      Map<String, Map<String, CurrentState>> curStateMap = allCurStateMap.get(instanceName);
-      Map<String, CurrentState> map =
-          accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId));
-      curStateMap.put(sessionId, map);
     }
 
     for (String instance : allCurStateMap.keySet()) {
@@ -106,10 +205,63 @@ public class ClusterDataCache {
     }
     _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
 
+    // New in 0.7: Read more information for the benefit of user-defined rebalancers
+    _resourceConfigMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+    _controllerContextMap = accessor.getChildValuesMap(keyBuilder.controllerContexts());
+
+    // Read all single properties together
+    List<HelixProperty> singleProperties =
+        accessor.getProperty(ImmutableList.of(keyBuilder.clusterConfig(),
+            keyBuilder.controllerLeader(), keyBuilder.pause()));
+    _clusterConfig = (ClusterConfiguration) singleProperties.get(0);
+    if (_clusterConfig != null) {
+      _idealStateRuleMap = _clusterConfig.getIdealStateRules();
+    } else {
+      _idealStateRuleMap = Collections.emptyMap();
+    }
+    _leader = (LiveInstance) singleProperties.get(1);
+    _pause = (PauseSignal) singleProperties.get(2);
+
+    long endTime = System.currentTimeMillis();
+    LOG.info("END: ClusterDataCache.refresh(), took " + (endTime - startTime) + " ms");
+
+    if (LOG.isDebugEnabled()) {
+      int numPaths =
+          _liveInstanceMap.size() + _idealStateMap.size() + _resourceConfigMap.size()
+              + _stateModelDefMap.size() + _instanceConfigMap.size() + _constraintMap.size()
+              + _controllerContextMap.size() + newMessageKeys.size() + currentStateKeys.size();
+      LOG.debug("Paths read: " + numPaths);
+    }
+
+    _init = false;
     return true;
   }
 
   /**
+   * Get the live instance associated with the controller leader
+   * @return LiveInstance of the leader, or null if none was read
+   */
+  public LiveInstance getLeader() {
+    return _leader;
+  }
+
+  /**
+   * Get the pause signal (if any)
+   * @return PauseSignal, or null if no pause signal was read
+   */
+  public PauseSignal getPauseSignal() {
+    return _pause;
+  }
+
+  /**
+   * Retrieves the configs for all resources
+   * @return map of resource config name to ResourceConfiguration
+   */
+  public Map<String, ResourceConfiguration> getResourceConfigs() {
+    return _resourceConfigMap;
+  }
+
+  /**
    * Retrieves the idealstates for all resources
    * @return
    */
@@ -117,6 +269,18 @@ public class ClusterDataCache {
     return _idealStateMap;
   }
 
+  public synchronized void setIdealStates(List<IdealState> idealStates) {
+    Map<String, IdealState> idealStateMap = Maps.newHashMap();
+    for (IdealState idealState : idealStates) { // NOTE(review): a null list throws NPE here
+      idealStateMap.put(idealState.getId(), idealState);
+    }
+    _idealStateCacheMap = idealStateMap; // copied into _idealStateMap by the next refresh()
+  }
+
+  public Map<String, Map<String, String>> getIdealStateRules() { // empty map when no cluster config was read
+    return _idealStateRuleMap;
+  }
+
   /**
    * Returns the LiveInstances for each of the instances that are curretnly up and running
    * @return
@@ -125,6 +289,14 @@ public class ClusterDataCache {
     return _liveInstanceMap;
   }
 
+  public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
+    Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap();
+    for (LiveInstance liveInstance : liveInstances) {
+      liveInstanceMap.put(liveInstance.getId(), liveInstance);
+    }
+    _liveInstanceCacheMap = liveInstanceMap; // copied into _liveInstanceMap by the next refresh()
+  }
+
   /**
    * Provides the current state of the node for a given session id,
    * the sessionid can be got from LiveInstance
@@ -133,6 +305,10 @@ public class ClusterDataCache {
    * @return
    */
   public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) {
+    if (!_currentStateMap.containsKey(instanceName)
+        || !_currentStateMap.get(instanceName).containsKey(clientSessionId)) {
+      return Collections.emptyMap(); // unknown instance or session: empty map rather than null
+    }
     return _currentStateMap.get(instanceName).get(clientSessionId);
   }
 
@@ -150,6 +326,14 @@ public class ClusterDataCache {
     }
   }
 
+  /**
+   * Provides all outstanding messages
+   * @return
+   */
+  public Map<String, Map<String, Message>> getMessageMap() {
+    return _messageMap;
+  }
+
   // public HealthStat getGlobalStats()
   // {
   // return _globalStats;
@@ -187,11 +371,18 @@ public class ClusterDataCache {
    * @return
    */
   public StateModelDefinition getStateModelDef(String stateModelDefRef) {
-
     return _stateModelDefMap.get(stateModelDefRef);
   }
 
   /**
+   * Returns every state model definition known to this cache
+   * @return state model definitions keyed by name (the backing map itself, not a copy)
+   */
+  public Map<String, StateModelDefinition> getStateModelDefMap() {
+    return this._stateModelDefMap;
+  }
+
+  /**
    * Provides the idealstate for a given resource
    * @param resourceName
    * @return
@@ -208,6 +399,14 @@ public class ClusterDataCache {
     return _instanceConfigMap;
   }
 
+  public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) {
+    Map<String, InstanceConfig> byId = Maps.newHashMap(); // fresh map per call; duplicate ids: last one wins
+    for (InstanceConfig config : instanceConfigs) {
+      byId.put(config.getId(), config);
+    }
+    _instanceConfigCacheMap = byId; // replace the previous cached map wholesale
+  }
+
   /**
    * Some partitions might be disabled on specific nodes.
    * This method allows one to fetch the set of nodes where a given partition is disabled
@@ -266,6 +465,55 @@ public class ClusterDataCache {
     return null;
   }
 
+  public Map<String, ClusterConstraints> getConstraintMap() {
+    return this._constraintMap; // cached constraints; backing map returned directly, not copied
+  }
+
+  public Map<String, ControllerContextHolder> getContextMap() {
+    return this._controllerContextMap; // cached controller contexts; backing map, not a copy
+  }
+
+  public ClusterConfiguration getClusterConfig() {
+    return this._clusterConfig; // the cluster configuration currently held by this cache
+  }
+
+  public void cacheMessages(List<Message> messages) {
+    // Bucket messages by target instance, then by message id (same id: last one wins).
+    for (Message message : messages) {
+      String instanceName = message.getTgtName();
+      Map<String, Message> instMsgMap = _messageCache.get(instanceName);
+      if (instMsgMap == null) {
+        // No bucket yet for this instance (single lookup instead of containsKey + get).
+        instMsgMap = Maps.newHashMap();
+        _messageCache.put(instanceName, instMsgMap);
+      }
+      instMsgMap.put(message.getId(), message);
+    }
+  }
+
+  /**
+   * Turn the writing of resource assignments on or off
+   * @param enable pass true to allow assignment writes, false to suppress them
+   */
+  public void setAssignmentWritePolicy(boolean enable) {
+    this._writeAssignments = enable;
+  }
+
+  /**
+   * Report whether resource assignments may currently be written
+   * @return the current value of the write-assignments flag
+   */
+  public boolean assignmentWriteEnabled() {
+    return this._writeAssignments;
+  }
+
+  /**
+   * Mark the cache so its next refresh performs a full read (sets the internal init flag)
+   */
+  public synchronized void requireFullRefresh() {
+    this._init = true;
+  }
+
   /**
    * toString method to print the entire cluster state
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 532ecb5..15264ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -40,10 +40,10 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute("helixmanager");
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
     if (manager == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager | DataCache");
+          + ". Requires HelixManager | Cluster");
     }
 
     HelixManagerProperties properties = manager.getProperties();

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 8235173..64bf792 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -47,13 +47,13 @@ import org.apache.helix.model.Message.MessageType;
 public class CurrentStateComputationStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
 
     if (cluster == null || resourceMap == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires DataCache|RESOURCE");
+          + ". Requires Cluster|RESOURCE");
     }
 
     ResourceCurrentState currentStateOutput = new ResourceCurrentState();

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index e8e42bf..a46acbd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -65,11 +65,11 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     HelixManager manager = event.getAttribute("helixmanager");
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
 
     if (manager == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires ClusterManager|RESOURCES|DataCache");
+          + ". Requires ClusterManager|RESOURCES|Cluster");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();