You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/07 07:40:38 UTC

[1/3] [HELIX-109] adding config classes

Updated Branches:
  refs/heads/helix-logical-model c07569d47 -> 5972a44e7


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index fbc8328..9fc8447 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -25,13 +25,16 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Id;
-import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.NewRebalancer;
 import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
@@ -39,8 +42,6 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateProperty;
 import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -51,38 +52,27 @@ import org.testng.annotations.Test;
 public class TestCustomizedIdealStateRebalancer extends
     ZkStandAloneCMTestBaseWithPropertyServerCheck {
   String db2 = TEST_DB + "2";
-  static boolean testRebalancerCreated = false;
   static boolean testRebalancerInvoked = false;
 
-  public static class TestRebalancer implements Rebalancer {
-
-    @Override
-    public void init(HelixManager manager) {
-      testRebalancerCreated = true;
-    }
+  public static class TestRebalancer implements NewRebalancer {
 
     /**
      * Very basic mapping that evenly assigns one replica of each partition to live nodes, each of
      * which is in the highest-priority state.
      */
     @Override
-    public ResourceAssignment computeResourceMapping(Resource resource,
-        IdealState currentIdealState, CurrentStateOutput currentStateOutput,
-        ClusterDataCache clusterData) {
-      List<String> liveInstances = new ArrayList<String>(clusterData.getLiveInstances().keySet());
-      String stateModelName = currentIdealState.getStateModelDefRef();
-      StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
-      ResourceAssignment resourceMapping =
-          new ResourceAssignment(Id.resource(resource.getResourceName()));
+    public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
+        StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+      List<ParticipantId> liveParticipants =
+          new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
+      ResourceAssignment resourceMapping = new ResourceAssignment(resource.getId());
       int i = 0;
-      for (Partition partition : resource.getPartitions()) {
-        String partitionName = partition.getPartitionName();
-        int nodeIndex = i % liveInstances.size();
-        currentIdealState.getInstanceStateMap(partitionName).clear();
-        currentIdealState.getInstanceStateMap(partitionName).put(liveInstances.get(nodeIndex),
-            stateModelDef.getStatesPriorityStringList().get(0));
-        resourceMapping.addReplicaMap(Id.partition(partitionName), ResourceAssignment
-            .replicaMapFromStringMap(currentIdealState.getInstanceStateMap(partitionName)));
+      for (PartitionId partitionId : resource.getPartitionSet()) {
+        int nodeIndex = i % liveParticipants.size();
+        Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
+        replicaMap.put(liveParticipants.get(nodeIndex), stateModelDef.getStatesPriorityList()
+            .get(0));
+        resourceMapping.addReplicaMap(partitionId, replicaMap);
         i++;
       }
       testRebalancerInvoked = true;
@@ -107,7 +97,7 @@ public class TestCustomizedIdealStateRebalancer extends
     Assert.assertTrue(result);
     Thread.sleep(1000);
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
     Assert.assertEquals(ev.getPartitionStringSet().size(), 60);
@@ -119,7 +109,6 @@ public class TestCustomizedIdealStateRebalancer extends
       Assert.assertEquals(is.getPreferenceList(partition).size(), 0);
       Assert.assertEquals(is.getInstanceStateMap(partition).size(), 0);
     }
-    Assert.assertTrue(testRebalancerCreated);
     Assert.assertTrue(testRebalancerInvoked);
   }
 
@@ -138,7 +127,7 @@ public class TestCustomizedIdealStateRebalancer extends
     public boolean verify() {
       try {
         HelixDataAccessor accessor =
-            new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+            new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
         Builder keyBuilder = accessor.keyBuilder();
         int numberOfPartitions =
             accessor.getProperty(keyBuilder.idealState(_resourceName)).getRecord().getListFields()
@@ -159,9 +148,9 @@ public class TestCustomizedIdealStateRebalancer extends
         if (instances == 0) {
           instances = cache.getLiveInstances().size();
         }
-        return verifyBalanceExternalView(
-            accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(),
-            numberOfPartitions, masterValue, replicas, instances);
+        ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
+        return verifyBalanceExternalView(externalView.getRecord(), numberOfPartitions, masterValue,
+            replicas, instances);
       } catch (Exception e) {
         return false;
       }


[3/3] git commit: [HELIX-109] adding config classes

Posted by ki...@apache.org.
[HELIX-109] adding config classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5972a44e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5972a44e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5972a44e

Branch: refs/heads/helix-logical-model
Commit: 5972a44e726dcb0d9ea3671eac5caf45fc72c0fc
Parents: c07569d
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Sep 6 22:39:57 2013 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Sep 6 22:39:57 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/Cluster.java |  54 ++++--
 .../org/apache/helix/api/ClusterAccessor.java   |  63 +++++-
 .../org/apache/helix/api/ClusterConfig.java     | 180 +++++++++++++++++
 .../java/org/apache/helix/api/Participant.java  | 157 ++-------------
 .../org/apache/helix/api/ParticipantConfig.java | 194 +++++++++++++++++++
 .../org/apache/helix/api/RebalancerConfig.java  |  64 +++++-
 .../org/apache/helix/api/RebalancerRef.java     |   5 +-
 .../java/org/apache/helix/api/Resource.java     | 172 +++++-----------
 .../org/apache/helix/api/ResourceConfig.java    | 173 +++++++++++++++++
 .../controller/GenericHelixController.java      |  14 +-
 .../rebalancer/NewAutoRebalancer.java           |  42 ++--
 .../rebalancer/NewCustomRebalancer.java         |  13 +-
 .../rebalancer/NewSemiAutoRebalancer.java       |  13 +-
 .../util/NewConstraintBasedAssignment.java      |   5 +-
 .../stages/NewBestPossibleStateCalcStage.java   |   7 +-
 .../stages/NewBestPossibleStateOutput.java      |   9 +
 .../stages/NewCurrentStateComputationStage.java |  12 +-
 .../stages/NewExternalViewComputeStage.java     |  12 +-
 .../stages/NewMessageGenerationStage.java       |  22 ++-
 .../stages/NewMessageSelectionStage.java        |  14 +-
 .../stages/NewMessageThrottleStage.java         |  12 +-
 .../stages/NewResourceComputationStage.java     |  16 +-
 .../stages/NewTaskAssignmentStage.java          |  18 +-
 .../apache/helix/model/ClusterConstraints.java  |   8 +
 .../java/org/apache/helix/model/IdealState.java |  18 +-
 .../apache/helix/model/ResourceAssignment.java  |   8 +
 .../helix/tools/ClusterStateVerifier.java       |  84 +++++---
 .../org/apache/helix/api/TestNewStages.java     |  12 +-
 .../helix/controller/stages/BaseStageTest.java  |  67 ++++---
 .../TestBestPossibleCalcStageCompatibility.java |  73 ++++---
 .../stages/TestBestPossibleStateCalcStage.java  |  35 ++--
 .../stages/TestCompatibilityCheckStage.java     |  13 +-
 .../TestCurrentStateComputationStage.java       |  52 ++---
 .../stages/TestMessageThrottleStage.java        |  35 ++--
 .../stages/TestMsgSelectionStage.java           |  96 ++++++---
 .../stages/TestRebalancePipeline.java           |  76 ++++----
 .../stages/TestResourceComputationStage.java    |  79 ++++----
 .../helix/integration/TestAutoRebalance.java    |   6 +-
 .../TestCustomizedIdealStateRebalancer.java     |  59 +++---
 39 files changed, 1303 insertions(+), 689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index e890fb4..ce2318a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -25,13 +25,14 @@ import java.util.Map;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 /**
  * Represent a logical helix cluster
  */
 public class Cluster {
-  private final ClusterId _id;
 
   /**
    * map of resource-id to resource
@@ -60,9 +61,7 @@ public class Cluster {
 
   private final ControllerId _leaderId;
 
-  private final ClusterConfig _config = null;
-
-  private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+  private final ClusterConfig _config;
 
   /**
    * construct a cluster
@@ -74,9 +73,26 @@ public class Cluster {
    */
   public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
-      ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap) {
-
-    _id = id;
+      ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+
+    // build the config
+    // Guava's transform and "copy" functions really return views so the maps all reflect each other
+    Map<ResourceId, ResourceConfig> resourceConfigMap =
+        Maps.transformValues(resourceMap, new Function<Resource, ResourceConfig>() {
+          @Override
+          public ResourceConfig apply(Resource resource) {
+            return resource.getConfig();
+          }
+        });
+    Map<ParticipantId, ParticipantConfig> participantConfigMap =
+        Maps.transformValues(participantMap, new Function<Participant, ParticipantConfig>() {
+          @Override
+          public ParticipantConfig apply(Participant participant) {
+            return participant.getConfig();
+          }
+        });
+    _config =
+        new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap, isPaused);
 
     _resourceMap = ImmutableMap.copyOf(resourceMap);
 
@@ -94,8 +110,6 @@ public class Cluster {
 
     _leaderId = leaderId;
 
-    _constraintMap = ImmutableMap.copyOf(constraintMap);
-
     // TODO impl this when we persist controllers and spectators on zookeeper
     _controllerMap = ImmutableMap.copyOf(controllerMap);
     _spectatorMap = Collections.emptyMap();
@@ -106,7 +120,7 @@ public class Cluster {
    * @return cluster id
    */
   public ClusterId getId() {
-    return _id;
+    return _config.getId();
   }
 
   /**
@@ -167,17 +181,27 @@ public class Cluster {
   }
 
   /**
-   * @return
+   * Get all the constraints on the cluster
+   * @return map of constraint type to constraints
    */
   public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
-    return _constraintMap;
+    return _config.getConstraintMap();
   }
 
   /**
-   * @param type
-   * @return
+   * Get a cluster constraint
+   * @param type the type of constrant to query
+   * @return cluster constraints, or null if none
    */
   public ClusterConstraints getConstraint(ConstraintType type) {
-    return _constraintMap.get(type);
+    return _config.getConstraintMap().get(type);
+  }
+
+  /**
+   * Check the pasued status of the cluster
+   * @return true if paused, false otherwise
+   */
+  public boolean isPaused() {
+    return _config.isPaused();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 04d5831..a9fcf79 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -24,12 +24,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -76,7 +78,7 @@ public class ClusterAccessor {
    */
   public void dropCluster() {
     LOG.info("Dropping cluster: " + _clusterId);
-    List<String> liveInstanceNames =_accessor.getChildNames(_keyBuilder.liveInstances());
+    List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
     if (liveInstanceNames.size() > 0) {
       throw new HelixException("Can't drop cluster: " + _clusterId
           + " because there are running participant: " + liveInstanceNames
@@ -97,6 +99,7 @@ public class ClusterAccessor {
    * @return cluster
    */
   public Cluster readCluster() {
+    // TODO many of these should live in resource, participant, etc accessors
     /**
      * map of instance-id to instance-config
      */
@@ -147,13 +150,20 @@ public class ClusterAccessor {
     Map<String, ClusterConstraints> constraintMap =
         _accessor.getChildValuesMap(_keyBuilder.constraints());
 
+    /**
+     * Map of resource id to external view
+     */
+    Map<String, ExternalView> externalViewMap =
+        _accessor.getChildValuesMap(_keyBuilder.externalViews());
+
     Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
     for (String resourceName : idealStateMap.keySet()) {
       IdealState idealState = idealStateMap.get(resourceName);
-
       // TODO pass resource assignment
       ResourceId resourceId = Id.resource(resourceName);
-      resourceMap.put(resourceId, new Resource(resourceId, idealState, null));
+      resourceMap.put(resourceId,
+          new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
+              liveInstanceMap.size()));
     }
 
     Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
@@ -182,8 +192,11 @@ public class ClusterAccessor {
           constraintMap.get(constraintType));
     }
 
+    PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+    boolean isPaused = pauseSignal != null;
+
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap);
+        clusterConstraintMap, isPaused);
   }
 
   /**
@@ -204,7 +217,7 @@ public class ClusterAccessor {
    * add a resource to cluster
    * @param resource
    */
-  public void addResourceToCluster(Resource resource) {
+  public void addResourceToCluster(ResourceConfig resource) {
     StateModelDefId stateModelDefId = resource.getRebalancerConfig().getStateModelDefId();
     if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
       throw new HelixException("State model: " + stateModelDefId + " not found in cluster: "
@@ -217,8 +230,42 @@ public class ClusterAccessor {
           + ", because resource ideal state already exists in cluster: " + _clusterId);
     }
 
-    // TODO convert rebalancerConfig to idealState
-    _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), null);
+    // Create an IdealState from a RebalancerConfig
+    RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+    IdealState idealState = new IdealState(resourceId);
+    idealState.setRebalanceMode(rebalancerConfig.getRebalancerMode());
+    idealState.setMaxPartitionsPerInstance(rebalancerConfig.getMaxPartitionsPerParticipant());
+    if (rebalancerConfig.canAssignAnyLiveParticipant()) {
+      idealState.setReplicas(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString());
+    } else {
+      idealState.setReplicas(Integer.toString(rebalancerConfig.getReplicaCount()));
+    }
+    idealState.setStateModelDefId(rebalancerConfig.getStateModelDefId());
+    for (PartitionId partitionId : resource.getPartitionSet()) {
+      List<ParticipantId> preferenceList = rebalancerConfig.getPreferenceList(partitionId);
+      Map<ParticipantId, State> preferenceMap = rebalancerConfig.getPreferenceMap(partitionId);
+      if (preferenceList != null) {
+        idealState.setPreferenceList(partitionId, preferenceList);
+      }
+      if (preferenceMap != null) {
+        idealState.setParticipantStateMap(partitionId, preferenceMap);
+      }
+    }
+    idealState.setBucketSize(rebalancerConfig.getBucketSize());
+    idealState.setBatchMessageMode(rebalancerConfig.getBatchMessageMode());
+    String groupTag = rebalancerConfig.getParticipantGroupTag();
+    if (groupTag != null) {
+      idealState.setInstanceGroupTag(groupTag);
+    }
+    RebalancerRef rebalancerRef = rebalancerConfig.getRebalancerRef();
+    if (rebalancerRef != null) {
+      idealState.setRebalancerRef(rebalancerRef);
+    }
+    StateModelFactoryId stateModelFactoryId = rebalancerConfig.getStateModelFactoryId();
+    if (stateModelFactoryId != null) {
+      idealState.setStateModelFactoryId(stateModelFactoryId);
+    }
+    _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
   }
 
   /**
@@ -244,7 +291,7 @@ public class ClusterAccessor {
    * add a participant to cluster
    * @param participant
    */
-  public void addParticipantToCluster(Participant participant) {
+  public void addParticipantToCluster(ParticipantConfig participant) {
     if (!isClusterStructureValid()) {
       throw new HelixException("Cluster: " + _clusterId + " structure is not valid");
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index 8a2f629..1585aae 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -1,5 +1,14 @@
 package org.apache.helix.api;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+
+import com.google.common.collect.ImmutableMap;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,6 +28,177 @@ package org.apache.helix.api;
  * under the License.
  */
 
+/**
+ * Configuration properties of a cluster
+ */
 public class ClusterConfig {
+  private final ClusterId _id;
+  private final Map<ResourceId, ResourceConfig> _resourceMap;
+  private final Map<ParticipantId, ParticipantConfig> _participantMap;
+  private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+  private final boolean _isPaused;
+
+  /**
+   * Initialize a cluster configuration. Also see ClusterConfig.Builder
+   * @param id
+   * @param resourceMap
+   * @param participantMap
+   * @param constraintMap
+   */
+  public ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
+      Map<ParticipantId, ParticipantConfig> participantMap,
+      Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+    _id = id;
+    _resourceMap = ImmutableMap.copyOf(resourceMap);
+    _participantMap = ImmutableMap.copyOf(participantMap);
+    _constraintMap = ImmutableMap.copyOf(constraintMap);
+    _isPaused = isPaused;
+  }
+
+  /**
+   * Get cluster id
+   * @return cluster id
+   */
+  public ClusterId getId() {
+    return _id;
+  }
+
+  /**
+   * Get resources in the cluster
+   * @return a map of resource id to resource, or empty map if none
+   */
+  public Map<ResourceId, ResourceConfig> getResourceMap() {
+    return _resourceMap;
+  }
+
+  /**
+   * Get all the constraints on the cluster
+   * @return map of constraint type to constraints
+   */
+  public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
+    return _constraintMap;
+  }
+
+  /**
+   * Get participants of the cluster
+   * @return a map of participant id to participant, or empty map if none
+   */
+  public Map<ParticipantId, ParticipantConfig> getParticipantMap() {
+    return _participantMap;
+  }
+
+  /**
+   * Check the pasued status of the cluster
+   * @return true if paused, false otherwise
+   */
+  public boolean isPaused() {
+    return _isPaused;
+  }
+
+  /**
+   * Assembles a cluster configuration
+   */
+  public static class Builder {
+    private final ClusterId _id;
+    private final Map<ResourceId, ResourceConfig> _resourceMap;
+    private final Map<ParticipantId, ParticipantConfig> _participantMap;
+    private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+    private boolean _isPaused;
+
+    /**
+     * Initialize builder for a cluster
+     * @param id cluster id
+     */
+    public Builder(ClusterId id) {
+      _id = id;
+      _resourceMap = new HashMap<ResourceId, ResourceConfig>();
+      _participantMap = new HashMap<ParticipantId, ParticipantConfig>();
+      _constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
+      _isPaused = false;
+    }
+
+    /**
+     * Add a resource to the cluster
+     * @param resource resource configuration
+     * @return Builder
+     */
+    public Builder addResource(ResourceConfig resource) {
+      _resourceMap.put(resource.getId(), resource);
+      return this;
+    }
+
+    /**
+     * Add multiple resources to the cluster
+     * @param resources resource configurations
+     * @return Builder
+     */
+    public Builder addResources(Collection<ResourceConfig> resources) {
+      for (ResourceConfig resource : resources) {
+        addResource(resource);
+      }
+      return this;
+    }
+
+    /**
+     * Add a participant to the cluster
+     * @param participant participant configuration
+     * @return Builder
+     */
+    public Builder addParticipant(ParticipantConfig participant) {
+      _participantMap.put(participant.getId(), participant);
+      return this;
+    }
+
+    /**
+     * Add multiple participants to the cluster
+     * @param participants participant configurations
+     * @return Builder
+     */
+    public Builder addParticipants(Collection<ParticipantConfig> participants) {
+      for (ParticipantConfig participant : participants) {
+        addParticipant(participant);
+      }
+      return this;
+    }
+
+    /**
+     * Add a constraint to the cluster
+     * @param constraint cluster constraint of a specific type
+     * @return Builder
+     */
+    public Builder addConstraint(ClusterConstraints constraint) {
+      _constraintMap.put(constraint.getType(), constraint);
+      return this;
+    }
+
+    /**
+     * Add multiple constraints to the cluster
+     * @param constraints cluster constraints of multiple distinct types
+     * @return Builder
+     */
+    public Builder addConstraints(Collection<ClusterConstraints> constraints) {
+      for (ClusterConstraints constraint : constraints) {
+        addConstraint(constraint);
+      }
+      return this;
+    }
+
+    /**
+     * Set the paused status of the cluster
+     * @param isPaused true if paused, false otherwise
+     * @return Builder
+     */
+    public Builder setPausedStatus(boolean isPaused) {
+      _isPaused = isPaused;
+      return this;
+    }
 
+    /**
+     * Create the cluster configuration
+     * @return ClusterConfig
+     */
+    public ClusterConfig build() {
+      return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _isPaused);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 10ceebd..0c0cd12 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -19,8 +19,6 @@ package org.apache.helix.api;
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -28,22 +26,12 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 
 /**
  * A cluster participant
  */
 public class Participant {
-  private final ParticipantId _id;
-  private final String _hostName;
-  private final int _port;
-  private final boolean _isEnabled;
-
-  /**
-   * set of disabled partition id's
-   */
-  private final Set<PartitionId> _disabledPartitionIdSet;
-  private final Set<String> _tags;
+  private final ParticipantConfig _config;
 
   private final RunningInstance _runningInstance;
 
@@ -64,12 +52,7 @@ public class Participant {
   public Participant(ParticipantId id, String hostName, int port, boolean isEnabled,
       Set<PartitionId> disabledPartitionIdSet, Set<String> tags, RunningInstance runningInstance,
       Map<ResourceId, CurrentState> currentStateMap, Map<MessageId, Message> messageMap) {
-    _id = id;
-    _hostName = hostName;
-    _port = port;
-    _isEnabled = isEnabled;
-    _disabledPartitionIdSet = ImmutableSet.copyOf(disabledPartitionIdSet);
-    _tags = ImmutableSet.copyOf(tags);
+    _config = new ParticipantConfig(id, hostName, port, isEnabled, disabledPartitionIdSet, tags);
     _runningInstance = runningInstance;
     _currentStateMap = ImmutableMap.copyOf(currentStateMap);
     _messageMap = ImmutableMap.copyOf(messageMap);
@@ -80,7 +63,7 @@ public class Participant {
    * @return host name, or null if not applicable
    */
   public String getHostName() {
-    return _hostName;
+    return _config.getHostName();
   }
 
   /**
@@ -88,7 +71,7 @@ public class Participant {
    * @return port number, or -1 if not applicable
    */
   public int getPort() {
-    return _port;
+    return _config.getPort();
   }
 
   /**
@@ -96,7 +79,7 @@ public class Participant {
    * @return true if enabled or false otherwise
    */
   public boolean isEnabled() {
-    return _isEnabled;
+    return _config.isEnabled();
   }
 
   /**
@@ -120,7 +103,7 @@ public class Participant {
    * @return set of disabled partition id's, or empty set if none
    */
   public Set<PartitionId> getDisablePartitionIds() {
-    return _disabledPartitionIdSet;
+    return _config.getDisablePartitionIds();
   }
 
   /**
@@ -128,7 +111,7 @@ public class Participant {
    * @return set of tags
    */
   public Set<String> getTags() {
-    return _tags;
+    return _config.getTags();
   }
 
   /**
@@ -137,7 +120,7 @@ public class Participant {
    * @return true if tagged, false otherwise
    */
   public boolean hasTag(String tag) {
-    return _tags.contains(tag);
+    return _config.hasTag(tag);
   }
 
   /**
@@ -156,125 +139,19 @@ public class Participant {
     return _currentStateMap;
   }
 
+  /**
+   * Get the participant id
+   * @return ParticipantId
+   */
   public ParticipantId getId() {
-    return _id;
+    return _config.getId();
   }
 
   /**
-   * Assemble a participant
+   * Get the participant configuration
+   * @return ParticipantConfig that backs this participant
    */
-  public static class Builder {
-    private final ParticipantId _id;
-    private final Set<PartitionId> _disabledPartitions;
-    private final Set<String> _tags;
-    private final Map<ResourceId, CurrentState> _currentStateMap;
-    private final Map<MessageId, Message> _messageMap;
-    private String _hostName;
-    private int _port;
-    private boolean _isEnabled;
-    private RunningInstance _runningInstance;
-
-    /**
-     * Build a participant with a given id
-     * @param id participant id
-     */
-    public Builder(ParticipantId id) {
-      _id = id;
-      _disabledPartitions = new HashSet<PartitionId>();
-      _tags = new HashSet<String>();
-      _currentStateMap = new HashMap<ResourceId, CurrentState>();
-      _messageMap = new HashMap<MessageId, Message>();
-      _isEnabled = true;
-    }
-
-    /**
-     * Set the participant host name
-     * @param hostName reachable host when live
-     * @return Builder
-     */
-    public Builder hostName(String hostName) {
-      _hostName = hostName;
-      return this;
-    }
-
-    /**
-     * Set the participant port
-     * @param port port number
-     * @return Builder
-     */
-    public Builder port(int port) {
-      _port = port;
-      return this;
-    }
-
-    /**
-     * Set whether or not the participant is enabled
-     * @param isEnabled true if enabled, false otherwise
-     * @return Builder
-     */
-    public Builder enabled(boolean isEnabled) {
-      _isEnabled = isEnabled;
-      return this;
-    }
-
-    /**
-     * Add a partition to disable for this participant
-     * @param partitionId the partition to disable
-     * @return Builder
-     */
-    public Builder addDisabledPartition(PartitionId partitionId) {
-      _disabledPartitions.add(partitionId);
-      return this;
-    }
-
-    /**
-     * Add an arbitrary tag for this participant
-     * @param tag the tag to add
-     * @return Builder
-     */
-    public Builder addTag(String tag) {
-      _tags.add(tag);
-      return this;
-    }
-
-    /**
-     * Add live properties to participants that are running
-     * @param runningInstance live participant properties
-     * @return Builder
-     */
-    public Builder runningInstance(RunningInstance runningInstance) {
-      _runningInstance = runningInstance;
-      return this;
-    }
-
-    /**
-     * Add a resource current state for this participant
-     * @param resourceId the resource the current state corresponds to
-     * @param currentState the current state
-     * @return Builder
-     */
-    public Builder addCurrentState(ResourceId resourceId, CurrentState currentState) {
-      _currentStateMap.put(resourceId, currentState);
-      return this;
-    }
-
-    /**
-     * Add a message for the participant
-     * @param message message to add
-     * @return Builder
-     */
-    public Builder addMessage(Message message) {
-      _messageMap.put(new MessageId(message.getId()), message);
-      return this;
-    }
-
-    /**
-     * Assemble the participant
-     * @return instantiated Participant
-     */
-    public Participant build() {
-      return new Participant(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags,
-          _runningInstance, _currentStateMap, _messageMap);
-    }
+  public ParticipantConfig getConfig() {
+    return _config;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
new file mode 100644
index 0000000..3c77a40
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
@@ -0,0 +1,194 @@
+package org.apache.helix.api;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties of a Helix participant
+ */
+public class ParticipantConfig {
+  private final ParticipantId _id;
+  private final String _hostName;
+  private final int _port;
+  private final boolean _isEnabled;
+  private final Set<PartitionId> _disabledPartitions;
+  private final Set<String> _tags;
+
+  /**
+   * Initialize a participant configuration. Also see ParticipantConfig.Builder
+   * @param id participant id
+   * @param hostName host where participant can be reached
+   * @param port port to use to contact participant
+   * @param isEnabled true if enabled, false if disabled
+   * @param disabledPartitions set of partitions, if any to disable on this participant
+   * @param tags tags to set for the participant
+   */
+  public ParticipantConfig(ParticipantId id, String hostName, int port, boolean isEnabled,
+      Set<PartitionId> disabledPartitions, Set<String> tags) {
+    _id = id;
+    _hostName = hostName;
+    _port = port;
+    _isEnabled = isEnabled;
+    _disabledPartitions = ImmutableSet.copyOf(disabledPartitions);
+    _tags = ImmutableSet.copyOf(tags);
+  }
+
+  /**
+   * Get the host name of the participant
+   * @return host name, or null if not applicable
+   */
+  public String getHostName() {
+    return _hostName;
+  }
+
+  /**
+   * Get the port of the participant
+   * @return port number, or -1 if not applicable
+   */
+  public int getPort() {
+    return _port;
+  }
+
+  /**
+   * Get if the participant is enabled
+   * @return true if enabled or false otherwise
+   */
+  public boolean isEnabled() {
+    return _isEnabled;
+  }
+
+  /**
+   * Get disabled partition id's
+   * @return set of disabled partition id's, or empty set if none
+   */
+  public Set<PartitionId> getDisablePartitionIds() {
+    return _disabledPartitions;
+  }
+
+  /**
+   * Get tags
+   * @return set of tags
+   */
+  public Set<String> getTags() {
+    return _tags;
+  }
+
+  /**
+   * Check if participant has a tag
+   * @param tag tag to check
+   * @return true if tagged, false otherwise
+   */
+  public boolean hasTag(String tag) {
+    return _tags.contains(tag);
+  }
+
+  /**
+   * Get the participant id
+   * @return ParticipantId
+   */
+  public ParticipantId getId() {
+    return _id;
+  }
+
+  /**
+   * Assemble a participant
+   */
+  public static class Builder {
+    private final ParticipantId _id;
+    private String _hostName;
+    private int _port;
+    private boolean _isEnabled;
+    private final Set<PartitionId> _disabledPartitions;
+    private final Set<String> _tags;
+
+    /**
+     * Build a participant with a given id
+     * @param id participant id
+     */
+    public Builder(ParticipantId id) {
+      _id = id;
+      _disabledPartitions = new HashSet<PartitionId>();
+      _tags = new HashSet<String>();
+      _isEnabled = true;
+    }
+
+    /**
+     * Set the participant host name
+     * @param hostName reachable host when live
+     * @return Builder
+     */
+    public Builder hostName(String hostName) {
+      _hostName = hostName;
+      return this;
+    }
+
+    /**
+     * Set the participant port
+     * @param port port number
+     * @return Builder
+     */
+    public Builder port(int port) {
+      _port = port;
+      return this;
+    }
+
+    /**
+     * Set whether or not the participant is enabled
+     * @param isEnabled true if enabled, false otherwise
+     * @return Builder
+     */
+    public Builder enabled(boolean isEnabled) {
+      _isEnabled = isEnabled;
+      return this;
+    }
+
+    /**
+     * Add a partition to disable for this participant
+     * @param partitionId the partition to disable
+     * @return Builder
+     */
+    public Builder addDisabledPartition(PartitionId partitionId) {
+      _disabledPartitions.add(partitionId);
+      return this;
+    }
+
+    /**
+     * Add an arbitrary tag for this participant
+     * @param tag the tag to add
+     * @return Builder
+     */
+    public Builder addTag(String tag) {
+      _tags.add(tag);
+      return this;
+    }
+
+    /**
+     * Assemble the participant
+     * @return instantiated Participant
+     */
+    public ParticipantConfig build() {
+      return new ParticipantConfig(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 4ac254d..257354d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,9 +19,11 @@ package org.apache.helix.api;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.HelixConstants;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
@@ -40,6 +42,7 @@ public class RebalancerConfig {
   private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
   private final ResourceAssignment _resourceAssignment;
   private final int _replicaCount;
+  private final boolean _anyLiveParticipant;
   private final String _participantGroupTag;
   private final int _maxPartitionsPerParticipant;
   private final int _bucketSize;
@@ -51,11 +54,19 @@ public class RebalancerConfig {
    * @param idealState the physical ideal state
    * @param resourceAssignment last mapping of a resource
    */
-  public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment) {
+  public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment,
+      int liveParticipantCount) {
     _rebalancerMode = idealState.getRebalanceMode();
     _rebalancerRef = idealState.getRebalancerRef();
     _stateModelDefId = idealState.getStateModelDefId();
-    _replicaCount = Integer.parseInt(idealState.getReplicas());
+    String replicaCount = idealState.getReplicas();
+    if (replicaCount.equals(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())) {
+      _replicaCount = liveParticipantCount;
+      _anyLiveParticipant = true;
+    } else {
+      _replicaCount = Integer.parseInt(idealState.getReplicas());
+      _anyLiveParticipant = false;
+    }
     _participantGroupTag = idealState.getInstanceGroupTag();
     _maxPartitionsPerParticipant = idealState.getMaxPartitionsPerInstance();
     _bucketSize = idealState.getBucketSize();
@@ -68,10 +79,14 @@ public class RebalancerConfig {
     ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>> preferenceMaps =
         new ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>>();
     for (PartitionId partitionId : idealState.getPartitionSet()) {
-      preferenceLists.put(partitionId,
-          ImmutableList.copyOf(idealState.getPreferenceList(partitionId)));
-      preferenceMaps.put(partitionId,
-          ImmutableMap.copyOf(idealState.getParticipantStateMap(partitionId)));
+      List<ParticipantId> preferenceList = idealState.getPreferenceList(partitionId);
+      if (preferenceList != null) {
+        preferenceLists.put(partitionId, ImmutableList.copyOf(preferenceList));
+      }
+      Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
+      if (preferenceMap != null) {
+        preferenceMaps.put(partitionId, ImmutableMap.copyOf(preferenceMap));
+      }
     }
     _preferenceLists = preferenceLists.build();
     _preferenceMaps = preferenceMaps.build();
@@ -118,7 +133,10 @@ public class RebalancerConfig {
    * @return the ordered preference list (early entries are more preferred)
    */
   public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
-    return _preferenceLists.get(partitionId);
+    if (_preferenceLists.containsKey(partitionId)) {
+      return _preferenceLists.get(partitionId);
+    }
+    return Collections.emptyList();
   }
 
   /**
@@ -127,7 +145,10 @@ public class RebalancerConfig {
    * @return a mapping of participant to state for each replica
    */
   public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
-    return _preferenceMaps.get(partitionId);
+    if (_preferenceMaps.containsKey(partitionId)) {
+      return _preferenceMaps.get(partitionId);
+    }
+    return Collections.emptyMap();
   }
 
   /**
@@ -179,10 +200,19 @@ public class RebalancerConfig {
   }
 
   /**
+   * Check if replicas can be assigned to any live participant
+   * @return true if they can, false if they cannot
+   */
+  public boolean canAssignAnyLiveParticipant() {
+    return _anyLiveParticipant;
+  }
+
+  /**
    * Assembles a RebalancerConfig
    */
   public static class Builder {
     private final IdealState _idealState;
+    private boolean _anyLiveParticipant;
     private ResourceAssignment _resourceAssignment;
 
     /**
@@ -191,6 +221,7 @@ public class RebalancerConfig {
      */
     public Builder(ResourceId resourceId) {
       _idealState = new IdealState(resourceId);
+      _anyLiveParticipant = false;
     }
 
     /**
@@ -283,11 +314,26 @@ public class RebalancerConfig {
     }
 
     /**
+     * Set whether any live participant should be used in rebalancing
+     * @param useAnyParticipant true if any live participant can be used, false otherwise
+     * @return
+     */
+    public Builder anyLiveParticipant(boolean useAnyParticipant) {
+      _anyLiveParticipant = true;
+      return this;
+    }
+
+    /**
      * Assemble a RebalancerConfig
      * @return a fully defined rebalancer configuration
      */
     public RebalancerConfig build() {
-      return new RebalancerConfig(_idealState, _resourceAssignment);
+      if (_anyLiveParticipant) {
+        return new RebalancerConfig(_idealState, _resourceAssignment, Integer.parseInt(_idealState
+            .getReplicas()));
+      } else {
+        return new RebalancerConfig(_idealState, _resourceAssignment, -1);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
index 5f22898..5c7ce56 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
@@ -62,9 +62,12 @@ public class RebalancerRef {
   /**
    * Get a rebalancer class reference
    * @param rebalancerClassName name of the class
-   * @return RebalancerRef
+   * @return RebalancerRef or null if name is null
    */
   public static RebalancerRef from(String rebalancerClassName) {
+    if (rebalancerClassName == null) {
+      return null;
+    }
     return new RebalancerRef(rebalancerClassName);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 0c8b730..8445617 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -20,7 +20,6 @@ package org.apache.helix.api;
  */
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -29,41 +28,33 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceAssignment;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
 /**
  * Represent a resource entity in helix cluster
  */
 public class Resource {
-  private final ResourceId _id;
-  private final RebalancerConfig _rebalancerConfig;
-  private final SchedulerTaskConfig _schedulerTaskConfig;
-
-  private final Map<PartitionId, Partition> _partitionMap;
-
+  private final ResourceConfig _config;
   private final ExternalView _externalView;
 
   /**
    * Construct a resource
-   * @param idealState
+   * @param id resource id
+   * @param idealState ideal state of the resource
    * @param currentStateMap map of participant-id to current state
+   * @param liveParticipantCount number of live participants in the system
    */
-  public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment) {
-    _id = id;
-    _rebalancerConfig = new RebalancerConfig(idealState, resourceAssignment);
-
+  public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment,
+      ExternalView externalView, int liveParticipantCount) {
     Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
-    Map<PartitionId, Map<String, String>> schedulerTaskConfig =
+    Map<PartitionId, Map<String, String>> schedulerTaskConfigMap =
         new HashMap<PartitionId, Map<String, String>>();
     Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
     for (PartitionId partitionId : idealState.getPartitionSet()) {
       partitionMap.put(partitionId, new Partition(partitionId));
 
       // TODO refactor it
-      Map<String, String> taskConfigMap = idealState.getRecord().getMapField(partitionId.stringify());
+      Map<String, String> taskConfigMap = idealState.getInstanceStateMap(partitionId.stringify());
       if (taskConfigMap != null) {
-        schedulerTaskConfig.put(partitionId, taskConfigMap);
+        schedulerTaskConfigMap.put(partitionId, taskConfigMap);
       }
 
       // TODO refactor it
@@ -78,53 +69,38 @@ public class Resource {
         }
       }
     }
-    _partitionMap = ImmutableMap.copyOf(partitionMap);
-    _schedulerTaskConfig = new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfig);
+    SchedulerTaskConfig schedulerTaskConfig =
+        new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfigMap);
+    RebalancerConfig rebalancerConfig =
+        new RebalancerConfig(idealState, resourceAssignment, liveParticipantCount);
 
-    _externalView = null;
-  }
-
-  /**
-   * Construct a Resource
-   * @param id resource identifier
-   * @param partitionSet disjoint partitions of the resource
-   * @param externalView external view of the resource
-   * @param pendingExternalView pending external view based on unprocessed messages
-   * @param rebalancerConfig configuration properties for rebalancing this resource
-   */
-  public Resource(ResourceId id, Map<PartitionId, Partition> partitionMap,
-      ExternalView externalView,
-      RebalancerConfig rebalancerConfig, SchedulerTaskConfig schedulerTaskConfig) {
-    _id = id;
-    _partitionMap = ImmutableMap.copyOf(partitionMap);
+    _config = new ResourceConfig(id, partitionMap, schedulerTaskConfig, rebalancerConfig);
     _externalView = externalView;
-    _rebalancerConfig = rebalancerConfig;
-    _schedulerTaskConfig = schedulerTaskConfig;
   }
 
   /**
-   * Get the set of partitions of the resource
-   * @return set of partitions or empty set if none
+   * Get the partitions of the resource
+   * @return map of partition id to partition or empty map if none
    */
   public Map<PartitionId, Partition> getPartitionMap() {
-    return _partitionMap;
+    return _config.getPartitionMap();
   }
 
   /**
-   * @param partitionId
-   * @return
+   * Get a partition that the resource contains
+   * @param partitionId the partition id to look up
+   * @return Partition or null if none is present with the given id
    */
   public Partition getPartition(PartitionId partitionId) {
-    return _partitionMap.get(partitionId);
+    return _config.getPartition(partitionId);
   }
 
   /**
-   * @return
+   * Get the set of partition ids that the resource contains
+   * @return partition id set, or empty if none
    */
-  public Set<Partition> getPartitionSet() {
-    Set<Partition> partitionSet = new HashSet<Partition>();
-    partitionSet.addAll(_partitionMap.values());
-    return ImmutableSet.copyOf(partitionSet);
+  public Set<PartitionId> getPartitionSet() {
+    return _config.getPartitionSet();
   }
 
   /**
@@ -135,95 +111,35 @@ public class Resource {
     return _externalView;
   }
 
+  /**
+   * Get the resource properties configuring rebalancing
+   * @return RebalancerConfig properties
+   */
   public RebalancerConfig getRebalancerConfig() {
-    return _rebalancerConfig;
+    return _config.getRebalancerConfig();
   }
 
+  /**
+   * Get the resource id
+   * @return ResourceId
+   */
   public ResourceId getId() {
-    return _id;
+    return _config.getId();
   }
 
+  /**
+   * Get the properties configuring scheduler tasks
+   * @return SchedulerTaskConfig properties
+   */
   public SchedulerTaskConfig getSchedulerTaskConfig() {
-    return _schedulerTaskConfig;
+    return _config.getSchedulerTaskConfig();
   }
 
   /**
-   * Assembles a Resource
+   * Get the configuration of this resource
+   * @return ResourceConfig that backs this Resource
    */
-  public static class Builder {
-    private final ResourceId _id;
-    private final Map<PartitionId, Partition> _partitionMap;
-    private ExternalView _externalView;
-    private RebalancerConfig _rebalancerConfig;
-    private SchedulerTaskConfig _schedulerTaskConfig;
-
-    /**
-     * Build a Resource with an id
-     * @param id resource id
-     */
-    public Builder(ResourceId id) {
-      _id = id;
-      _partitionMap = new HashMap<PartitionId, Partition>();
-    }
-
-    /**
-     * Add a partition that the resource serves
-     * @param partition fully-qualified partition
-     * @return Builder
-     */
-    public Builder addPartition(Partition partition) {
-      _partitionMap.put(partition.getId(), partition);
-      return this;
-    }
-
-    /**
-     * Add a set of partitions
-     * @param partitions
-     * @return Builder
-     */
-    public Builder addPartitions(Set<Partition> partitions) {
-      for (Partition partition : partitions) {
-        addPartition(partition);
-      }
-      return this;
-    }
-
-    /**
-     * Set the external view of this resource
-     * @param extView currently served replica placement and state
-     * @return Builder
-     */
-    public Builder externalView(ExternalView extView) {
-      _externalView = extView;
-      return this;
-    }
-
-    /**
-     * Set the rebalancer configuration
-     * @param rebalancerConfig properties of interest for rebalancing
-     * @return Builder
-     */
-    public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
-      _rebalancerConfig = rebalancerConfig;
-      return this;
-    }
-
-    /**
-     * @param schedulerTaskConfig
-     * @return
-     */
-    public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
-      _schedulerTaskConfig = schedulerTaskConfig;
-      return this;
-    }
-
-    /**
-     * Create a Resource object
-     * @return instantiated Resource
-     */
-    public Resource build() {
-      return new Resource(_id, _partitionMap, _externalView, _rebalancerConfig,
-          _schedulerTaskConfig);
-    }
+  public ResourceConfig getConfig() {
+    return _config;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
new file mode 100644
index 0000000..5170c40
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
@@ -0,0 +1,173 @@
+package org.apache.helix.api;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Full configuration of a Helix resource. Typically used to add or modify resources on a cluster
+ */
+public class ResourceConfig {
+  private final ResourceId _id;
+  private final Map<PartitionId, Partition> _partitionMap;
+  private final RebalancerConfig _rebalancerConfig;
+  private final SchedulerTaskConfig _schedulerTaskConfig;
+
+  /**
+   * Instantiate a configuration. Consider using ResourceConfig.Builder
+   * @param id resource id
+   * @param partitionMap map of partition identifiers to partition objects
+   * @param schedulerTaskConfig configuration for scheduler tasks associated with the resource
+   * @param rebalancerConfig configuration for rebalancing the resource
+   */
+  public ResourceConfig(ResourceId id, Map<PartitionId, Partition> partitionMap,
+      SchedulerTaskConfig schedulerTaskConfig, RebalancerConfig rebalancerConfig) {
+    _id = id;
+    _partitionMap = ImmutableMap.copyOf(partitionMap);
+    _schedulerTaskConfig = schedulerTaskConfig;
+    _rebalancerConfig = rebalancerConfig;
+  }
+
+  /**
+   * Get the partitions of the resource
+   * @return map of partition id to partition or empty map if none
+   */
+  public Map<PartitionId, Partition> getPartitionMap() {
+    return _partitionMap;
+  }
+
+  /**
+   * Get a partition that the resource contains
+   * @param partitionId the partition id to look up
+   * @return Partition or null if none is present with the given id
+   */
+  public Partition getPartition(PartitionId partitionId) {
+    return _partitionMap.get(partitionId);
+  }
+
+  /**
+   * Get the set of partition ids that the resource contains
+   * @return partition id set, or empty if none
+   */
+  public Set<PartitionId> getPartitionSet() {
+    Set<PartitionId> partitionSet = new HashSet<PartitionId>();
+    partitionSet.addAll(_partitionMap.keySet());
+    return ImmutableSet.copyOf(partitionSet);
+  }
+
+  /**
+   * Get the resource properties configuring rebalancing
+   * @return RebalancerConfig properties
+   */
+  public RebalancerConfig getRebalancerConfig() {
+    return _rebalancerConfig;
+  }
+
+  /**
+   * Get the resource id
+   * @return ResourceId
+   */
+  public ResourceId getId() {
+    return _id;
+  }
+
+  /**
+   * Get the properties configuring scheduler tasks
+   * @return SchedulerTaskConfig properties
+   */
+  public SchedulerTaskConfig getSchedulerTaskConfig() {
+    return _schedulerTaskConfig;
+  }
+
+  /**
+   * Assembles a ResourceConfig
+   */
+  public static class Builder {
+    private final ResourceId _id;
+    private final Map<PartitionId, Partition> _partitionMap;
+    private RebalancerConfig _rebalancerConfig;
+    private SchedulerTaskConfig _schedulerTaskConfig;
+
+    /**
+     * Build a Resource with an id
+     * @param id resource id
+     */
+    public Builder(ResourceId id) {
+      _id = id;
+      _partitionMap = new HashMap<PartitionId, Partition>();
+    }
+
+    /**
+     * Add a partition that the resource serves
+     * @param partition fully-qualified partition
+     * @return Builder
+     */
+    public Builder addPartition(Partition partition) {
+      _partitionMap.put(partition.getId(), partition);
+      return this;
+    }
+
+    /**
+     * Add a collection of partitions
+     * @param partitions
+     * @return Builder
+     */
+    public Builder addPartitions(Collection<Partition> partitions) {
+      for (Partition partition : partitions) {
+        addPartition(partition);
+      }
+      return this;
+    }
+
+    /**
+     * Set the rebalancer configuration
+     * @param rebalancerConfig properties of interest for rebalancing
+     * @return Builder
+     */
+    public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+      _rebalancerConfig = rebalancerConfig;
+      return this;
+    }
+
+    /**
+     * @param schedulerTaskConfig
+     * @return
+     */
+    public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
+      _schedulerTaskConfig = schedulerTaskConfig;
+      return this;
+    }
+
+    /**
+     * Create a Resource object
+     * @return instantiated Resource
+     */
+    public ResourceConfig build() {
+      return new ResourceConfig(_id, _partitionMap, _schedulerTaskConfig, _rebalancerConfig);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6570dc4..4df2201 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -44,14 +44,7 @@ import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CompatibilityCheckStage;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ExternalViewComputeStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-import org.apache.helix.controller.stages.MessageSelectionStage;
-import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.NewCompatibilityCheckStage;
 import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
@@ -62,10 +55,6 @@ import org.apache.helix.controller.stages.NewMessageThrottleStage;
 import org.apache.helix.controller.stages.NewReadClusterDataStage;
 import org.apache.helix.controller.stages.NewResourceComputationStage;
 import org.apache.helix.controller.stages.NewTaskAssignmentStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.RebalanceIdealStateStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -205,8 +194,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       registry.register("idealStateChange", dataRefresh, rebalancePipeline);
       registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
       registry.register("configChange", dataRefresh, rebalancePipeline);
-      registry.register("liveInstanceChange", dataRefresh, rebalancePipeline,
-          externalViewPipeline);
+      registry.register("liveInstanceChange", dataRefresh, rebalancePipeline, externalViewPipeline);
 
       registry.register("messageChange", dataRefresh, rebalancePipeline);
       registry.register("externalView", dataRefresh);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 8821082..a28166c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -29,9 +29,9 @@ import java.util.Set;
 
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
@@ -46,6 +46,7 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.collect.Lists;
 
@@ -69,12 +70,17 @@ public class NewAutoRebalancer implements NewRebalancer {
   public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
       StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
     // Compute a preference list based on the current ideal state
-    List<Partition> partitions = new ArrayList<Partition>(resource.getPartitionSet());
+    List<PartitionId> partitions = new ArrayList<PartitionId>(resource.getPartitionSet());
     List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
     RebalancerConfig config = resource.getRebalancerConfig();
     Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
     Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
-    int replicas = config.getReplicaCount();
+    int replicas = -1;
+    if (config.canAssignAnyLiveParticipant()) {
+      replicas = liveParticipants.size();
+    } else {
+      replicas = config.getReplicaCount();
+    }
 
     LinkedHashMap<String, Integer> stateCountMap =
         ConstraintBasedAssignment.stateCount(stateModelDef, liveParticipants.size(), replicas);
@@ -129,17 +135,25 @@ public class NewAutoRebalancer implements NewRebalancer {
       LOG.debug("Processing resource:" + resource.getId());
     }
     ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
-    for (Partition partition : partitions) {
+    for (PartitionId partition : partitions) {
       Set<ParticipantId> disabledParticipantsForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition.getId());
+          NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
       List<ParticipantId> preferenceList =
-          NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+          Lists.transform(newMapping.getListField(partition.stringify()),
+              new Function<String, ParticipantId>() {
+                @Override
+                public ParticipantId apply(String participantName) {
+                  return Id.participant(participantName);
+                }
+              });
+      preferenceList =
+          NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
       Map<ParticipantId, State> bestStateForPartition =
           NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
               stateModelDef, preferenceList,
-              currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId()),
+              currentStateOutput.getCurrentStateMap(resource.getId(), partition),
               disabledParticipantsForPartition);
-      partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }
@@ -149,23 +163,23 @@ public class NewAutoRebalancer implements NewRebalancer {
     Map<PartitionId, Map<ParticipantId, State>> map =
         new HashMap<PartitionId, Map<ParticipantId, State>>();
 
-    for (Partition partition : resource.getPartitionSet()) {
+    for (PartitionId partition : resource.getPartitionSet()) {
       Map<ParticipantId, State> curStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
-      map.put(partition.getId(), new HashMap<ParticipantId, State>());
+          currentStateOutput.getCurrentStateMap(resource.getId(), partition);
+      map.put(partition, new HashMap<ParticipantId, State>());
       for (ParticipantId node : curStateMap.keySet()) {
         State state = curStateMap.get(node);
         if (stateCountMap.containsKey(state)) {
-          map.get(partition.getId()).put(node, state);
+          map.get(partition).put(node, state);
         }
       }
 
       Map<ParticipantId, State> pendingStateMap =
-          currentStateOutput.getPendingStateMap(resource.getId(), partition.getId());
+          currentStateOutput.getPendingStateMap(resource.getId(), partition);
       for (ParticipantId node : pendingStateMap.keySet()) {
         State state = pendingStateMap.get(node);
         if (stateCountMap.containsKey(state)) {
-          map.get(partition.getId()).put(node, state);
+          map.get(partition).put(node, state);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
index 8d000f5..0dd346b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
@@ -27,7 +27,7 @@ import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
@@ -58,17 +58,16 @@ public class NewCustomRebalancer implements NewRebalancer {
     }
     ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
     RebalancerConfig config = resource.getRebalancerConfig();
-    for (Partition partition : resource.getPartitionSet()) {
+    for (PartitionId partition : resource.getPartitionSet()) {
       Map<ParticipantId, State> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+          currentStateOutput.getCurrentStateMap(resource.getId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition.getId());
+              partition);
       Map<ParticipantId, State> bestStateForPartition =
           computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
-              config.getPreferenceMap(partition.getId()), currentStateMap,
-              disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+              config.getPreferenceMap(partition), currentStateMap, disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index 27bb513..0b1611c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -25,7 +25,7 @@ import java.util.Set;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
@@ -56,19 +56,20 @@ public class NewSemiAutoRebalancer implements NewRebalancer {
     }
     ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
     RebalancerConfig config = resource.getRebalancerConfig();
-    for (Partition partition : resource.getPartitionSet()) {
+    for (PartitionId partition : resource.getPartitionSet()) {
       Map<ParticipantId, State> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+          currentStateOutput.getCurrentStateMap(resource.getId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition.getId());
+              partition);
       List<ParticipantId> preferenceList =
-          NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+          NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
+              config.getPreferenceList(partition));
       Map<ParticipantId, State> bestStateForPartition =
           NewConstraintBasedAssignment.computeAutoBestStateForPartition(
               cluster.getLiveParticipantMap(), stateModelDef, preferenceList, currentStateMap,
               disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 224853b..07856ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -34,7 +34,6 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.State;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -76,9 +75,7 @@ public class NewConstraintBasedAssignment {
    * @return list with most preferred participants first
    */
   public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
-      RebalancerConfig config) {
-    List<ParticipantId> prefList = config.getPreferenceList(partitionId);
-
+      List<ParticipantId> prefList) {
     if (prefList != null && prefList.size() == 1
         && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
       prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index bc14297..52a5af2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -23,11 +23,11 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Id;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -58,7 +58,8 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
 
     NewCurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
 
     if (currentStateOutput == null || resourceMap == null || cluster == null) {
@@ -106,7 +107,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
 
   // TODO check this
   private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
-      Map<ResourceId, Resource> resourceMap, NewCurrentStateOutput currentStateOutput) {
+      Map<ResourceId, ResourceConfig> resourceMap, NewCurrentStateOutput currentStateOutput) {
     NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
     Map<StateModelDefId, StateModelDefinition> stateModelDefs =
         event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
index d5ee850..8c5005d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -2,6 +2,7 @@ package org.apache.helix.controller.stages;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.model.ResourceAssignment;
@@ -31,4 +32,12 @@ public class NewBestPossibleStateOutput {
   public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
     return _resourceAssignmentMap.get(resourceId);
   }
+
+  /**
+   * Get all of the resources currently assigned
+   * @return set of assigned resource ids
+   */
+  public Set<ResourceId> getAssignedResources() {
+    return _resourceAssignmentMap.keySet();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
index 0f8c33e..e0cd22f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -28,7 +28,7 @@ import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.SessionId;
 import org.apache.helix.api.State;
@@ -48,7 +48,8 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
 
     if (cluster == null || resourceMap == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -72,7 +73,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
         }
 
         ResourceId resourceId = message.getResourceId();
-        Resource resource = resourceMap.get(resourceId);
+        ResourceConfig resource = resourceMap.get(resourceId);
         if (resource == null) {
           continue;
         }
@@ -92,8 +93,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
             for (PartitionId partitionId : partitionNames) {
               Partition partition = resource.getPartition(partitionId);
               if (partition != null) {
-                currentStateOutput.setPendingState(resourceId, partitionId,
-                    participantId,
+                currentStateOutput.setPendingState(resourceId, partitionId, participantId,
                     message.getToState());
               } else {
                 // log
@@ -113,7 +113,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
 
         ResourceId resourceId = curState.getResourceId();
         StateModelDefId stateModelDefId = curState.getStateModelDefId();
-        Resource resource = resourceMap.get(resourceId);
+        ResourceConfig resource = resourceMap.get(resourceId);
         if (resource == null) {
           continue;
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 1a99346..68169ae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
@@ -40,7 +39,7 @@ import org.apache.helix.api.Id;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -50,7 +49,6 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
 import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
@@ -64,7 +62,8 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
     log.info("START ExternalViewComputeStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
 
     if (manager == null || resourceMap == null || cluster == null) {
@@ -89,7 +88,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
       // view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
       // if resource ideal state has bucket size, set it
       // otherwise resource has been dropped, use bucket size from current state instead
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
       if (rebalancerConfig.getBucketSize() > 0) {
         view.setBucketSize(rebalancerConfig.getBucketSize());
@@ -139,8 +138,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
         // message, and then remove the partitions from the ideal state
         if (rebalancerConfig != null
             && rebalancerConfig.getStateModelDefId().stringify()
-                .equalsIgnoreCase(
-                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+                .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
           // TODO fix it
           // updateScheduledTaskStatus(view, manager, idealState);
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index 0a72dc0..0ae72cf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -30,10 +30,10 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Id;
 import org.apache.helix.api.MessageId;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.SessionId;
 import org.apache.helix.api.State;
@@ -42,12 +42,10 @@ import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
@@ -63,7 +61,8 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
         event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     NewCurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
     NewBestPossibleStateOutput bestPossibleStateOutput =
@@ -77,7 +76,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
     NewMessageOutput output = new NewMessageOutput();
 
     for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       int bucketSize = resource.getRebalancerConfig().getBucketSize();
 
       StateModelDefinition stateModelDef =
@@ -145,10 +144,13 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
             if (rebalancerConfig != null
                 && rebalancerConfig.getStateModelDefId().stringify()
                     .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-              if (resource.getPartitionSet().size() > 0) {
-                // TODO refactor it
-                message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
-                    resource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+              if (resource.getPartitionMap().size() > 0) {
+                // TODO refactor it -- we need a way to read in scheduler tasks a priori
+                Resource activeResource = cluster.getResource(resourceId);
+                if (activeResource != null) {
+                  message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+                      activeResource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+                }
               }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 9745c64..ed06c5f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -28,10 +28,12 @@ import java.util.TreeMap;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantConfig;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
@@ -91,11 +93,11 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
         event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     NewCurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    NewMessageOutput messageGenOutput =
-        event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+    NewMessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
     if (cluster == null || resourceMap == null || currentStateOutput == null
         || messageGenOutput == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -105,7 +107,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     NewMessageOutput output = new NewMessageOutput();
 
     for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       StateModelDefinition stateModelDef =
           stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
 
@@ -119,8 +121,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
         List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
         List<Message> selectedMessages =
             selectMessages(cluster.getLiveParticipantMap(),
-            currentStateOutput.getCurrentStateMap(resourceId, partitionId),
-            currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
+                currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+                currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
                 stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
         output.setMessages(resourceId, partitionId, selectedMessages);
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
index 5bea5b4..cfbd45c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -30,17 +30,16 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
-import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message;
 import org.apache.log4j.Logger;
 
 public class NewMessageThrottleStage extends AbstractBaseStage {
@@ -120,7 +119,8 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
     Cluster cluster = event.getAttribute("ClusterDataCache");
     NewMessageOutput msgSelectionOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
 
     if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
       throw new StageException("Missing attributes in event: " + event
@@ -145,7 +145,7 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
     // go through all new messages, throttle if necessary
     // assume messages should be sorted by state transition priority in messageSelection stage
     for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       // TODO fix it
       for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
         List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);


[2/3] [HELIX-109] adding config classes

Posted by ki...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index af23eb2..a6d9db4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -19,8 +19,10 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
@@ -28,6 +30,7 @@ import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -51,16 +54,17 @@ public class NewResourceComputationStage extends AbstractBaseStage {
       throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
     }
 
-    Map<ResourceId, Resource.Builder> resourceBuilderMap =
-        new LinkedHashMap<ResourceId, Resource.Builder>();
+    Map<ResourceId, ResourceConfig.Builder> resourceBuilderMap =
+        new LinkedHashMap<ResourceId, ResourceConfig.Builder>();
     // include all resources in ideal-state
     for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
       Resource resource = cluster.getResource(resourceId);
       RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
 
-      Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+      ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
       resourceBuilder.rebalancerConfig(rebalancerConfig);
-      resourceBuilder.addPartitions(resource.getPartitionSet());
+      Set<Partition> partitionSet = new HashSet<Partition>(resource.getPartitionMap().values());
+      resourceBuilder.addPartitions(partitionSet);
       resourceBuilderMap.put(resourceId, resourceBuilder);
     }
 
@@ -87,7 +91,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
           rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
           rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
 
-          Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+          ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
           resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
           resourceBuilderMap.put(resourceId, resourceBuilder);
         }
@@ -99,7 +103,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
     }
 
     // convert builder-map to resource-map
-    Map<ResourceId, Resource> resourceMap = new LinkedHashMap<ResourceId, Resource>();
+    Map<ResourceId, ResourceConfig> resourceMap = new LinkedHashMap<ResourceId, ResourceConfig>();
     for (ResourceId resourceId : resourceBuilderMap.keySet()) {
       resourceMap.put(resourceId, resourceBuilderMap.get(resourceId).build());
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 2b8a0c8..f5bb47f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -35,13 +35,11 @@ import org.apache.helix.api.Id;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
 import org.apache.log4j.Logger;
 
 public class NewTaskAssignmentStage extends AbstractBaseStage {
@@ -53,9 +51,9 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     logger.info("START TaskAssignmentStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    NewMessageOutput messageOutput =
-        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    NewMessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
 
@@ -68,7 +66,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     List<Message> messagesToSend = new ArrayList<Message>();
     for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
         List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
         messagesToSend.addAll(messages);
@@ -86,8 +84,8 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
   }
 
   List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
-      Map<ResourceId, Resource> resourceMap, Map<ParticipantId, Participant> liveParticipantMap,
-      HelixManagerProperties properties) {
+      Map<ResourceId, ResourceConfig> resourceMap,
+      Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
     // group messages by its CurrentState path + "/" + fromState + "/" + toState
     Map<String, Message> batchMessages = new HashMap<String, Message>();
     List<Message> outputMessages = new ArrayList<Message>();
@@ -96,7 +94,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     while (iter.hasNext()) {
       Message message = iter.next();
       ResourceId resourceId = message.getResourceId();
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
 
       ParticipantId participantId = Id.participant(message.getTgtName());
       Participant liveParticipant = liveParticipantMap.get(participantId);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index 3b46c13..ef47a12 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -76,6 +76,14 @@ public class ClusterConstraints extends HelixProperty {
   }
 
   /**
+   * Get the type of constraint this object represents
+   * @return constraint type
+   */
+  public ConstraintType getType() {
+    return ConstraintType.valueOf(getId());
+  }
+
+  /**
    * Instantiate constraints from a pre-populated ZNRecord
    * @param record ZNRecord containing all constraints
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 24ec7c9..16b3fa1 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -339,10 +339,13 @@ public class IdealState extends HelixProperty {
     Map<String, String> instanceStateMap = getInstanceStateMap(partitionId.stringify());
     ImmutableMap.Builder<ParticipantId, State> builder =
         new ImmutableMap.Builder<ParticipantId, State>();
-    for (String participantId : instanceStateMap.keySet()) {
-      builder.put(Id.participant(participantId), State.from(instanceStateMap.get(participantId)));
+    if (instanceStateMap != null) {
+      for (String participantId : instanceStateMap.keySet()) {
+        builder.put(Id.participant(participantId), State.from(instanceStateMap.get(participantId)));
+      }
+      return builder.build();
     }
-    return builder.build();
+    return null;
   }
 
   /**
@@ -433,10 +436,13 @@ public class IdealState extends HelixProperty {
   public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
     ImmutableList.Builder<ParticipantId> builder = new ImmutableList.Builder<ParticipantId>();
     List<String> preferenceStringList = getPreferenceList(partitionId.stringify());
-    for (String participantName : preferenceStringList) {
-      builder.add(Id.participant(participantName));
+    if (preferenceStringList != null) {
+      for (String participantName : preferenceStringList) {
+        builder.add(Id.participant(participantName));
+      }
+      return builder.build();
     }
-    return builder.build();
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 8577578..2b06c2b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -84,6 +84,14 @@ public class ResourceAssignment extends HelixProperty {
   }
 
   /**
+   * Get the entire map of a resource
+   * @return map of partition to participant to state
+   */
+  public Map<PartitionId, Map<ParticipantId, State>> getResourceMap() {
+    return replicaMapsFromStringMaps(_record.getMapFields());
+  }
+
+  /**
    * Get the participant, state pairs for a partition
    * @param partition the Partition to look up
    * @return map of (participant id, state)

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 7ceee85..b371c6a 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -45,6 +45,12 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ClusterAccessor;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.stages.AttributeName;
@@ -53,6 +59,10 @@ import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewResourceComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -60,6 +70,8 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.store.PropertyJsonComparator;
@@ -156,7 +168,7 @@ public class ClusterStateVerifier {
         HelixDataAccessor accessor =
             new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
 
-        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
+        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName);
       } catch (Exception e) {
         LOG.error("exception in verification", e);
       }
@@ -222,10 +234,11 @@ public class ClusterStateVerifier {
   }
 
   static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
-      Map<String, Map<String, String>> errStates) {
+      Map<String, Map<String, String>> errStates, String clusterName) {
     try {
       Builder keyBuilder = accessor.keyBuilder();
       // read cluster once and do verification
+      // TODO: stop using ClusterDataCache
       ClusterDataCache cache = new ClusterDataCache();
       cache.refresh(accessor);
 
@@ -250,10 +263,31 @@ public class ClusterStateVerifier {
         }
       }
 
+      Map<String, StateModelDefinition> stateModelDefs =
+          accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+      Map<StateModelDefId, StateModelDefinition> convertedDefs =
+          new HashMap<StateModelDefId, StateModelDefinition>();
+      for (String defName : stateModelDefs.keySet()) {
+        convertedDefs.put(Id.stateModelDef(defName), stateModelDefs.get(defName));
+      }
+      ClusterAccessor clusterAccessor = new ClusterAccessor(Id.cluster(clusterName), accessor);
+      Cluster cluster = clusterAccessor.readCluster();
       // calculate best possible state
-      BestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cache);
-      Map<String, Map<Partition, Map<String, String>>> bestPossStateMap =
-          bestPossOutput.getStateMap();
+      NewBestPossibleStateOutput bestPossOutput =
+          ClusterStateVerifier.calcBestPossState(cluster, convertedDefs);
+      Map<String, Map<String, Map<String, String>>> bestPossStateMap =
+          new HashMap<String, Map<String, Map<String, String>>>();
+      for (ResourceId resourceId : bestPossOutput.getAssignedResources()) {
+        ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId);
+        Map<String, Map<String, String>> resourceMap = new HashMap<String, Map<String, String>>();
+        for (PartitionId partitionId : resourceAssignment.getMappedPartitions()) {
+          Map<String, String> replicaMap =
+              ResourceAssignment.stringMapFromReplicaMap(resourceAssignment
+                  .getReplicaMap(partitionId));
+          resourceMap.put(partitionId.stringify(), replicaMap);
+        }
+        bestPossStateMap.put(resourceId.stringify(), resourceMap);
+      }
 
       // set error states
       if (errStates != null) {
@@ -263,13 +297,12 @@ public class ClusterStateVerifier {
             String instanceName = partErrStates.get(partitionName);
 
             if (!bestPossStateMap.containsKey(resourceName)) {
-              bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+              bestPossStateMap.put(resourceName, new HashMap<String, Map<String, String>>());
             }
-            Partition partition = new Partition(partitionName);
-            if (!bestPossStateMap.get(resourceName).containsKey(partition)) {
-              bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
+            if (!bestPossStateMap.get(resourceName).containsKey(partitionName)) {
+              bestPossStateMap.get(resourceName).put(partitionName, new HashMap<String, String>());
             }
-            bestPossStateMap.get(resourceName).get(partition)
+            bestPossStateMap.get(resourceName).get(partitionName)
                 .put(instanceName, HelixDefinedState.ERROR.toString());
           }
         }
@@ -285,11 +318,12 @@ public class ClusterStateVerifier {
         }
 
         // step 0: remove empty map and DROPPED state from best possible state
-        Map<Partition, Map<String, String>> bpStateMap =
-            bestPossOutput.getResourceMap(resourceName);
-        Iterator<Entry<Partition, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
+        Map<String, Map<String, String>> bpStateMap =
+            ResourceAssignment.stringMapsFromReplicaMaps(bestPossOutput.getResourceAssignment(
+                Id.resource(resourceName)).getResourceMap());
+        Iterator<Entry<String, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
         while (iter.hasNext()) {
-          Map.Entry<Partition, Map<String, String>> entry = iter.next();
+          Map.Entry<String, Map<String, String>> entry = iter.next();
           Map<String, String> instanceStateMap = entry.getValue();
           if (instanceStateMap.isEmpty()) {
             iter.remove();
@@ -310,7 +344,9 @@ public class ClusterStateVerifier {
 
         // step 1: externalView and bestPossibleState has equal size
         int extViewSize = extView.getRecord().getMapFields().size();
-        int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
+        int bestPossStateSize =
+            bestPossOutput.getResourceAssignment(Id.resource(resourceName)).getMappedPartitions()
+                .size();
         if (extViewSize != bestPossStateSize) {
           LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
               + bestPossStateSize + ") for resource: " + resourceName);
@@ -328,7 +364,8 @@ public class ClusterStateVerifier {
         for (String partition : extView.getRecord().getMapFields().keySet()) {
           Map<String, String> evInstanceStateMap = extView.getRecord().getMapField(partition);
           Map<String, String> bpInstanceStateMap =
-              bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
+              ResourceAssignment.stringMapFromReplicaMap(bestPossOutput.getResourceAssignment(
+                  Id.resource(resourceName)).getReplicaMap(Id.partition(partition)));
 
           boolean result =
               ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
@@ -404,24 +441,27 @@ public class ClusterStateVerifier {
   /**
    * calculate the best possible state note that DROPPED states are not checked since when
    * kick off the BestPossibleStateCalcStage we are providing an empty current state map
+   * @param convertedDefs
    * @param cache
    * @return
    * @throws Exception
    */
 
-  static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception {
+  static NewBestPossibleStateOutput calcBestPossState(Cluster cluster,
+      Map<StateModelDefId, StateModelDefinition> convertedDefs) throws Exception {
     ClusterEvent event = new ClusterEvent("sampleEvent");
-    event.addAttribute("ClusterDataCache", cache);
+    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), convertedDefs);
 
-    ResourceComputationStage rcState = new ResourceComputationStage();
-    CurrentStateComputationStage csStage = new CurrentStateComputationStage();
-    BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
+    NewResourceComputationStage rcState = new NewResourceComputationStage();
+    NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();
+    NewBestPossibleStateCalcStage bpStage = new NewBestPossibleStateCalcStage();
 
     runStage(event, rcState);
     runStage(event, csStage);
     runStage(event, bpStage);
 
-    BestPossibleStateOutput output =
+    NewBestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
 
     // System.out.println("output:" + output);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index cc26596..ce2781f 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -52,6 +52,9 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
 public class TestNewStages extends ZkUnitTestBase {
   final int n = 2;
   final int p = 8;
@@ -115,7 +118,14 @@ public class TestNewStages extends ZkUnitTestBase {
     Cluster cluster = clusterAccessor.readCluster();
     ClusterEvent event = new ClusterEvent(testName);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), new NewCurrentStateOutput());
-    event.addAttribute(AttributeName.RESOURCES.toString(), cluster.getResourceMap());
+    Map<ResourceId, ResourceConfig> resourceConfigMap =
+        Maps.transformValues(cluster.getResourceMap(), new Function<Resource, ResourceConfig>() {
+          @Override
+          public ResourceConfig apply(Resource resource) {
+            return resource.getConfig();
+          }
+        });
+    event.addAttribute(AttributeName.RESOURCES.toString(), resourceConfigMap);
     event.addAttribute("ClusterDataCache", cluster);
     Map<StateModelDefId, StateModelDefinition> stateModelMap =
         new HashMap<StateModelDefId, StateModelDefinition>();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 6dcf725..382f036 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -29,15 +29,19 @@ import java.util.UUID;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.Mocks;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.annotations.AfterClass;
@@ -107,11 +111,16 @@ public class BaseStageTest {
   protected void setupLiveInstances(int numLiveInstances) {
     // setup liveInstances
     for (int i = 0; i < numLiveInstances; i++) {
-      LiveInstance liveInstance = new LiveInstance("localhost_" + i);
+      String instanceName = "localhost_" + i;
+      InstanceConfig instanceConfig = new InstanceConfig(Id.participant(instanceName));
+      instanceConfig.setHostName("localhost");
+      instanceConfig.setPort(Integer.toString(i));
+      LiveInstance liveInstance = new LiveInstance(instanceName);
       liveInstance.setSessionId("session_" + i);
 
       Builder keyBuilder = accessor.keyBuilder();
-      accessor.setProperty(keyBuilder.liveInstance("localhost_" + i), liveInstance);
+      accessor.setProperty(keyBuilder.instanceConfig(instanceName), instanceConfig);
+      accessor.setProperty(keyBuilder.liveInstance(instanceName), liveInstance);
     }
   }
 
@@ -128,32 +137,38 @@ public class BaseStageTest {
     stage.postProcess();
   }
 
-  protected void setupStateModel() {
-    ZNRecord masterSlave = new StateModelConfigGenerator().generateConfigForMasterSlave();
-
+  protected Map<StateModelDefId, StateModelDefinition> setupStateModel() {
     Builder keyBuilder = accessor.keyBuilder();
-    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), new StateModelDefinition(
-        masterSlave));
+    Map<StateModelDefId, StateModelDefinition> defs =
+        new HashMap<StateModelDefId, StateModelDefinition>();
+
+    ZNRecord masterSlave = StateModelConfigGenerator.generateConfigForMasterSlave();
+    StateModelDefinition masterSlaveDef = new StateModelDefinition(masterSlave);
+    defs.put(Id.stateModelDef(masterSlaveDef.getId()), masterSlaveDef);
+    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlaveDef);
+
+    ZNRecord leaderStandby = StateModelConfigGenerator.generateConfigForLeaderStandby();
+    StateModelDefinition leaderStandbyDef = new StateModelDefinition(leaderStandby);
+    defs.put(Id.stateModelDef(leaderStandbyDef.getId()), leaderStandbyDef);
+    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandbyDef);
 
-    ZNRecord leaderStandby = new StateModelConfigGenerator().generateConfigForLeaderStandby();
-    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), new StateModelDefinition(
-        leaderStandby));
+    ZNRecord onlineOffline = StateModelConfigGenerator.generateConfigForOnlineOffline();
+    StateModelDefinition onlineOfflineDef = new StateModelDefinition(onlineOffline);
+    defs.put(Id.stateModelDef(onlineOfflineDef.getId()), onlineOfflineDef);
+    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOfflineDef);
 
-    ZNRecord onlineOffline = new StateModelConfigGenerator().generateConfigForOnlineOffline();
-    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), new StateModelDefinition(
-        onlineOffline));
+    return defs;
   }
 
-  protected Map<String, Resource> getResourceMap() {
-    Map<String, Resource> resourceMap = new HashMap<String, Resource>();
-    Resource testResource = new Resource("testResourceName");
-    testResource.setStateModelDefRef("MasterSlave");
-    testResource.addPartition("testResourceName_0");
-    testResource.addPartition("testResourceName_1");
-    testResource.addPartition("testResourceName_2");
-    testResource.addPartition("testResourceName_3");
-    testResource.addPartition("testResourceName_4");
-    resourceMap.put("testResourceName", testResource);
+  protected Map<ResourceId, ResourceConfig> getResourceMap() {
+    Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
+    ResourceConfig.Builder builder = new ResourceConfig.Builder(Id.resource("testResourceName"));
+    builder.addPartition(new Partition(Id.partition("testResourceName_0")));
+    builder.addPartition(new Partition(Id.partition("testResourceName_1")));
+    builder.addPartition(new Partition(Id.partition("testResourceName_2")));
+    builder.addPartition(new Partition(Id.partition("testResourceName_3")));
+    builder.addPartition(new Partition(Id.partition("testResourceName_4")));
+    resourceMap.put(Id.resource("testResourceName"), builder.build());
 
     return resourceMap;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 2453bd8..82b70b1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -24,12 +24,17 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateModeProperty;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -41,68 +46,78 @@ import org.testng.annotations.Test;
 public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
   @Test
   public void testSemiAutoModeCompatibility() {
-    System.out.println("START TestBestPossibleStateCalcStage at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("START TestBestPossibleStateCalcStageCompatibility_testSemiAutoModeCompatibility at "
+            + new Date(System.currentTimeMillis()));
 
     String[] resources = new String[] {
       "testResourceName"
     };
     setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
     setupLiveInstances(5);
-    setupStateModel();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
 
-    Map<String, Resource> resourceMap = getResourceMap();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
 
-    ReadClusterDataStage stage1 = new ReadClusterDataStage();
+    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
     runStage(event, stage1);
-    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    BestPossibleStateOutput output =
+    NewBestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
-      Partition resource = new Partition("testResourceName_" + p);
-      AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
-          .get("localhost_" + (p + 1) % 5));
+      Map<ParticipantId, State> replicaMap =
+          output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+              Id.partition("testResourceName_" + p));
+      AssertJUnit.assertEquals(State.from("MASTER"),
+          replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
     }
-    System.out.println("END TestBestPossibleStateCalcStage at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("END TestBestPossibleStateCalcStageCompatibility_testSemiAutoModeCompatibility at "
+            + new Date(System.currentTimeMillis()));
   }
 
   @Test
   public void testCustomModeCompatibility() {
-    System.out.println("START TestBestPossibleStateCalcStage at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("START TestBestPossibleStateCalcStageCompatibility_testCustomModeCompatibility at "
+            + new Date(System.currentTimeMillis()));
 
     String[] resources = new String[] {
       "testResourceName"
     };
     setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.CUSTOMIZED);
     setupLiveInstances(5);
-    setupStateModel();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
 
-    Map<String, Resource> resourceMap = getResourceMap();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
 
-    ReadClusterDataStage stage1 = new ReadClusterDataStage();
+    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
     runStage(event, stage1);
-    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    BestPossibleStateOutput output =
+    NewBestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
-      Partition resource = new Partition("testResourceName_" + p);
-      AssertJUnit.assertNull(output.getInstanceStateMap("testResourceName", resource).get(
-          "localhost_" + (p + 1) % 5));
+      Map<ParticipantId, State> replicaMap =
+          output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+              Id.partition("testResourceName_" + p));
+      AssertJUnit.assertEquals(State.from("MASTER"),
+          replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
     }
-    System.out.println("END TestBestPossibleStateCalcStage at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("END TestBestPossibleStateCalcStageCompatibility_testCustomModeCompatibility at "
+            + new Date(System.currentTimeMillis()));
   }
 
   protected List<IdealState> setupIdealStateDeprecated(int nodes, String[] resources,

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 82c7b37..1a76615 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -22,14 +22,14 @@ package org.apache.helix.controller.stages;
 import java.util.Date;
 import java.util.Map;
 
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -45,24 +45,27 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
     };
     setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
     setupLiveInstances(5);
-    setupStateModel();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
 
-    Map<String, Resource> resourceMap = getResourceMap();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
 
-    ReadClusterDataStage stage1 = new ReadClusterDataStage();
+    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
     runStage(event, stage1);
-    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    BestPossibleStateOutput output =
+    NewBestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
-      Partition resource = new Partition("testResourceName_" + p);
-      AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
-          .get("localhost_" + (p + 1) % 5));
+      Map<ParticipantId, State> replicaMap =
+          output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+              Id.partition("testResourceName_" + p));
+      AssertJUnit.assertEquals(State.from("MASTER"),
+          replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
     }
     System.out.println("END TestBestPossibleStateCalcStage at "
         + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
index bce7c2d..47875fc 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
@@ -28,6 +28,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
 import org.testng.Assert;
@@ -64,6 +65,8 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
     LiveInstance liveInstance = new LiveInstance(record);
     liveInstance.setSessionId("session_0");
     accessor.setProperty(keyBuilder.liveInstance("localhost_0"), liveInstance);
+    InstanceConfig config = new InstanceConfig(liveInstance.getInstanceName());
+    accessor.setProperty(keyBuilder.instanceConfig(config.getInstanceName()), config);
 
     if (controllerVersion != null) {
       ((Mocks.MockManager) manager).setVersion(controllerVersion);
@@ -74,13 +77,13 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
           .put("minimum_supported_version.participant", minSupportedParticipantVersion);
     }
     event.addAttribute("helixmanager", manager);
-    runStage(event, new ReadClusterDataStage());
+    runStage(event, new NewReadClusterDataStage());
   }
 
   @Test
   public void testCompatible() {
     prepare("0.4.0", "0.4.0");
-    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
@@ -95,7 +98,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
   @Test
   public void testNullParticipantVersion() {
     prepare("0.4.0", null);
-    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
@@ -111,7 +114,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
   @Test
   public void testNullControllerVersion() {
     prepare(null, "0.4.0");
-    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
@@ -127,7 +130,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
   @Test
   public void testIncompatible() {
     prepare("0.6.1-incubating-SNAPSHOT", "0.3.4", "0.4");
-    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index ecad444..3f567ae 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -24,11 +24,11 @@ import java.util.Map;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Id;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -36,32 +36,32 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
 
   @Test
   public void testEmptyCS() {
-    Map<String, Resource> resourceMap = getResourceMap();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-    CurrentStateComputationStage stage = new CurrentStateComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    NewCurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     AssertJUnit.assertEquals(
-        output.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
-        0);
+        output.getCurrentStateMap(Id.resource("testResourceName"),
+            Id.partition("testResourceName_0")).size(), 0);
   }
 
   @Test
   public void testSimpleCS() {
     // setup resource
-    Map<String, Resource> resourceMap = getResourceMap();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
 
     setupLiveInstances(5);
 
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-    CurrentStateComputationStage stage = new CurrentStateComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    NewCurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     AssertJUnit.assertEquals(
-        output1.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
-        0);
+        output1.getCurrentStateMap(Id.resource("testResourceName"),
+            Id.partition("testResourceName_0")).size(), 0);
 
     // Add a state transition messages
     Message message = new Message(Message.MessageType.STATE_TRANSITION, Id.message("msg1"));
@@ -75,13 +75,13 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
 
-    runStage(event, new ReadClusterDataStage());
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    String pendingState =
-        output2.getPendingState("testResourceName", new Partition("testResourceName_1"),
-            "localhost_3");
-    AssertJUnit.assertEquals(pendingState, "SLAVE");
+    NewCurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    State pendingState =
+        output2.getPendingState(Id.resource("testResourceName"),
+            Id.partition("testResourceName_1"), Id.participant("localhost_3"));
+    AssertJUnit.assertEquals(pendingState, State.from("SLAVE"));
 
     ZNRecord record1 = new ZNRecord("testResourceName");
     // Add a current state that matches sessionId and one that does not match
@@ -100,13 +100,13 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     accessor.setProperty(
         keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
         stateWithDeadSession);
-    runStage(event, new ReadClusterDataStage());
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    String currentState =
-        output3.getCurrentState("testResourceName", new Partition("testResourceName_1"),
-            "localhost_3");
-    AssertJUnit.assertEquals(currentState, "OFFLINE");
+    NewCurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    State currentState =
+        output3.getCurrentState(Id.resource("testResourceName"),
+            Id.partition("testResourceName_1"), Id.participant("localhost_3"));
+    AssertJUnit.assertEquals(currentState, State.from("OFFLINE"));
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 26bbc20..bcd8f4a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -41,7 +41,6 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -56,7 +55,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
 
     // ideal state: node0 is MASTER, node1 is SLAVE
@@ -74,7 +73,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
 
-    MessageThrottleStage throttleStage = new MessageThrottleStage();
+    NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
     try {
       runStage(event, throttleStage);
       Assert.fail("Should throw exception since DATA_CACHE is null");
@@ -83,7 +82,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     }
 
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
     runPipeline(event, dataRefresh);
 
     try {
@@ -92,7 +91,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     } catch (Exception e) {
       // OK
     }
-    runStage(event, new ResourceComputationStage());
+    runStage(event, new NewResourceComputationStage());
 
     try {
       runStage(event, throttleStage);
@@ -100,22 +99,22 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     } catch (Exception e) {
       // OK
     }
-    MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+    NewMessageOutput msgSelectOutput = new NewMessageOutput();
     List<Message> selectMessages = new ArrayList<Message>();
     Message msg =
         createMessage(MessageType.STATE_TRANSITION, Id.message("msgId-001"), "OFFLINE", "SLAVE",
             "TestDB", "localhost_0");
     selectMessages.add(msg);
 
-    msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+    msgSelectOutput.setMessages(Id.resource("TestDB"), Id.partition("TestDB_0"), selectMessages);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
 
     runStage(event, throttleStage);
 
-    MessageThrottleStageOutput msgThrottleOutput =
+    NewMessageOutput msgThrottleOutput =
         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
-    Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(),
-        1);
+    Assert.assertEquals(
+        msgThrottleOutput.getMessages(Id.resource("TestDB"), Id.partition("TestDB_0")).size(), 1);
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -127,7 +126,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
 
     // ideal state: node0 is MASTER, node1 is SLAVE
@@ -212,7 +211,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     ClusterConstraints constraint =
         accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
 
-    MessageThrottleStage throttleStage = new MessageThrottleStage();
+    NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
 
     // test constraintSelection
     // message1: hit contraintSelection rule1 and rule2
@@ -262,10 +261,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     event.addAttribute("helixmanager", manager);
 
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
     runPipeline(event, dataRefresh);
-    runStage(event, new ResourceComputationStage());
-    MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+    runStage(event, new NewResourceComputationStage());
+    NewMessageOutput msgSelectOutput = new NewMessageOutput();
 
     Message msg3 =
         createMessage(MessageType.STATE_TRANSITION, Id.message("msgId-003"), "OFFLINE", "SLAVE",
@@ -291,15 +290,15 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     selectMessages.add(msg5); // should be throttled
     selectMessages.add(msg6); // should be throttled
 
-    msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+    msgSelectOutput.setMessages(Id.resource("TestDB"), Id.partition("TestDB_0"), selectMessages);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
 
     runStage(event, throttleStage);
 
-    MessageThrottleStageOutput msgThrottleOutput =
+    NewMessageOutput msgThrottleOutput =
         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     List<Message> throttleMessages =
-        msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
+        msgThrottleOutput.getMessages(Id.resource("TestDB"), Id.partition("TestDB_0"));
     Assert.assertEquals(throttleMessages.size(), 4);
     Assert.assertTrue(throttleMessages.contains(msg1));
     Assert.assertTrue(throttleMessages.contains(msg2));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 97d5ec1..825aa05 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -20,15 +20,25 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.HelixVersion;
 import org.apache.helix.api.Id;
-import org.apache.helix.controller.stages.MessageSelectionStage.Bounds;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.stages.NewMessageSelectionStage.Bounds;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -38,15 +48,27 @@ public class TestMsgSelectionStage {
   public void testMasterXfer() {
     System.out.println("START testMasterXfer at " + new Date(System.currentTimeMillis()));
 
-    Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
-    liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
-    liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
-    Map<String, String> currentStates = new HashMap<String, String>();
-    currentStates.put("localhost_0", "SLAVE");
-    currentStates.put("localhost_1", "MASTER");
-
-    Map<String, String> pendingStates = new HashMap<String, String>();
+    Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
+    Set<PartitionId> disabledPartitions = Collections.emptySet();
+    Set<String> tags = Collections.emptySet();
+    Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
+    Map<MessageId, Message> messageMap = Collections.emptyMap();
+    RunningInstance runningInstance0 =
+        new RunningInstance(Id.session("session_0"), HelixVersion.from("1.2.3.4"), Id.process("0"));
+    RunningInstance runningInstance1 =
+        new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
+    liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
+        "localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
+        messageMap));
+    liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_1"),
+        "localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
+        messageMap));
+
+    Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
+    currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));
+    currentStates.put(Id.participant("localhost_1"), State.from("MASTER"));
+
+    Map<ParticipantId, State> pendingStates = new HashMap<ParticipantId, State>();
 
     List<Message> messages = new ArrayList<Message>();
     messages.add(TestHelper.createMessage(Id.message("msgId_0"), "SLAVE", "MASTER", "localhost_0",
@@ -54,17 +76,17 @@ public class TestMsgSelectionStage {
     messages.add(TestHelper.createMessage(Id.message("msgId_1"), "MASTER", "SLAVE", "localhost_1",
         "TestDB", "TestDB_0"));
 
-    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
-    stateConstraints.put("MASTER", new Bounds(0, 1));
-    stateConstraints.put("SLAVE", new Bounds(0, 2));
+    Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+    stateConstraints.put(State.from("MASTER"), new Bounds(0, 1));
+    stateConstraints.put(State.from("SLAVE"), new Bounds(0, 2));
 
     Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
     stateTransitionPriorities.put("MASTER-SLAVE", 0);
     stateTransitionPriorities.put("SLAVE-MASTER", 1);
 
     List<Message> selectedMsg =
-        new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
-            messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+        new NewMessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+            messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
 
     Assert.assertEquals(selectedMsg.size(), 1);
     Assert.assertEquals(selectedMsg.get(0).getMsgId(), Id.message("msgId_1"));
@@ -76,32 +98,44 @@ public class TestMsgSelectionStage {
     System.out.println("START testMasterXferAfterMasterResume at "
         + new Date(System.currentTimeMillis()));
 
-    Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
-    liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
-    liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
-    Map<String, String> currentStates = new HashMap<String, String>();
-    currentStates.put("localhost_0", "SLAVE");
-    currentStates.put("localhost_1", "SLAVE");
-
-    Map<String, String> pendingStates = new HashMap<String, String>();
-    pendingStates.put("localhost_1", "MASTER");
+    Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
+    Set<PartitionId> disabledPartitions = Collections.emptySet();
+    Set<String> tags = Collections.emptySet();
+    Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
+    Map<MessageId, Message> messageMap = Collections.emptyMap();
+    RunningInstance runningInstance0 =
+        new RunningInstance(Id.session("session_0"), HelixVersion.from("1.2.3.4"), Id.process("0"));
+    RunningInstance runningInstance1 =
+        new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
+    liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
+        "localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
+        messageMap));
+    liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_1"),
+        "localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
+        messageMap));
+
+    Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
+    currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));
+    currentStates.put(Id.participant("localhost_1"), State.from("SLAVE"));
+
+    Map<ParticipantId, State> pendingStates = new HashMap<ParticipantId, State>();
+    pendingStates.put(Id.participant("localhost_1"), State.from("MASTER"));
 
     List<Message> messages = new ArrayList<Message>();
     messages.add(TestHelper.createMessage(Id.message("msgId_0"), "SLAVE", "MASTER", "localhost_0",
         "TestDB", "TestDB_0"));
 
-    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
-    stateConstraints.put("MASTER", new Bounds(0, 1));
-    stateConstraints.put("SLAVE", new Bounds(0, 2));
+    Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+    stateConstraints.put(State.from("MASTER"), new Bounds(0, 1));
+    stateConstraints.put(State.from("SLAVE"), new Bounds(0, 2));
 
     Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
     stateTransitionPriorities.put("MASTER-SLAVE", 0);
     stateTransitionPriorities.put("SLAVE-MASTER", 1);
 
     List<Message> selectedMsg =
-        new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
-            messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+        new NewMessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+            messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
 
     Assert.assertEquals(selectedMsg.size(), 0);
     System.out.println("END testMasterXferAfterMasterResume at "

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 7cd942e..a3f38ea 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -39,7 +39,6 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.model.Partition;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -76,17 +75,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
 
     // rebalance pipeline
     Pipeline rebalancePipeline = new Pipeline();
-    rebalancePipeline.addStage(new ResourceComputationStage());
-    rebalancePipeline.addStage(new CurrentStateComputationStage());
-    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new MessageGenerationPhase());
-    rebalancePipeline.addStage(new MessageSelectionStage());
-    rebalancePipeline.addStage(new MessageThrottleStage());
-    rebalancePipeline.addStage(new TaskAssignmentStage());
+    rebalancePipeline.addStage(new NewResourceComputationStage());
+    rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+    rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new NewMessageGenerationStage());
+    rebalancePipeline.addStage(new NewMessageSelectionStage());
+    rebalancePipeline.addStage(new NewMessageThrottleStage());
+    rebalancePipeline.addStage(new NewTaskAssignmentStage());
 
     // round1: set node0 currentState to OFFLINE and node1 currentState to OFFLINE
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
@@ -96,10 +95,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    MessageSelectionStageOutput msgSelOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     List<Message> messages =
-        msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
     Message message = messages.get(0);
     Assert.assertEquals(message.getFromState().toString(), "OFFLINE");
@@ -114,7 +112,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    messages =
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1");
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -220,17 +219,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
 
     // rebalance pipeline
     Pipeline rebalancePipeline = new Pipeline();
-    rebalancePipeline.addStage(new ResourceComputationStage());
-    rebalancePipeline.addStage(new CurrentStateComputationStage());
-    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new MessageGenerationPhase());
-    rebalancePipeline.addStage(new MessageSelectionStage());
-    rebalancePipeline.addStage(new MessageThrottleStage());
-    rebalancePipeline.addStage(new TaskAssignmentStage());
+    rebalancePipeline.addStage(new NewResourceComputationStage());
+    rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+    rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new NewMessageGenerationStage());
+    rebalancePipeline.addStage(new NewMessageSelectionStage());
+    rebalancePipeline.addStage(new NewMessageThrottleStage());
+    rebalancePipeline.addStage(new NewTaskAssignmentStage());
 
     // round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
@@ -240,10 +239,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    MessageSelectionStageOutput msgSelOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     List<Message> messages =
-        msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
     Message message = messages.get(0);
     Assert.assertEquals(message.getFromState().toString(), "OFFLINE");
@@ -258,7 +256,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    messages =
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1,
         "Should output only 1 message: OFFLINE->DROPPED for localhost_1");
 
@@ -275,7 +274,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    messages =
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1,
         "Should output 1 message: OFFLINE->DROPPED for localhost_0");
     message = messages.get(0);
@@ -315,17 +315,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
 
     // rebalance pipeline
     Pipeline rebalancePipeline = new Pipeline();
-    rebalancePipeline.addStage(new ResourceComputationStage());
-    rebalancePipeline.addStage(new CurrentStateComputationStage());
-    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new MessageGenerationPhase());
-    rebalancePipeline.addStage(new MessageSelectionStage());
-    rebalancePipeline.addStage(new MessageThrottleStage());
-    rebalancePipeline.addStage(new TaskAssignmentStage());
+    rebalancePipeline.addStage(new NewResourceComputationStage());
+    rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+    rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new NewMessageGenerationStage());
+    rebalancePipeline.addStage(new NewMessageSelectionStage());
+    rebalancePipeline.addStage(new NewMessageThrottleStage());
+    rebalancePipeline.addStage(new NewTaskAssignmentStage());
 
     // round1: set node1 currentState to SLAVE
     setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", "session_1",
@@ -333,10 +333,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    MessageSelectionStageOutput msgSelOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     List<Message> messages =
-        msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1, "Should output 1 message: SLAVE-MASTER for node1");
     Message message = messages.get(0);
     Assert.assertEquals(message.getFromState().toString(), "SLAVE");
@@ -354,7 +353,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    messages =
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0");
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 86bd060..d4f3de6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -28,6 +28,8 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Id;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
@@ -35,7 +37,6 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -63,19 +64,21 @@ public class TestResourceComputationStage extends BaseStageTest {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.idealState(resourceName), idealState);
-    ResourceComputationStage stage = new ResourceComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewResourceComputationStage stage = new NewResourceComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
 
-    Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resource =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     AssertJUnit.assertEquals(1, resource.size());
 
-    AssertJUnit.assertEquals(resource.keySet().iterator().next(), resourceName);
-    AssertJUnit.assertEquals(resource.values().iterator().next().getResourceName(), resourceName);
-    AssertJUnit.assertEquals(resource.values().iterator().next().getStateModelDefRef(),
-        idealState.getStateModelDefRef());
+    AssertJUnit.assertEquals(resource.keySet().iterator().next(), Id.resource(resourceName));
     AssertJUnit
-        .assertEquals(resource.values().iterator().next().getPartitions().size(), partitions);
+        .assertEquals(resource.values().iterator().next().getId(), Id.resource(resourceName));
+    AssertJUnit.assertEquals(resource.values().iterator().next().getRebalancerConfig()
+        .getStateModelDefId(), idealState.getStateModelDefId());
+    AssertJUnit.assertEquals(resource.values().iterator().next().getPartitionSet().size(),
+        partitions);
   }
 
   @Test
@@ -85,21 +88,23 @@ public class TestResourceComputationStage extends BaseStageTest {
         "testResource1", "testResource2"
     };
     List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
-    ResourceComputationStage stage = new ResourceComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewResourceComputationStage stage = new NewResourceComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
 
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     AssertJUnit.assertEquals(resources.length, resourceMap.size());
 
     for (int i = 0; i < resources.length; i++) {
       String resourceName = resources[i];
+      ResourceId resourceId = Id.resource(resourceName);
       IdealState idealState = idealStates.get(i);
       AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
-          idealState.getStateModelDefRef());
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
+          .getStateModelDefId(), idealState.getStateModelDefRef());
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
           idealState.getNumPartitions());
     }
   }
@@ -151,41 +156,47 @@ public class TestResourceComputationStage extends BaseStageTest {
     accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
         currentState);
 
-    ResourceComputationStage stage = new ResourceComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewResourceComputationStage stage = new NewResourceComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
 
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     // +1 because it will have one for current state
     AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());
 
     for (int i = 0; i < resources.length; i++) {
       String resourceName = resources[i];
+      ResourceId resourceId = Id.resource(resourceName);
       IdealState idealState = idealStates.get(i);
-      AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
-          idealState.getStateModelDefRef());
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
+      AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
+          .getStateModelDefId(), idealState.getStateModelDefId());
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
           idealState.getNumPartitions());
     }
     // Test the data derived from CurrentState
-    AssertJUnit.assertTrue(resourceMap.containsKey(oldResource));
-    AssertJUnit.assertEquals(resourceMap.get(oldResource).getResourceName(), oldResource);
-    AssertJUnit.assertEquals(resourceMap.get(oldResource).getStateModelDefRef(),
-        currentState.getStateModelDefRef());
-    AssertJUnit.assertEquals(resourceMap.get(oldResource).getPartitions().size(), currentState
-        .getPartitionStateStringMap().size());
-    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_0"));
-    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1"));
-    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2"));
+    ResourceId oldResourceId = Id.resource(oldResource);
+    AssertJUnit.assertTrue(resourceMap.containsKey(oldResourceId));
+    AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getId(), oldResourceId);
+    AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getRebalancerConfig()
+        .getStateModelDefId(), currentState.getStateModelDefId());
+    AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getPartitionSet().size(), currentState
+        .getPartitionStateMap().size());
+    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+        Id.partition("testResourceOld_0")));
+    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+        Id.partition("testResourceOld_1")));
+    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+        Id.partition("testResourceOld_2")));
 
   }
 
   @Test
   public void testNull() {
     ClusterEvent event = new ClusterEvent("sampleEvent");
-    ResourceComputationStage stage = new ResourceComputationStage();
+    NewResourceComputationStage stage = new NewResourceComputationStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 747a185..d8afec5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -26,10 +26,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
 import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -176,7 +176,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
           TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
       _startCMResultMap.put(storageNodeName, resultx);
     }
-    Thread.sleep(1000);
+    Thread.sleep(5000);
     result =
         ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
             CLUSTER_NAME, TEST_DB));