You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/20 19:42:00 UTC

[2/2] git commit: [HELIX-345] Speed up the controller pipeline

[HELIX-345] Speed up the controller pipeline


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/51329f6f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/51329f6f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/51329f6f

Branch: refs/heads/master
Commit: 51329f6f0bbb8a43cdfd06ffd3bf7ac5c8a93c68
Parents: c8a644f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Feb 4 10:50:11 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Feb 20 10:41:46 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/ConfigAccessor.java   |   9 +-
 .../main/java/org/apache/helix/api/Cluster.java |  27 +-
 .../helix/api/accessor/ClusterAccessor.java     | 268 ++++++++++++------
 .../helix/api/accessor/ResourceAccessor.java    |  42 ++-
 .../apache/helix/api/config/ClusterConfig.java  | 185 +-----------
 .../controller/GenericHelixController.java      |  40 +++
 .../rebalancer/FullAutoRebalancer.java          |  16 +-
 .../controller/rebalancer/RebalancerRef.java    |  27 +-
 .../config/CustomRebalancerConfig.java          |   7 +-
 .../config/FullAutoRebalancerConfig.java        |   7 +-
 .../config/PartitionedRebalancerConfig.java     |  57 +++-
 .../config/RebalancerConfigHolder.java          |   2 +-
 .../config/SemiAutoRebalancerConfig.java        |   7 +-
 .../stages/BestPossibleStateCalcStage.java      |  46 ++-
 .../controller/stages/ClusterDataCache.java     | 282 +++++++++++++++++--
 .../stages/CompatibilityCheckStage.java         |   4 +-
 .../stages/CurrentStateComputationStage.java    |   4 +-
 .../stages/ExternalViewComputeStage.java        |   4 +-
 .../stages/MessageGenerationStage.java          |   4 +-
 .../stages/MessageSelectionStage.java           |   7 +-
 .../controller/stages/MessageThrottleStage.java |   4 +-
 .../stages/PersistAssignmentStage.java          |  57 +++-
 .../controller/stages/PersistContextStage.java  |  11 +-
 .../controller/stages/ReadClusterDataStage.java |  14 +-
 .../stages/ResourceComputationStage.java        |   2 +-
 .../stages/ResourceValidationStage.java         |   4 +-
 .../controller/stages/TaskAssignmentStage.java  |  12 +-
 .../strategy/AutoRebalanceStrategy.java         | 186 ++++++------
 .../helix/manager/zk/ZKHelixDataAccessor.java   |   5 +
 .../java/org/apache/helix/model/IdealState.java |  83 +++++-
 .../apache/helix/model/ResourceAssignment.java  |   2 +-
 .../helix/model/ResourceConfiguration.java      |  10 +
 .../org/apache/helix/task/TaskRebalancer.java   |  12 +-
 .../java/org/apache/helix/task/TaskUtil.java    |  23 +-
 .../tools/ClusterExternalViewVerifier.java      |   7 +-
 .../helix/tools/ClusterStateVerifier.java       |   2 +-
 .../apache/helix/tools/YAMLClusterSetup.java    |   9 +
 .../src/test/java/org/apache/helix/Mocks.java   |   2 +-
 .../org/apache/helix/api/TestNewStages.java     |   2 +-
 .../stages/TestRebalancePipeline.java           |  17 +-
 .../stages/TestResourceValidationStage.java     |   6 +-
 .../TestReelectedPipelineCorrectness.java       | 151 ++++++++++
 .../TestUserDefRebalancerCompatibility.java     |  44 +--
 .../TestClusterStatusMonitorLifecycle.java      |  20 +-
 44 files changed, 1188 insertions(+), 542 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index f46e537..3589165 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ConfigScope;
@@ -489,7 +490,13 @@ public class ConfigAccessor {
     List<String> retKeys = null;
 
     if (scope.isFullKey()) {
-      ZNRecord record = zkClient.readData(zkPath);
+      ZNRecord record;
+      try {
+        record = zkClient.readData(zkPath);
+      } catch (ZkNoNodeException e) {
+        LOG.warn(zkPath + " no longer exists");
+        return Collections.emptyList();
+      }
       if (mapKey == null) {
         retKeys = new ArrayList<String>(record.getSimpleFields().keySet());
       } else {

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 98072d1..adaf200 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -34,10 +34,8 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SpectatorId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.context.ControllerContext;
-import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Transition;
 
@@ -91,8 +89,6 @@ public class Cluster {
    * @param constraintMap
    * @param stateModelMap
    * @param contextMap
-   * @param stats
-   * @param alerts
    * @param userConfig
    * @param isPaused
    * @param autoJoinAllowed
@@ -101,8 +97,8 @@ public class Cluster {
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
       ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
       Map<StateModelDefId, StateModelDefinition> stateModelMap,
-      Map<ContextId, ControllerContext> contextMap, PersistentStats stats, Alerts alerts,
-      UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+      Map<ContextId, ControllerContext> contextMap, UserConfig userConfig, boolean isPaused,
+      boolean autoJoinAllowed) {
 
     // build the config
     // Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -124,8 +120,7 @@ public class Cluster {
         new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
             .addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
             .addStateModelDefinitions(stateModelMap.values()).pausedStatus(isPaused)
-            .userConfig(userConfig).autoJoin(autoJoinAllowed).addStats(stats).addAlerts(alerts)
-            .build();
+            .userConfig(userConfig).autoJoin(autoJoinAllowed).build();
 
     _resourceMap = ImmutableMap.copyOf(resourceMap);
 
@@ -240,22 +235,6 @@ public class Cluster {
   }
 
   /**
-   * Get all the persisted stats for the cluster
-   * @return PersistentStats instance
-   */
-  public PersistentStats getStats() {
-    return _config.getStats();
-  }
-
-  /**
-   * Get all the persisted alerts for the cluster
-   * @return Alerts instance
-   */
-  public Alerts getAlerts() {
-    return _config.getAlerts();
-  }
-
-  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index abb3e49..5ecc210 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.api.accessor;
  * under the License.
  */
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,9 +55,11 @@ import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.context.ControllerContext;
 import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConfiguration;
@@ -86,15 +89,27 @@ public class ClusterAccessor {
   private final PropertyKey.Builder _keyBuilder;
   private final ClusterId _clusterId;
 
+  private final ClusterDataCache _cache;
+
   /**
    * Instantiate a cluster accessor
    * @param clusterId the cluster to access
    * @param accessor HelixDataAccessor for the physical store
    */
   public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+    this(clusterId, accessor, new ClusterDataCache());
+  }
+
+  /**
+   * Instantiate a cluster accessor
+   * @param clusterId the cluster to access
+   * @param accessor HelixDataAccessor for the physical store
+   */
+  public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor, ClusterDataCache cache) {
     _accessor = accessor;
     _keyBuilder = accessor.keyBuilder();
     _clusterId = clusterId;
+    _cache = cache;
   }
 
   /**
@@ -129,9 +144,6 @@ public class ClusterAccessor {
     if (cluster.autoJoinAllowed()) {
       clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
     }
-    if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
-      _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
-    }
     if (cluster.isPaused()) {
       pauseCluster();
     }
@@ -173,16 +185,6 @@ public class ClusterAccessor {
       ClusterConstraints constraint = constraints.get(type);
       _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraint);
     }
-    if (config.getStats() == null || config.getStats().getMapFields().isEmpty()) {
-      _accessor.removeProperty(_keyBuilder.persistantStat());
-    } else {
-      _accessor.setProperty(_keyBuilder.persistantStat(), config.getStats());
-    }
-    if (config.getAlerts() == null || config.getAlerts().getMapFields().isEmpty()) {
-      _accessor.removeProperty(_keyBuilder.alerts());
-    } else {
-      _accessor.setProperty(_keyBuilder.alerts(), config.getAlerts());
-    }
     return true;
   }
 
@@ -218,19 +220,22 @@ public class ClusterAccessor {
       LOG.error("Cluster is not fully set up");
       return null;
     }
-    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+
+    // refresh the cache
+    _cache.refresh(_accessor);
+
+    LiveInstance leader = _cache.getLeader();
 
     /**
      * map of constraint-type to constraints
      */
-    Map<String, ClusterConstraints> constraintMap =
-        _accessor.getChildValuesMap(_keyBuilder.constraints());
+    Map<String, ClusterConstraints> constraintMap = _cache.getConstraintMap();
 
     // read all the resources
-    Map<ResourceId, Resource> resourceMap = readResources();
+    Map<ResourceId, Resource> resourceMap = readResources(true);
 
     // read all the participants
-    Map<ParticipantId, Participant> participantMap = readParticipants();
+    Map<ParticipantId, Participant> participantMap = readParticipants(true);
 
     // read the controllers
     Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
@@ -249,10 +254,10 @@ public class ClusterAccessor {
     }
 
     // read the pause status
-    PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+    PauseSignal pauseSignal = _cache.getPauseSignal();
     boolean isPaused = pauseSignal != null;
 
-    ClusterConfiguration clusterConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+    ClusterConfiguration clusterConfig = _cache.getClusterConfig();
     boolean autoJoinAllowed = false;
     UserConfig userConfig;
     if (clusterConfig != null) {
@@ -263,21 +268,14 @@ public class ClusterAccessor {
     }
 
     // read the state model definitions
-    Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions();
-
-    // read the stats
-    PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
-
-    // read the alerts
-    Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
+    Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions(true);
 
     // read controller context
-    Map<ContextId, ControllerContext> contextMap = readControllerContext();
+    Map<ContextId, ControllerContext> contextMap = readControllerContext(true);
 
     // create the cluster snapshot object
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap, stateModelMap, contextMap, stats, alerts, userConfig, isPaused,
-        autoJoinAllowed);
+        clusterConstraintMap, stateModelMap, contextMap, userConfig, isPaused, autoJoinAllowed);
   }
 
   /**
@@ -285,9 +283,22 @@ public class ClusterAccessor {
    * @return map of state model def id to state model definition
    */
   public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+    return readStateModelDefinitions(false);
+  }
+
+  /**
+   * Get all the state model definitions for this cluster
+   * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+   * @return map of state model def id to state model definition
+   */
+  private Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions(boolean useCache) {
     Map<StateModelDefId, StateModelDefinition> stateModelDefs = Maps.newHashMap();
-    List<StateModelDefinition> stateModelList =
-        _accessor.getChildValues(_keyBuilder.stateModelDefs());
+    Collection<StateModelDefinition> stateModelList;
+    if (useCache) {
+      stateModelList = _cache.getStateModelDefMap().values();
+    } else {
+      stateModelList = _accessor.getChildValues(_keyBuilder.stateModelDefs());
+    }
     for (StateModelDefinition stateModelDef : stateModelList) {
       stateModelDefs.put(stateModelDef.getStateModelDefId(), stateModelDef);
     }
@@ -299,35 +310,70 @@ public class ClusterAccessor {
    * @return map of resource id to resource
    */
   public Map<ResourceId, Resource> readResources() {
-    if (!isClusterStructureValid()) {
+    return readResources(false);
+  }
+
+  /**
+   * Read all resources in the cluster
+   * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+   * @return map of resource id to resource
+   */
+  private Map<ResourceId, Resource> readResources(boolean useCache) {
+    if (!useCache && !isClusterStructureValid()) {
       LOG.error("Cluster is not fully set up yet!");
       return Collections.emptyMap();
     }
 
-    /**
-     * map of resource-id to ideal-state
-     */
-    Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
-
-    /**
-     * Map of resource id to external view
-     */
-    Map<String, ExternalView> externalViewMap =
-        _accessor.getChildValuesMap(_keyBuilder.externalViews());
+    Map<String, IdealState> idealStateMap;
+    Map<String, ResourceConfiguration> resourceConfigMap;
+    Map<String, ExternalView> externalViewMap;
+    Map<String, ResourceAssignment> resourceAssignmentMap;
+    if (useCache) {
+      idealStateMap = _cache.getIdealStates();
+      resourceConfigMap = _cache.getResourceConfigs();
+    } else {
+      idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+      resourceConfigMap = _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+    }
 
-    /**
-     * Map of resource id to user configuration
-     */
-    Map<String, ResourceConfiguration> resourceConfigMap =
-        _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+    // check if external view and resource assignment reads are required
+    boolean extraReadsRequired = false;
+    for (String resourceName : idealStateMap.keySet()) {
+      if (extraReadsRequired) {
+        break;
+      }
+      // a rebalancer can be user defined if it has that mode set, or has a different rebalancer
+      // class
+      IdealState idealState = idealStateMap.get(resourceName);
+      extraReadsRequired =
+          extraReadsRequired || (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED);
+      RebalancerRef ref = idealState.getRebalancerRef();
+      if (ref != null) {
+        extraReadsRequired =
+            extraReadsRequired
+                || !PartitionedRebalancerConfig.isBuiltinRebalancer(ref.getRebalancerClass());
+      }
+    }
+    for (String resourceName : resourceConfigMap.keySet()) {
+      if (extraReadsRequired) {
+        break;
+      }
+      extraReadsRequired =
+          extraReadsRequired || resourceConfigMap.get(resourceName).hasRebalancerConfig();
+    }
 
-    /**
-     * Map of resource id to resource assignment
-     */
-    Map<String, ResourceAssignment> resourceAssignmentMap =
-        _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+    // now read external view and resource assignments if needed
+    if (!useCache || extraReadsRequired) {
+      externalViewMap = _accessor.getChildValuesMap(_keyBuilder.externalViews());
+      resourceAssignmentMap = _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+      _cache.setAssignmentWritePolicy(true);
+    } else {
+      externalViewMap = Maps.newHashMap();
+      resourceAssignmentMap = Maps.newHashMap();
+      _cache.setAssignmentWritePolicy(false);
+    }
 
-    // read all the resources
+    // populate all the resources
     Set<String> allResources = Sets.newHashSet();
     allResources.addAll(idealStateMap.keySet());
     allResources.addAll(resourceConfigMap.keySet());
@@ -347,45 +393,58 @@ public class ClusterAccessor {
    * @return map of participant id to participant, or empty map
    */
   public Map<ParticipantId, Participant> readParticipants() {
-    if (!isClusterStructureValid()) {
+    return readParticipants(false);
+  }
+
+  /**
+   * Read all participants in the cluster
+   * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+   * @return map of participant id to participant, or empty map
+   */
+  private Map<ParticipantId, Participant> readParticipants(boolean useCache) {
+    if (!useCache && !isClusterStructureValid()) {
       LOG.error("Cluster is not fully set up yet!");
       return Collections.emptyMap();
     }
-
-    /**
-     * map of instance-id to instance-config
-     */
-    Map<String, InstanceConfig> instanceConfigMap =
-        _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
-
-    /**
-     * map of instance-id to live-instance
-     */
-    Map<String, LiveInstance> liveInstanceMap =
-        _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+    Map<String, InstanceConfig> instanceConfigMap;
+    Map<String, LiveInstance> liveInstanceMap;
+    if (useCache) {
+      instanceConfigMap = _cache.getInstanceConfigMap();
+      liveInstanceMap = _cache.getLiveInstances();
+    } else {
+      instanceConfigMap = _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+      liveInstanceMap = _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+    }
 
     /**
      * map of participant-id to map of message-id to message
      */
-    Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
-    for (String instanceName : liveInstanceMap.keySet()) {
-      Map<String, Message> instanceMsgMap =
-          _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
-      messageMap.put(instanceName, instanceMsgMap);
+    Map<String, Map<String, Message>> messageMap = Maps.newHashMap();
+    for (String participantName : liveInstanceMap.keySet()) {
+      Map<String, Message> instanceMsgMap;
+      if (useCache) {
+        instanceMsgMap = _cache.getMessages(participantName);
+      } else {
+        instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
+      }
+      messageMap.put(participantName, instanceMsgMap);
     }
 
     /**
      * map of participant-id to map of resource-id to current-state
      */
-    Map<String, Map<String, CurrentState>> currentStateMap =
-        new HashMap<String, Map<String, CurrentState>>();
+    Map<String, Map<String, CurrentState>> currentStateMap = Maps.newHashMap();
     for (String participantName : liveInstanceMap.keySet()) {
       LiveInstance liveInstance = liveInstanceMap.get(participantName);
       SessionId sessionId = liveInstance.getTypedSessionId();
-      Map<String, CurrentState> instanceCurStateMap =
-          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
-              sessionId.stringify()));
-
+      Map<String, CurrentState> instanceCurStateMap;
+      if (useCache) {
+        instanceCurStateMap = _cache.getCurrentState(participantName, sessionId.stringify());
+      } else {
+        instanceCurStateMap =
+            _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+                sessionId.stringify()));
+      }
       currentStateMap.put(participantName, instanceCurStateMap);
     }
 
@@ -472,8 +531,21 @@ public class ClusterAccessor {
    * @return map of context id to controller context
    */
   public Map<ContextId, ControllerContext> readControllerContext() {
-    Map<String, ControllerContextHolder> contextHolders =
-        _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+    return readControllerContext(false);
+  }
+
+  /**
+   * Read the persisted controller contexts
+   * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+   * @return map of context id to controller context
+   */
+  private Map<ContextId, ControllerContext> readControllerContext(boolean useCache) {
+    Map<String, ControllerContextHolder> contextHolders;
+    if (useCache) {
+      contextHolders = _cache.getContextMap();
+    } else {
+      contextHolders = _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+    }
     Map<ContextId, ControllerContext> contexts = Maps.newHashMap();
     for (String contextName : contextHolders.keySet()) {
       contexts.put(ContextId.from(contextName), contextHolders.get(contextName).getContext());
@@ -482,6 +554,22 @@ public class ClusterAccessor {
   }
 
   /**
+   * Get the current cluster stats
+   * @return PersistentStats
+   */
+  public PersistentStats getStats() {
+    return _accessor.getProperty(_keyBuilder.persistantStat());
+  }
+
+  /**
+   * Get the current cluster alerts
+   * @return Alerts
+   */
+  public Alerts getAlerts() {
+    return _accessor.getProperty(_keyBuilder.alerts());
+  }
+
+  /**
    * Add a statistic specification to the cluster. Existing stat specifications will not be
    * overwritten
    * @param statName string representing a stat specification
@@ -673,14 +761,22 @@ public class ClusterAccessor {
       return false;
     }
 
+    // Create an IdealState from a RebalancerConfig (if the resource supports it)
+    IdealState idealState =
+        ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
+            resource.getBucketSize(), resource.getBatchMessageMode());
+    if (idealState != null) {
+      _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+    }
+
     // Add resource user config
     if (resource.getUserConfig() != null) {
       ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
       configuration.setType(resource.getType());
       configuration.addNamespacedConfig(resource.getUserConfig());
       PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
-      if (partitionedConfig == null
-          || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+      if (idealState == null
+          && (partitionedConfig == null || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED)) {
         // only persist if this is not easily convertible to an ideal state
         configuration
             .addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
@@ -688,14 +784,6 @@ public class ClusterAccessor {
       }
       _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     }
-
-    // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
-    IdealState idealState =
-        ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
-            resource.getBucketSize(), resource.getBatchMessageMode());
-    if (idealState != null) {
-      _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
-    }
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 73d43b0..a1d6580 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -39,6 +39,7 @@ import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
@@ -137,14 +138,19 @@ public class ResourceAccessor {
    */
   private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration,
       RebalancerConfig rebalancerConfig) {
-    boolean status =
-        _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+    boolean status = true;
+    if (configuration != null) {
+      status =
+          _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+    }
     // set an ideal state if the resource supports it
     IdealState idealState =
         rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
             configuration.getBatchMessageMode());
     if (idealState != null) {
-      _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+      status =
+          status
+              && _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
     }
     return status;
   }
@@ -253,7 +259,14 @@ public class ResourceAccessor {
     }
     ResourceId resourceId = resourceConfig.getId();
     ResourceConfiguration config = new ResourceConfiguration(resourceId);
-    config.addNamespacedConfig(resourceConfig.getUserConfig());
+    UserConfig userConfig = resourceConfig.getUserConfig();
+    if (userConfig != null
+        && (!userConfig.getSimpleFields().isEmpty() || !userConfig.getListFields().isEmpty() || !userConfig
+            .getMapFields().isEmpty())) {
+      config.addNamespacedConfig(userConfig);
+    } else {
+      userConfig = null;
+    }
     PartitionedRebalancerConfig partitionedConfig =
         PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
     if (partitionedConfig == null
@@ -261,9 +274,11 @@ public class ResourceAccessor {
       // only persist if this is not easily convertible to an ideal state
       config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
           .toNamespacedConfig());
+      config.setBucketSize(resourceConfig.getBucketSize());
+      config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+    } else if (userConfig == null) {
+      config = null;
     }
-    config.setBucketSize(resourceConfig.getBucketSize());
-    config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
     setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
     return true;
   }
@@ -387,9 +402,17 @@ public class ResourceAccessor {
       boolean batchMessageMode) {
     PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
     if (partitionedConfig != null) {
+      if (!PartitionedRebalancerConfig.isBuiltinConfig(partitionedConfig.getClass())) {
+        // don't proceed if this resource cannot be described by an ideal state
+        return null;
+      }
       IdealState idealState = new IdealState(partitionedConfig.getResourceId());
       idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
-      idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
+
+      RebalancerRef ref = partitionedConfig.getRebalancerRef();
+      if (ref != null) {
+        idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
+      }
       String replicas = null;
       if (partitionedConfig.anyLiveParticipant()) {
         replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
@@ -404,13 +427,14 @@ public class ResourceAccessor {
       idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
       idealState.setBucketSize(bucketSize);
       idealState.setBatchMessageMode(batchMessageMode);
-      if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+      idealState.setRebalancerConfigClass(config.getClass());
+      if (SemiAutoRebalancerConfig.class.equals(config.getClass())) {
         SemiAutoRebalancerConfig semiAutoConfig =
             BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
         for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
           idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
         }
-      } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+      } else if (CustomRebalancerConfig.class.equals(config.getClass())) {
         CustomRebalancerConfig customConfig =
             BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
         for (PartitionId partitionId : customConfig.getPartitionSet()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index 22a1528..ddc98fa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -5,8 +5,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ClusterId;
@@ -14,14 +12,12 @@ import org.apache.helix.api.id.ConstraintId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterConstraints.ConstraintValue;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Transition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -61,8 +57,6 @@ public class ClusterConfig {
   private final Map<ParticipantId, ParticipantConfig> _participantMap;
   private final Map<ConstraintType, ClusterConstraints> _constraintMap;
   private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
-  private final PersistentStats _stats;
-  private final Alerts _alerts;
   private final UserConfig _userConfig;
   private final boolean _isPaused;
   private final boolean _autoJoin;
@@ -74,8 +68,6 @@ public class ClusterConfig {
    * @param participantMap map of participant id to participant config
    * @param constraintMap map of constraint type to all constraints of that type
    * @param stateModelMap map of state model id to state model definition
-   * @param stats statistics to watch on the cluster
-   * @param alerts alerts that the cluster can trigger
    * @param userConfig user-defined cluster properties
    * @param isPaused true if paused, false if active
    * @param allowAutoJoin true if participants can join automatically, false otherwise
@@ -83,15 +75,13 @@ public class ClusterConfig {
   private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
       Map<ParticipantId, ParticipantConfig> participantMap,
       Map<ConstraintType, ClusterConstraints> constraintMap,
-      Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
-      Alerts alerts, UserConfig userConfig, boolean isPaused, boolean allowAutoJoin) {
+      Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+      boolean isPaused, boolean allowAutoJoin) {
     _id = id;
     _resourceMap = ImmutableMap.copyOf(resourceMap);
     _participantMap = ImmutableMap.copyOf(participantMap);
     _constraintMap = ImmutableMap.copyOf(constraintMap);
     _stateModelMap = ImmutableMap.copyOf(stateModelMap);
-    _stats = stats;
-    _alerts = alerts;
     _userConfig = userConfig;
     _isPaused = isPaused;
     _autoJoin = allowAutoJoin;
@@ -237,22 +227,6 @@ public class ClusterConfig {
   }
 
   /**
-   * Get all the statistics persisted on the cluster
-   * @return PersistentStats instance
-   */
-  public PersistentStats getStats() {
-    return _stats;
-  }
-
-  /**
-   * Get all the alerts persisted on the cluster
-   * @return Alerts instance
-   */
-  public Alerts getAlerts() {
-    return _alerts;
-  }
-
-  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */
@@ -287,8 +261,6 @@ public class ClusterConfig {
 
     private Set<Fields> _updateFields;
     private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
-    private PersistentStats _removedStats;
-    private Alerts _removedAlerts;
     private Builder _builder;
 
     /**
@@ -302,8 +274,6 @@ public class ClusterConfig {
         Set<ConstraintId> constraints = Sets.newHashSet();
         _removedConstraints.put(type, constraints);
       }
-      _removedStats = new PersistentStats(PersistentStats.nodeName);
-      _removedAlerts = new Alerts(Alerts.nodeName);
       _builder = new Builder(clusterId);
     }
 
@@ -431,57 +401,6 @@ public class ClusterConfig {
     }
 
     /**
-     * Add a statistic specification to the cluster. Existing specifications will not be overwritten
-     * @param stat string specifying the stat specification
-     * @return Delta
-     */
-    public Delta addStat(String stat) {
-      _builder.addStat(stat);
-      return this;
-    }
-
-    /**
-     * Add an alert specification for the cluster. Existing specifications will not be overwritten
-     * @param alert string specifying the alert specification
-     * @return Delta
-     */
-    public Delta addAlert(String alert) {
-      _builder.addAlert(alert);
-      return this;
-    }
-
-    /**
-     * Remove a statistic specification from the cluster
-     * @param stat statistic specification
-     * @return Delta
-     */
-    public Delta removeStat(String stat) {
-      Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
-      Map<String, Map<String, String>> currentStats = _removedStats.getMapFields();
-      for (String statName : parsedStat.keySet()) {
-        currentStats.put(statName, parsedStat.get(statName));
-      }
-      return this;
-    }
-
-    /**
-     * Remove an alert specification for the cluster
-     * @param alert alert specification
-     * @return Delta
-     */
-    public Delta removeAlert(String alert) {
-      Map<String, Map<String, String>> currAlertMap = _removedAlerts.getMapFields();
-      if (!currAlertMap.containsKey(alert)) {
-        Map<String, String> parsedAlert = Maps.newHashMap();
-        StringBuilder statsName = new StringBuilder();
-        AlertsHolder.parseAlert(alert, statsName, parsedAlert);
-        removeStat(statsName.toString());
-        currAlertMap.put(alert, parsedAlert);
-      }
-      return this;
-    }
-
-    /**
      * Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
      * @param orig the original ClusterConfig
      * @return updated ClusterConfig
@@ -494,8 +413,7 @@ public class ClusterConfig {
               .addParticipants(orig.getParticipantMap().values())
               .addStateModelDefinitions(orig.getStateModelMap().values())
               .userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused())
-              .autoJoin(orig.autoJoinAllowed()).addStats(orig.getStats())
-              .addAlerts(orig.getAlerts());
+              .autoJoin(orig.autoJoinAllowed());
       for (Fields field : _updateFields) {
         switch (field) {
         case USER_CONFIG:
@@ -529,29 +447,8 @@ public class ClusterConfig {
         builder.addConstraint(constraints);
       }
 
-      // add stats and alerts
-      builder.addStats(deltaConfig.getStats());
-      builder.addAlerts(deltaConfig.getAlerts());
-
       // get the result
-      ClusterConfig result = builder.build();
-
-      // remove stats
-      PersistentStats stats = result.getStats();
-      for (String removedStat : _removedStats.getMapFields().keySet()) {
-        if (stats.getMapFields().containsKey(removedStat)) {
-          stats.getMapFields().remove(removedStat);
-        }
-      }
-
-      // remove alerts
-      Alerts alerts = result.getAlerts();
-      for (String removedAlert : _removedAlerts.getMapFields().keySet()) {
-        if (alerts.getMapFields().containsKey(removedAlert)) {
-          alerts.getMapFields().remove(removedAlert);
-        }
-      }
-      return result;
+      return builder.build();
     }
   }
 
@@ -565,8 +462,6 @@ public class ClusterConfig {
     private final Map<ConstraintType, ClusterConstraints> _constraintMap;
     private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
     private UserConfig _userConfig;
-    private PersistentStats _stats;
-    private Alerts _alerts;
     private boolean _isPaused;
     private boolean _autoJoin;
 
@@ -583,8 +478,6 @@ public class ClusterConfig {
       _isPaused = false;
       _autoJoin = false;
       _userConfig = new UserConfig(Scope.cluster(id));
-      _stats = new PersistentStats(PersistentStats.nodeName);
-      _alerts = new Alerts(Alerts.nodeName);
     }
 
     /**
@@ -789,74 +682,6 @@ public class ClusterConfig {
     }
 
     /**
-     * Add a statistic specification to the cluster. Existing specifications will not be overwritten
-     * @param stat String specifying the stat specification
-     * @return Builder
-     */
-    public Builder addStat(String stat) {
-      Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
-      Map<String, Map<String, String>> currentStats = _stats.getMapFields();
-      for (String statName : parsedStat.keySet()) {
-        if (!currentStats.containsKey(statName)) {
-          currentStats.put(statName, parsedStat.get(statName));
-        }
-      }
-      return this;
-    }
-
-    /**
-     * Add statistic specifications to the cluster. Existing specifications will not be overwritten
-     * @param stats PersistentStats specifying the stat specification
-     * @return Builder
-     */
-    public Builder addStats(PersistentStats stats) {
-      if (stats == null) {
-        return this;
-      }
-      Map<String, Map<String, String>> parsedStat = stats.getMapFields();
-      Map<String, Map<String, String>> currentStats = _stats.getMapFields();
-      for (String statName : parsedStat.keySet()) {
-        if (!currentStats.containsKey(statName)) {
-          currentStats.put(statName, parsedStat.get(statName));
-        }
-      }
-      return this;
-    }
-
-    /**
-     * Add alert specifications to the cluster. Existing specifications will not be overwritten
-     * @param alert string representing alert specifications
-     * @return Builder
-     */
-    public Builder addAlert(String alert) {
-      Map<String, Map<String, String>> currAlertMap = _alerts.getMapFields();
-      if (!currAlertMap.containsKey(alert)) {
-        Map<String, String> parsedAlert = Maps.newHashMap();
-        StringBuilder statsName = new StringBuilder();
-        AlertsHolder.parseAlert(alert, statsName, parsedAlert);
-        addStat(statsName.toString());
-        currAlertMap.put(alert, parsedAlert);
-      }
-      return this;
-    }
-
-    /**
-     * Add alert specifications to the cluster. Existing specifications will not be overwritten
-     * @param alerts Alerts instance
-     * @return Builder
-     */
-    public Builder addAlerts(Alerts alerts) {
-      if (alerts == null) {
-        return this;
-      }
-      Map<String, Map<String, String>> alertMap = alerts.getMapFields();
-      for (String alert : alertMap.keySet()) {
-        addAlert(alert);
-      }
-      return this;
-    }
-
-    /**
      * Set the paused status of the cluster
      * @param isPaused true if paused, false otherwise
      * @return Builder
@@ -892,7 +717,7 @@ public class ClusterConfig {
      */
     public ClusterConfig build() {
       return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
-          _stats, _alerts, _userConfig, _isPaused, _autoJoin);
+          _userConfig, _isPaused, _autoJoin);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 2b2a71e..b11ab9c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -48,6 +48,7 @@ import org.apache.helix.api.id.SessionId;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
@@ -121,6 +122,11 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   int _timerPeriod = Integer.MAX_VALUE;
 
   /**
+   * A cache maintained across pipelines
+   */
+  private ClusterDataCache _cache;
+
+  /**
    * Default constructor that creates a default pipeline registry. This is sufficient in
    * most cases, but if there is a some thing specific needed use another constructor
    * where in you can pass a pipeline registry
@@ -138,6 +144,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
 
     @Override
     public void run() {
+      _cache.requireFullRefresh();
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.CALLBACK);
       ClusterEvent event = new ClusterEvent("periodicalRebalance");
@@ -228,6 +235,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     _registry = registry;
     _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
     _lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
+    _cache = new ClusterDataCache();
     _eventQueue = new ClusterEventBlockingQueue();
     _eventThread = new ClusterEventProcessor();
     _eventThread.start();
@@ -276,6 +284,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       }
     }
 
+    // add the cache
+    event.addAttribute("ClusterDataCache", _cache);
+
     List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
     if (pipelines == null || pipelines.size() == 0) {
       logger.info("No pipeline to run for event:" + event.getName());
@@ -318,6 +329,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   public void onStateChange(String instanceName, List<CurrentState> statesInfo,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onStateChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
     ClusterEvent event = new ClusterEvent("currentStateChange");
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("instanceName", instanceName);
@@ -341,6 +355,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onMessage()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
 
     ClusterEvent event = new ClusterEvent("messageChange");
     event.addAttribute("helixmanager", changeContext.getManager());
@@ -360,10 +377,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
       NotificationContext changeContext) {
     logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
 
     if (liveInstances == null) {
       liveInstances = Collections.emptyList();
     }
+    _cache.setLiveInstances(liveInstances);
+
     // Go though the live instance list and make sure that we are observing them
     // accordingly. The action is done regardless of the paused flag.
     if (changeContext.getType() == NotificationContext.Type.INIT
@@ -403,6 +425,14 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   @Override
   public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) {
     logger.info("START: Generic GenericClusterController.onIdealStateChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
+
+    if (idealStates == null) {
+      idealStates = Collections.emptyList();
+    }
+    _cache.setIdealStates(idealStates);
     ClusterEvent event = new ClusterEvent("idealStateChange");
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("changeContext", changeContext);
@@ -419,6 +449,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   @Override
   public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onConfigChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
+
+    if (configs == null) {
+      configs = Collections.emptyList();
+    }
+    _cache.setInstanceConfigs(configs);
+
     ClusterEvent event = new ClusterEvent("configChange");
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("helixmanager", changeContext.getManager());
@@ -438,6 +477,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   @Override
   public void onControllerChange(NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onControllerChange()");
+    _cache.requireFullRefresh();
     if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
       logger.info("GenericClusterController.onControllerChange() FINALIZE");
       return;

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 0c55d45..13616b3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -113,8 +113,8 @@ public class FullAutoRebalancer implements HelixRebalancer {
       }
       if (!taggedLiveNodes.isEmpty()) {
         // live nodes exist that have this tag
-        if (LOG.isInfoEnabled()) {
-          LOG.info("found the following participants with tag " + config.getParticipantGroupTag()
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("found the following participants with tag " + config.getParticipantGroupTag()
               + " for " + config.getResourceId() + ": " + taggedLiveNodes);
         }
       } else if (taggedNodes.isEmpty()) {
@@ -132,12 +132,12 @@ public class FullAutoRebalancer implements HelixRebalancer {
 
     // determine which nodes the replicas should live on
     int maxPartition = config.getMaxPartitionsPerParticipant();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("currentMapping: " + currentMapping);
-      LOG.info("stateCountMap: " + stateCountMap);
-      LOG.info("liveNodes: " + liveParticipantList);
-      LOG.info("allNodes: " + allParticipantList);
-      LOG.info("maxPartition: " + maxPartition);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("currentMapping: " + currentMapping);
+      LOG.debug("stateCountMap: " + stateCountMap);
+      LOG.debug("liveNodes: " + liveParticipantList);
+      LOG.debug("allNodes: " + allParticipantList);
+      LOG.debug("maxPartition: " + maxPartition);
     }
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     _algorithm =

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
index 974222d..8439583 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
@@ -34,25 +34,46 @@ public class RebalancerRef {
   @JsonProperty("rebalancerClassName")
   private final String _rebalancerClassName;
 
+  @JsonIgnore
+  private Class<? extends HelixRebalancer> _class;
+
   @JsonCreator
   private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
     _rebalancerClassName = rebalancerClassName;
+    _class = null;
   }
 
   /**
-   * Get an instantiated Rebalancer
-   * @return Rebalancer or null if instantiation failed
+   * Get an instantiated HelixRebalancer
+   * @return HelixRebalancer or null if instantiation failed
    */
   @JsonIgnore
   public HelixRebalancer getRebalancer() {
     try {
-      return (HelixRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+      return (HelixRebalancer) (getRebalancerClass().newInstance());
     } catch (Exception e) {
       LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
     }
     return null;
   }
 
+  /**
+   * Get the class object of this rebalancer ref
+   * @return Class, or null if the rebalancer class could not be loaded
+   */
+  @JsonIgnore
+  public Class<? extends HelixRebalancer> getRebalancerClass() {
+    try {
+      if (_class == null) {
+        _class =
+            HelixUtil.loadClass(getClass(), _rebalancerClassName).asSubclass(HelixRebalancer.class);
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception while loading rebalancer class:" + _rebalancerClassName, e);
+    }
+    return _class;
+  }
+
   @Override
   public String toString() {
     return _rebalancerClassName;

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
index 73c3ccc..a44b230 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
@@ -53,7 +53,12 @@ public class CustomRebalancerConfig extends PartitionedRebalancerConfig {
    * Instantiate a CustomRebalancerConfig
    */
   public CustomRebalancerConfig() {
-    setRebalanceMode(RebalanceMode.CUSTOMIZED);
+    if (getClass().equals(CustomRebalancerConfig.class)) {
+      // only mark this as customized mode if this specific config is used
+      setRebalanceMode(RebalanceMode.CUSTOMIZED);
+    } else {
+      setRebalanceMode(RebalanceMode.USER_DEFINED);
+    }
     setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
     _preferenceMaps = Maps.newHashMap();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
index 828d509..16bb4cb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
@@ -30,7 +30,12 @@ import org.apache.helix.model.IdealState.RebalanceMode;
  */
 public class FullAutoRebalancerConfig extends PartitionedRebalancerConfig {
   public FullAutoRebalancerConfig() {
-    setRebalanceMode(RebalanceMode.FULL_AUTO);
+    if (getClass().equals(FullAutoRebalancerConfig.class)) {
+      // only mark this as full auto mode if this specific config is used
+      setRebalanceMode(RebalanceMode.FULL_AUTO);
+    } else {
+      setRebalanceMode(RebalanceMode.USER_DEFINED);
+    }
     setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
index 2c9769d..dd661d9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
@@ -10,14 +10,20 @@ import org.apache.helix.api.Partition;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
 import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskRebalancer;
 import org.codehaus.jackson.annotate.JsonIgnore;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -51,6 +57,23 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
   private int _maxPartitionsPerParticipant;
   private RebalanceMode _rebalanceMode;
 
+  @JsonIgnore
+  private static final Set<Class<? extends RebalancerConfig>> BUILTIN_CONFIG_CLASSES = Sets
+      .newHashSet();
+  @JsonIgnore
+  private static final Set<Class<? extends HelixRebalancer>> BUILTIN_REBALANCER_CLASSES = Sets
+      .newHashSet();
+  static {
+    BUILTIN_CONFIG_CLASSES.add(PartitionedRebalancerConfig.class);
+    BUILTIN_CONFIG_CLASSES.add(FullAutoRebalancerConfig.class);
+    BUILTIN_CONFIG_CLASSES.add(SemiAutoRebalancerConfig.class);
+    BUILTIN_CONFIG_CLASSES.add(CustomRebalancerConfig.class);
+    BUILTIN_REBALANCER_CLASSES.add(FullAutoRebalancer.class);
+    BUILTIN_REBALANCER_CLASSES.add(SemiAutoRebalancer.class);
+    BUILTIN_REBALANCER_CLASSES.add(CustomRebalancer.class);
+    BUILTIN_REBALANCER_CLASSES.add(TaskRebalancer.class);
+  }
+
   /**
    * Instantiate a PartitionedRebalancerConfig
    */
@@ -186,13 +209,42 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
   }
 
   /**
+   * Check if the given class is compatible with an {@link IdealState}
+   * @param clazz the RebalancerConfig class to check
+   * @return true if IdealState can be used to describe this config, false otherwise
+   */
+  public static boolean isBuiltinConfig(Class<? extends RebalancerConfig> clazz) {
+    return BUILTIN_CONFIG_CLASSES.contains(clazz);
+  }
+
+  /**
+   * Check if the given class is a built-in rebalancer class
+   * @param clazz the HelixRebalancer subclass
+   * @return true if the rebalancer class is built in, false otherwise
+   */
+  public static boolean isBuiltinRebalancer(Class<? extends HelixRebalancer> clazz) {
+    return BUILTIN_REBALANCER_CLASSES.contains(clazz);
+  }
+
+  /**
    * Convert a physically-stored IdealState into a rebalancer config for a partitioned resource
    * @param idealState populated IdealState
    * @return PartitionedRebalancerConfig
    */
   public static PartitionedRebalancerConfig from(IdealState idealState) {
     PartitionedRebalancerConfig config;
-    switch (idealState.getRebalanceMode()) {
+    RebalanceMode mode = idealState.getRebalanceMode();
+    if (mode == RebalanceMode.USER_DEFINED) {
+      Class<? extends RebalancerConfig> configClass = idealState.getRebalancerConfigClass();
+      if (configClass.equals(FullAutoRebalancerConfig.class)) {
+        mode = RebalanceMode.FULL_AUTO;
+      } else if (configClass.equals(SemiAutoRebalancerConfig.class)) {
+        mode = RebalanceMode.SEMI_AUTO;
+      } else if (configClass.equals(CustomRebalancerConfig.class)) {
+        mode = RebalanceMode.CUSTOMIZED;
+      }
+    }
+    switch (mode) {
     case FULL_AUTO:
       FullAutoRebalancerConfig.Builder fullAutoBuilder =
           new FullAutoRebalancerConfig.Builder(idealState.getResourceId());
@@ -252,7 +304,8 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
         .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
         .participantGroupTag(idealState.getInstanceGroupTag())
         .stateModelDefId(idealState.getStateModelDefId())
-        .stateModelFactoryId(idealState.getStateModelFactoryId());
+        .stateModelFactoryId(idealState.getStateModelFactoryId())
+        .rebalanceMode(idealState.getRebalanceMode());
     RebalancerRef rebalancerRef = idealState.getRebalancerRef();
     if (rebalancerRef != null) {
       builder.rebalancerRef(rebalancerRef);

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
index 8581732..d6ddb50 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
@@ -32,7 +32,7 @@ import org.apache.log4j.Logger;
  * information specific to each rebalancer.
  */
 public final class RebalancerConfigHolder {
-  private enum Fields {
+  public enum Fields {
     SERIALIZER_CLASS,
     REBALANCER_CONFIG,
     REBALANCER_CONFIG_CLASS

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
index bfc3309..727c3df 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -56,7 +56,12 @@ public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig
    * Instantiate a SemiAutoRebalancerConfig
    */
   public SemiAutoRebalancerConfig() {
-    setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    if (getClass().equals(SemiAutoRebalancerConfig.class)) {
+      // only mark this as semi auto mode if this specific config is used
+      setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    } else {
+      setRebalanceMode(RebalanceMode.USER_DEFINED);
+    }
     setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
     _preferenceLists = Maps.newHashMap();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index ec812b2..644b9f6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -37,12 +37,14 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.FallbackRebalancer;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -52,6 +54,9 @@ import com.google.common.collect.Sets;
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
 
+  // cache for rebalancer instances
+  private Map<ResourceId, HelixRebalancer> _rebalancerMap = Maps.newHashMap();
+
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
@@ -63,11 +68,11 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
 
     if (currentStateOutput == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+          + ". Requires CURRENT_STATE|RESOURCES|Cluster");
     }
 
     BestPossibleStateOutput bestPossibleStateOutput =
@@ -178,25 +183,42 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
           stateModelDefs.get(rebalancerConfig.getStateModelDefId());
       ResourceAssignment resourceAssignment = null;
       if (rebalancerConfig != null) {
+        // use a cached rebalancer if possible
+        RebalancerRef ref = rebalancerConfig.getRebalancerRef();
         HelixRebalancer rebalancer = null;
-        if (rebalancerConfig != null && rebalancerConfig.getRebalancerRef() != null) {
-          rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
+        if (_rebalancerMap.containsKey(resourceId)) {
+          HelixRebalancer candidateRebalancer = _rebalancerMap.get(resourceId);
+          if (ref != null && candidateRebalancer.getClass().equals(ref.toString())) {
+            rebalancer = candidateRebalancer;
+          }
         }
-        HelixManager manager = event.getAttribute("helixmanager");
-        ControllerContextProvider provider =
-            event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+
+        // otherwise instantiate a new one
         if (rebalancer == null) {
-          rebalancer = new FallbackRebalancer();
+          if (ref != null) {
+            rebalancer = ref.getRebalancer();
+          }
+          HelixManager manager = event.getAttribute("helixmanager");
+          ControllerContextProvider provider =
+              event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+          if (rebalancer == null) {
+            rebalancer = new FallbackRebalancer();
+          }
+          rebalancer.init(manager, provider);
+          _rebalancerMap.put(resourceId, rebalancer);
         }
-        rebalancer.init(manager, provider);
         ResourceAssignment currentAssignment = null;
         Resource resourceSnapshot = cluster.getResource(resourceId);
         if (resourceSnapshot != null) {
           currentAssignment = resourceSnapshot.getResourceAssignment();
         }
-        resourceAssignment =
-            rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
-                currentStateOutput);
+        try {
+          resourceAssignment =
+              rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
+                  currentStateOutput);
+        } catch (Exception e) {
+          LOG.error("Rebalancer for resource " + resourceId + " failed.", e);
+        }
       }
       if (resourceAssignment == null) {
         resourceAssignment =

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 6f09d26..0c28bdf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -22,12 +22,18 @@ package org.apache.helix.controller.stages;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
@@ -35,22 +41,43 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 /**
  * Reads the data from the cluster using data accessor. This output ClusterData which
  * provides useful methods to search/lookup properties
  */
-@Deprecated
 public class ClusterDataCache {
   Map<String, LiveInstance> _liveInstanceMap;
+  Map<String, LiveInstance> _liveInstanceCacheMap;
   Map<String, IdealState> _idealStateMap;
+  Map<String, IdealState> _idealStateCacheMap;
   Map<String, StateModelDefinition> _stateModelDefMap;
   Map<String, InstanceConfig> _instanceConfigMap;
+  Map<String, InstanceConfig> _instanceConfigCacheMap;
   Map<String, ClusterConstraints> _constraintMap;
   Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
   Map<String, Map<String, Message>> _messageMap;
+  Map<String, Map<String, String>> _idealStateRuleMap;
+  Map<String, ResourceConfiguration> _resourceConfigMap;
+  Map<String, ControllerContextHolder> _controllerContextMap;
+  PauseSignal _pause;
+  LiveInstance _leader;
+  ClusterConfiguration _clusterConfig;
+  boolean _writeAssignments;
+
+  // maintain a cache of participant messages across pipeline runs
+  Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+
+  boolean _init = true;
 
   // Map<String, Map<String, HealthStat>> _healthStatMap;
   // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
@@ -66,39 +93,111 @@ public class ClusterDataCache {
    * @param accessor
    * @return
    */
-  public boolean refresh(HelixDataAccessor accessor) {
+  public synchronized boolean refresh(HelixDataAccessor accessor) {
+    LOG.info("START: ClusterDataCache.refresh()");
+    long startTime = System.currentTimeMillis();
+
     Builder keyBuilder = accessor.keyBuilder();
-    _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
-    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+
+    if (_init) {
+      _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
+      _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+      _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+    }
+    _idealStateMap = Maps.newHashMap(_idealStateCacheMap);
+    _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
+    _instanceConfigMap = Maps.newHashMap(_instanceConfigCacheMap);
 
     for (LiveInstance instance : _liveInstanceMap.values()) {
-      LOG.trace("live instance: " + instance.getParticipantId() + " "
-          + instance.getTypedSessionId());
+      LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
     }
 
     _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
-    _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
     _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
 
     Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>();
+    List<PropertyKey> newMessageKeys = Lists.newLinkedList();
+    long purgeSum = 0;
     for (String instanceName : _liveInstanceMap.keySet()) {
-      Map<String, Message> map = accessor.getChildValuesMap(keyBuilder.messages(instanceName));
-      msgMap.put(instanceName, map);
+      // get the cache
+      Map<String, Message> cachedMap = _messageCache.get(instanceName);
+      if (cachedMap == null) {
+        cachedMap = Maps.newHashMap();
+        _messageCache.put(instanceName, cachedMap);
+      }
+      msgMap.put(instanceName, cachedMap);
+
+      // get the current names
+      Set<String> messageNames =
+          Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName)));
+
+      long purgeStart = System.currentTimeMillis();
+      // clear stale names
+      Iterator<String> cachedNamesIter = cachedMap.keySet().iterator();
+      while (cachedNamesIter.hasNext()) {
+        String messageName = cachedNamesIter.next();
+        if (!messageNames.contains(messageName)) {
+          cachedNamesIter.remove();
+        }
+      }
+      long purgeEnd = System.currentTimeMillis();
+      purgeSum += purgeEnd - purgeStart;
+
+      // get the keys for the new messages
+      for (String messageName : messageNames) {
+        if (!cachedMap.containsKey(messageName)) {
+          newMessageKeys.add(keyBuilder.message(instanceName, messageName));
+        }
+      }
+    }
+
+    // get the new messages
+    if (newMessageKeys.size() > 0) {
+      List<Message> newMessages = accessor.getProperty(newMessageKeys);
+      for (Message message : newMessages) {
+        if (message != null) {
+          Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
+          cachedMap.put(message.getId(), message);
+        }
+      }
     }
     _messageMap = Collections.unmodifiableMap(msgMap);
+    LOG.debug("Purge took: " + purgeSum);
 
+    List<PropertyKey> currentStateKeys = Lists.newLinkedList();
     Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap =
         new HashMap<String, Map<String, Map<String, CurrentState>>>();
     for (String instanceName : _liveInstanceMap.keySet()) {
       LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
-      String sessionId = liveInstance.getTypedSessionId().stringify();
-      if (!allCurStateMap.containsKey(instanceName)) {
-        allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>());
+      String sessionId = liveInstance.getSessionId();
+      List<String> currentStateNames =
+          accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+      for (String currentStateName : currentStateNames) {
+        currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
+      }
+
+      // ensure an empty current state map for all live instances and sessions
+      Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(instanceName);
+      if (instanceCurStateMap == null) {
+        instanceCurStateMap = Maps.newHashMap();
+        allCurStateMap.put(instanceName, instanceCurStateMap);
+      }
+      Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
+      if (sessionCurStateMap == null) {
+        sessionCurStateMap = Maps.newHashMap();
+        instanceCurStateMap.put(sessionId, sessionCurStateMap);
+      }
+    }
+    List<CurrentState> currentStates = accessor.getProperty(currentStateKeys);
+    Iterator<PropertyKey> csKeyIter = currentStateKeys.iterator();
+    for (CurrentState currentState : currentStates) {
+      PropertyKey key = csKeyIter.next();
+      String[] params = key.getParams();
+      if (currentState != null && params.length >= 4) {
+        Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(params[1]);
+        Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(params[2]);
+        sessionCurStateMap.put(params[3], currentState);
       }
-      Map<String, Map<String, CurrentState>> curStateMap = allCurStateMap.get(instanceName);
-      Map<String, CurrentState> map =
-          accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId));
-      curStateMap.put(sessionId, map);
     }
 
     for (String instance : allCurStateMap.keySet()) {
@@ -106,10 +205,63 @@ public class ClusterDataCache {
     }
     _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
 
+    // New in 0.7: Read more information for the benefit of user-defined rebalancers
+    _resourceConfigMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+    _controllerContextMap = accessor.getChildValuesMap(keyBuilder.controllerContexts());
+
+    // Read all single properties together
+    List<HelixProperty> singleProperties =
+        accessor.getProperty(ImmutableList.of(keyBuilder.clusterConfig(),
+            keyBuilder.controllerLeader(), keyBuilder.pause()));
+    _clusterConfig = (ClusterConfiguration) singleProperties.get(0);
+    if (_clusterConfig != null) {
+      _idealStateRuleMap = _clusterConfig.getIdealStateRules();
+    } else {
+      _idealStateRuleMap = Collections.emptyMap();
+    }
+    _leader = (LiveInstance) singleProperties.get(1);
+    _pause = (PauseSignal) singleProperties.get(2);
+
+    long endTime = System.currentTimeMillis();
+    LOG.info("END: ClusterDataCache.refresh(), took " + (endTime - startTime) + " ms");
+
+    if (LOG.isDebugEnabled()) {
+      int numPaths =
+          _liveInstanceMap.size() + _idealStateMap.size() + +_resourceConfigMap.size()
+              + _stateModelDefMap.size() + _instanceConfigMap.size() + _constraintMap.size()
+              + _controllerContextMap.size() + newMessageKeys.size() + currentStateKeys.size();
+      LOG.debug("Paths read: " + numPaths);
+    }
+
+    _init = false;
     return true;
   }
 
   /**
+   * Get the live instance associated with the controller leader
+   * @return LiveInstance
+   */
+  public LiveInstance getLeader() {
+    return _leader;
+  }
+
+  /**
+   * Get the pause signal (if any)
+   * @return PauseSignal
+   */
+  public PauseSignal getPauseSignal() {
+    return _pause;
+  }
+
+  /**
+   * Retrieves the configs for all resources
+   * @return
+   */
+  public Map<String, ResourceConfiguration> getResourceConfigs() {
+    return _resourceConfigMap;
+  }
+
+  /**
    * Retrieves the idealstates for all resources
    * @return
    */
@@ -117,6 +269,18 @@ public class ClusterDataCache {
     return _idealStateMap;
   }
 
+  public synchronized void setIdealStates(List<IdealState> idealStates) {
+    Map<String, IdealState> idealStateMap = Maps.newHashMap();
+    for (IdealState idealState : idealStates) {
+      idealStateMap.put(idealState.getId(), idealState);
+    }
+    _idealStateCacheMap = idealStateMap;
+  }
+
+  public Map<String, Map<String, String>> getIdealStateRules() {
+    return _idealStateRuleMap;
+  }
+
   /**
    * Returns the LiveInstances for each of the instances that are curretnly up and running
    * @return
@@ -125,6 +289,14 @@ public class ClusterDataCache {
     return _liveInstanceMap;
   }
 
+  public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
+    Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap();
+    for (LiveInstance liveInstance : liveInstances) {
+      liveInstanceMap.put(liveInstance.getId(), liveInstance);
+    }
+    _liveInstanceCacheMap = liveInstanceMap;
+  }
+
   /**
    * Provides the current state of the node for a given session id,
    * the sessionid can be got from LiveInstance
@@ -133,6 +305,10 @@ public class ClusterDataCache {
    * @return
    */
   public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) {
+    if (!_currentStateMap.containsKey(instanceName)
+        || !_currentStateMap.get(instanceName).containsKey(clientSessionId)) {
+      return Collections.emptyMap();
+    }
     return _currentStateMap.get(instanceName).get(clientSessionId);
   }
 
@@ -150,6 +326,14 @@ public class ClusterDataCache {
     }
   }
 
+  /**
+   * Provides all outstanding messages
+   * @return
+   */
+  public Map<String, Map<String, Message>> getMessageMap() {
+    return _messageMap;
+  }
+
   // public HealthStat getGlobalStats()
   // {
   // return _globalStats;
@@ -187,11 +371,18 @@ public class ClusterDataCache {
    * @return
    */
   public StateModelDefinition getStateModelDef(String stateModelDefRef) {
-
     return _stateModelDefMap.get(stateModelDefRef);
   }
 
   /**
+   * Get all state model definitions
+   * @return map of name to state model definition
+   */
+  public Map<String, StateModelDefinition> getStateModelDefMap() {
+    return _stateModelDefMap;
+  }
+
+  /**
    * Provides the idealstate for a given resource
    * @param resourceName
    * @return
@@ -208,6 +399,14 @@ public class ClusterDataCache {
     return _instanceConfigMap;
   }
 
+  public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) {
+    Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceConfigMap.put(instanceConfig.getId(), instanceConfig);
+    }
+    _instanceConfigCacheMap = instanceConfigMap;
+  }
+
   /**
    * Some partitions might be disabled on specific nodes.
    * This method allows one to fetch the set of nodes where a given partition is disabled
@@ -266,6 +465,55 @@ public class ClusterDataCache {
     return null;
   }
 
+  public Map<String, ClusterConstraints> getConstraintMap() {
+    return _constraintMap;
+  }
+
+  public Map<String, ControllerContextHolder> getContextMap() {
+    return _controllerContextMap;
+  }
+
+  public ClusterConfiguration getClusterConfig() {
+    return _clusterConfig;
+  }
+
+  public void cacheMessages(List<Message> messages) {
+    for (Message message : messages) {
+      String instanceName = message.getTgtName();
+      Map<String, Message> instMsgMap = null;
+      if (_messageCache.containsKey(instanceName)) {
+        instMsgMap = _messageCache.get(instanceName);
+      } else {
+        instMsgMap = Maps.newHashMap();
+        _messageCache.put(instanceName, instMsgMap);
+      }
+      instMsgMap.put(message.getId(), message);
+    }
+  }
+
+  /**
+   * Enable or disable writing resource assignments
+   * @param enable true to enable, false to disable
+   */
+  public void setAssignmentWritePolicy(boolean enable) {
+    _writeAssignments = enable;
+  }
+
+  /**
+   * Check if writing resource assignments is enabled
+   * @return true if enabled, false if disabled
+   */
+  public boolean assignmentWriteEnabled() {
+    return _writeAssignments;
+  }
+
+  /**
+   * Indicate that a full read should be done on the next refresh
+   */
+  public synchronized void requireFullRefresh() {
+    _init = true;
+  }
+
   /**
    * toString method to print the entire cluster state
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 532ecb5..15264ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -40,10 +40,10 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute("helixmanager");
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
     if (manager == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager | DataCache");
+          + ". Requires HelixManager | Cluster");
     }
 
     HelixManagerProperties properties = manager.getProperties();

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 8235173..64bf792 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -47,13 +47,13 @@ import org.apache.helix.model.Message.MessageType;
 public class CurrentStateComputationStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
 
     if (cluster == null || resourceMap == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires DataCache|RESOURCE");
+          + ". Requires Cluster|RESOURCE");
     }
 
     ResourceCurrentState currentStateOutput = new ResourceCurrentState();

http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index e8e42bf..a46acbd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -65,11 +65,11 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     HelixManager manager = event.getAttribute("helixmanager");
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
-    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Cluster cluster = event.getAttribute("Cluster");
 
     if (manager == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires ClusterManager|RESOURCES|DataCache");
+          + ". Requires ClusterManager|RESOURCES|Cluster");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();