You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/07 07:40:39 UTC

[2/3] [HELIX-109] adding config classes

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index af23eb2..a6d9db4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -19,8 +19,10 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
@@ -28,6 +30,7 @@ import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -51,16 +54,17 @@ public class NewResourceComputationStage extends AbstractBaseStage {
       throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
     }
 
-    Map<ResourceId, Resource.Builder> resourceBuilderMap =
-        new LinkedHashMap<ResourceId, Resource.Builder>();
+    Map<ResourceId, ResourceConfig.Builder> resourceBuilderMap =
+        new LinkedHashMap<ResourceId, ResourceConfig.Builder>();
     // include all resources in ideal-state
     for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
       Resource resource = cluster.getResource(resourceId);
       RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
 
-      Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+      ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
       resourceBuilder.rebalancerConfig(rebalancerConfig);
-      resourceBuilder.addPartitions(resource.getPartitionSet());
+      Set<Partition> partitionSet = new HashSet<Partition>(resource.getPartitionMap().values());
+      resourceBuilder.addPartitions(partitionSet);
       resourceBuilderMap.put(resourceId, resourceBuilder);
     }
 
@@ -87,7 +91,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
           rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
           rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
 
-          Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+          ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
           resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
           resourceBuilderMap.put(resourceId, resourceBuilder);
         }
@@ -99,7 +103,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
     }
 
     // convert builder-map to resource-map
-    Map<ResourceId, Resource> resourceMap = new LinkedHashMap<ResourceId, Resource>();
+    Map<ResourceId, ResourceConfig> resourceMap = new LinkedHashMap<ResourceId, ResourceConfig>();
     for (ResourceId resourceId : resourceBuilderMap.keySet()) {
       resourceMap.put(resourceId, resourceBuilderMap.get(resourceId).build());
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 2b8a0c8..f5bb47f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -35,13 +35,11 @@ import org.apache.helix.api.Id;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
 import org.apache.log4j.Logger;
 
 public class NewTaskAssignmentStage extends AbstractBaseStage {
@@ -53,9 +51,9 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     logger.info("START TaskAssignmentStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    NewMessageOutput messageOutput =
-        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    NewMessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
 
@@ -68,7 +66,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     List<Message> messagesToSend = new ArrayList<Message>();
     for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
         List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
         messagesToSend.addAll(messages);
@@ -86,8 +84,8 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
   }
 
   List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
-      Map<ResourceId, Resource> resourceMap, Map<ParticipantId, Participant> liveParticipantMap,
-      HelixManagerProperties properties) {
+      Map<ResourceId, ResourceConfig> resourceMap,
+      Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
     // group messages by its CurrentState path + "/" + fromState + "/" + toState
     Map<String, Message> batchMessages = new HashMap<String, Message>();
     List<Message> outputMessages = new ArrayList<Message>();
@@ -96,7 +94,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     while (iter.hasNext()) {
       Message message = iter.next();
       ResourceId resourceId = message.getResourceId();
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
 
       ParticipantId participantId = Id.participant(message.getTgtName());
       Participant liveParticipant = liveParticipantMap.get(participantId);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index 3b46c13..ef47a12 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -76,6 +76,14 @@ public class ClusterConstraints extends HelixProperty {
   }
 
   /**
+   * Get the type of constraint this object represents
+   * @return constraint type
+   */
+  public ConstraintType getType() {
+    return ConstraintType.valueOf(getId());
+  }
+
+  /**
    * Instantiate constraints from a pre-populated ZNRecord
    * @param record ZNRecord containing all constraints
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 24ec7c9..16b3fa1 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -339,10 +339,13 @@ public class IdealState extends HelixProperty {
     Map<String, String> instanceStateMap = getInstanceStateMap(partitionId.stringify());
     ImmutableMap.Builder<ParticipantId, State> builder =
         new ImmutableMap.Builder<ParticipantId, State>();
-    for (String participantId : instanceStateMap.keySet()) {
-      builder.put(Id.participant(participantId), State.from(instanceStateMap.get(participantId)));
+    if (instanceStateMap != null) {
+      for (String participantId : instanceStateMap.keySet()) {
+        builder.put(Id.participant(participantId), State.from(instanceStateMap.get(participantId)));
+      }
+      return builder.build();
     }
-    return builder.build();
+    return null;
   }
 
   /**
@@ -433,10 +436,13 @@ public class IdealState extends HelixProperty {
   public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
     ImmutableList.Builder<ParticipantId> builder = new ImmutableList.Builder<ParticipantId>();
     List<String> preferenceStringList = getPreferenceList(partitionId.stringify());
-    for (String participantName : preferenceStringList) {
-      builder.add(Id.participant(participantName));
+    if (preferenceStringList != null) {
+      for (String participantName : preferenceStringList) {
+        builder.add(Id.participant(participantName));
+      }
+      return builder.build();
     }
-    return builder.build();
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 8577578..2b06c2b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -84,6 +84,14 @@ public class ResourceAssignment extends HelixProperty {
   }
 
   /**
+   * Get the entire map of a resource
+   * @return map of partition to participant to state
+   */
+  public Map<PartitionId, Map<ParticipantId, State>> getResourceMap() {
+    return replicaMapsFromStringMaps(_record.getMapFields());
+  }
+
+  /**
    * Get the participant, state pairs for a partition
    * @param partition the Partition to look up
    * @return map of (participant id, state)

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 7ceee85..b371c6a 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -45,6 +45,12 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ClusterAccessor;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.stages.AttributeName;
@@ -53,6 +59,10 @@ import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewResourceComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -60,6 +70,8 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.store.PropertyJsonComparator;
@@ -156,7 +168,7 @@ public class ClusterStateVerifier {
         HelixDataAccessor accessor =
             new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
 
-        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
+        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName);
       } catch (Exception e) {
         LOG.error("exception in verification", e);
       }
@@ -222,10 +234,11 @@ public class ClusterStateVerifier {
   }
 
   static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
-      Map<String, Map<String, String>> errStates) {
+      Map<String, Map<String, String>> errStates, String clusterName) {
     try {
       Builder keyBuilder = accessor.keyBuilder();
       // read cluster once and do verification
+      // TODO: stop using ClusterDataCache
       ClusterDataCache cache = new ClusterDataCache();
       cache.refresh(accessor);
 
@@ -250,10 +263,31 @@ public class ClusterStateVerifier {
         }
       }
 
+      Map<String, StateModelDefinition> stateModelDefs =
+          accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+      Map<StateModelDefId, StateModelDefinition> convertedDefs =
+          new HashMap<StateModelDefId, StateModelDefinition>();
+      for (String defName : stateModelDefs.keySet()) {
+        convertedDefs.put(Id.stateModelDef(defName), stateModelDefs.get(defName));
+      }
+      ClusterAccessor clusterAccessor = new ClusterAccessor(Id.cluster(clusterName), accessor);
+      Cluster cluster = clusterAccessor.readCluster();
       // calculate best possible state
-      BestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cache);
-      Map<String, Map<Partition, Map<String, String>>> bestPossStateMap =
-          bestPossOutput.getStateMap();
+      NewBestPossibleStateOutput bestPossOutput =
+          ClusterStateVerifier.calcBestPossState(cluster, convertedDefs);
+      Map<String, Map<String, Map<String, String>>> bestPossStateMap =
+          new HashMap<String, Map<String, Map<String, String>>>();
+      for (ResourceId resourceId : bestPossOutput.getAssignedResources()) {
+        ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId);
+        Map<String, Map<String, String>> resourceMap = new HashMap<String, Map<String, String>>();
+        for (PartitionId partitionId : resourceAssignment.getMappedPartitions()) {
+          Map<String, String> replicaMap =
+              ResourceAssignment.stringMapFromReplicaMap(resourceAssignment
+                  .getReplicaMap(partitionId));
+          resourceMap.put(partitionId.stringify(), replicaMap);
+        }
+        bestPossStateMap.put(resourceId.stringify(), resourceMap);
+      }
 
       // set error states
       if (errStates != null) {
@@ -263,13 +297,12 @@ public class ClusterStateVerifier {
             String instanceName = partErrStates.get(partitionName);
 
             if (!bestPossStateMap.containsKey(resourceName)) {
-              bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+              bestPossStateMap.put(resourceName, new HashMap<String, Map<String, String>>());
             }
-            Partition partition = new Partition(partitionName);
-            if (!bestPossStateMap.get(resourceName).containsKey(partition)) {
-              bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
+            if (!bestPossStateMap.get(resourceName).containsKey(partitionName)) {
+              bestPossStateMap.get(resourceName).put(partitionName, new HashMap<String, String>());
             }
-            bestPossStateMap.get(resourceName).get(partition)
+            bestPossStateMap.get(resourceName).get(partitionName)
                 .put(instanceName, HelixDefinedState.ERROR.toString());
           }
         }
@@ -285,11 +318,12 @@ public class ClusterStateVerifier {
         }
 
         // step 0: remove empty map and DROPPED state from best possible state
-        Map<Partition, Map<String, String>> bpStateMap =
-            bestPossOutput.getResourceMap(resourceName);
-        Iterator<Entry<Partition, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
+        Map<String, Map<String, String>> bpStateMap =
+            ResourceAssignment.stringMapsFromReplicaMaps(bestPossOutput.getResourceAssignment(
+                Id.resource(resourceName)).getResourceMap());
+        Iterator<Entry<String, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
         while (iter.hasNext()) {
-          Map.Entry<Partition, Map<String, String>> entry = iter.next();
+          Map.Entry<String, Map<String, String>> entry = iter.next();
           Map<String, String> instanceStateMap = entry.getValue();
           if (instanceStateMap.isEmpty()) {
             iter.remove();
@@ -310,7 +344,9 @@ public class ClusterStateVerifier {
 
         // step 1: externalView and bestPossibleState has equal size
         int extViewSize = extView.getRecord().getMapFields().size();
-        int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
+        int bestPossStateSize =
+            bestPossOutput.getResourceAssignment(Id.resource(resourceName)).getMappedPartitions()
+                .size();
         if (extViewSize != bestPossStateSize) {
           LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
               + bestPossStateSize + ") for resource: " + resourceName);
@@ -328,7 +364,8 @@ public class ClusterStateVerifier {
         for (String partition : extView.getRecord().getMapFields().keySet()) {
           Map<String, String> evInstanceStateMap = extView.getRecord().getMapField(partition);
           Map<String, String> bpInstanceStateMap =
-              bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
+              ResourceAssignment.stringMapFromReplicaMap(bestPossOutput.getResourceAssignment(
+                  Id.resource(resourceName)).getReplicaMap(Id.partition(partition)));
 
           boolean result =
               ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
@@ -404,24 +441,27 @@ public class ClusterStateVerifier {
   /**
    * calculate the best possible state note that DROPPED states are not checked since when
    * kick off the BestPossibleStateCalcStage we are providing an empty current state map
+   * @param convertedDefs
    * @param cache
    * @return
    * @throws Exception
    */
 
-  static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception {
+  static NewBestPossibleStateOutput calcBestPossState(Cluster cluster,
+      Map<StateModelDefId, StateModelDefinition> convertedDefs) throws Exception {
     ClusterEvent event = new ClusterEvent("sampleEvent");
-    event.addAttribute("ClusterDataCache", cache);
+    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), convertedDefs);
 
-    ResourceComputationStage rcState = new ResourceComputationStage();
-    CurrentStateComputationStage csStage = new CurrentStateComputationStage();
-    BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
+    NewResourceComputationStage rcState = new NewResourceComputationStage();
+    NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();
+    NewBestPossibleStateCalcStage bpStage = new NewBestPossibleStateCalcStage();
 
     runStage(event, rcState);
     runStage(event, csStage);
     runStage(event, bpStage);
 
-    BestPossibleStateOutput output =
+    NewBestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
 
     // System.out.println("output:" + output);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index cc26596..ce2781f 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -52,6 +52,9 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
 public class TestNewStages extends ZkUnitTestBase {
   final int n = 2;
   final int p = 8;
@@ -115,7 +118,14 @@ public class TestNewStages extends ZkUnitTestBase {
     Cluster cluster = clusterAccessor.readCluster();
     ClusterEvent event = new ClusterEvent(testName);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), new NewCurrentStateOutput());
-    event.addAttribute(AttributeName.RESOURCES.toString(), cluster.getResourceMap());
+    Map<ResourceId, ResourceConfig> resourceConfigMap =
+        Maps.transformValues(cluster.getResourceMap(), new Function<Resource, ResourceConfig>() {
+          @Override
+          public ResourceConfig apply(Resource resource) {
+            return resource.getConfig();
+          }
+        });
+    event.addAttribute(AttributeName.RESOURCES.toString(), resourceConfigMap);
     event.addAttribute("ClusterDataCache", cluster);
     Map<StateModelDefId, StateModelDefinition> stateModelMap =
         new HashMap<StateModelDefId, StateModelDefinition>();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 6dcf725..382f036 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -29,15 +29,19 @@ import java.util.UUID;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.Mocks;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.annotations.AfterClass;
@@ -107,11 +111,16 @@ public class BaseStageTest {
   protected void setupLiveInstances(int numLiveInstances) {
     // setup liveInstances
     for (int i = 0; i < numLiveInstances; i++) {
-      LiveInstance liveInstance = new LiveInstance("localhost_" + i);
+      String instanceName = "localhost_" + i;
+      InstanceConfig instanceConfig = new InstanceConfig(Id.participant(instanceName));
+      instanceConfig.setHostName("localhost");
+      instanceConfig.setPort(Integer.toString(i));
+      LiveInstance liveInstance = new LiveInstance(instanceName);
       liveInstance.setSessionId("session_" + i);
 
       Builder keyBuilder = accessor.keyBuilder();
-      accessor.setProperty(keyBuilder.liveInstance("localhost_" + i), liveInstance);
+      accessor.setProperty(keyBuilder.instanceConfig(instanceName), instanceConfig);
+      accessor.setProperty(keyBuilder.liveInstance(instanceName), liveInstance);
     }
   }
 
@@ -128,32 +137,38 @@ public class BaseStageTest {
     stage.postProcess();
   }
 
-  protected void setupStateModel() {
-    ZNRecord masterSlave = new StateModelConfigGenerator().generateConfigForMasterSlave();
-
+  protected Map<StateModelDefId, StateModelDefinition> setupStateModel() {
     Builder keyBuilder = accessor.keyBuilder();
-    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), new StateModelDefinition(
-        masterSlave));
+    Map<StateModelDefId, StateModelDefinition> defs =
+        new HashMap<StateModelDefId, StateModelDefinition>();
+
+    ZNRecord masterSlave = StateModelConfigGenerator.generateConfigForMasterSlave();
+    StateModelDefinition masterSlaveDef = new StateModelDefinition(masterSlave);
+    defs.put(Id.stateModelDef(masterSlaveDef.getId()), masterSlaveDef);
+    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlaveDef);
+
+    ZNRecord leaderStandby = StateModelConfigGenerator.generateConfigForLeaderStandby();
+    StateModelDefinition leaderStandbyDef = new StateModelDefinition(leaderStandby);
+    defs.put(Id.stateModelDef(leaderStandbyDef.getId()), leaderStandbyDef);
+    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandbyDef);
 
-    ZNRecord leaderStandby = new StateModelConfigGenerator().generateConfigForLeaderStandby();
-    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), new StateModelDefinition(
-        leaderStandby));
+    ZNRecord onlineOffline = StateModelConfigGenerator.generateConfigForOnlineOffline();
+    StateModelDefinition onlineOfflineDef = new StateModelDefinition(onlineOffline);
+    defs.put(Id.stateModelDef(onlineOfflineDef.getId()), onlineOfflineDef);
+    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOfflineDef);
 
-    ZNRecord onlineOffline = new StateModelConfigGenerator().generateConfigForOnlineOffline();
-    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), new StateModelDefinition(
-        onlineOffline));
+    return defs;
   }
 
-  protected Map<String, Resource> getResourceMap() {
-    Map<String, Resource> resourceMap = new HashMap<String, Resource>();
-    Resource testResource = new Resource("testResourceName");
-    testResource.setStateModelDefRef("MasterSlave");
-    testResource.addPartition("testResourceName_0");
-    testResource.addPartition("testResourceName_1");
-    testResource.addPartition("testResourceName_2");
-    testResource.addPartition("testResourceName_3");
-    testResource.addPartition("testResourceName_4");
-    resourceMap.put("testResourceName", testResource);
+  protected Map<ResourceId, ResourceConfig> getResourceMap() {
+    Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
+    ResourceConfig.Builder builder = new ResourceConfig.Builder(Id.resource("testResourceName"));
+    builder.addPartition(new Partition(Id.partition("testResourceName_0")));
+    builder.addPartition(new Partition(Id.partition("testResourceName_1")));
+    builder.addPartition(new Partition(Id.partition("testResourceName_2")));
+    builder.addPartition(new Partition(Id.partition("testResourceName_3")));
+    builder.addPartition(new Partition(Id.partition("testResourceName_4")));
+    resourceMap.put(Id.resource("testResourceName"), builder.build());
 
     return resourceMap;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 2453bd8..82b70b1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -24,12 +24,17 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateModeProperty;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -41,68 +46,78 @@ import org.testng.annotations.Test;
 public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
   @Test
   public void testSemiAutoModeCompatibility() {
-    System.out.println("START TestBestPossibleStateCalcStage at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("START TestBestPossibleStateCalcStageCompatibility_testSemiAutoModeCompatibility at "
+            + new Date(System.currentTimeMillis()));
 
     String[] resources = new String[] {
       "testResourceName"
     };
     setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
     setupLiveInstances(5);
-    setupStateModel();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
 
-    Map<String, Resource> resourceMap = getResourceMap();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
 
-    ReadClusterDataStage stage1 = new ReadClusterDataStage();
+    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
     runStage(event, stage1);
-    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    BestPossibleStateOutput output =
+    NewBestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
-      Partition resource = new Partition("testResourceName_" + p);
-      AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
-          .get("localhost_" + (p + 1) % 5));
+      Map<ParticipantId, State> replicaMap =
+          output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+              Id.partition("testResourceName_" + p));
+      AssertJUnit.assertEquals(State.from("MASTER"),
+          replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
     }
-    System.out.println("END TestBestPossibleStateCalcStage at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("END TestBestPossibleStateCalcStageCompatibility_testSemiAutoModeCompatibility at "
+            + new Date(System.currentTimeMillis()));
   }
 
   @Test
   public void testCustomModeCompatibility() {
-    System.out.println("START TestBestPossibleStateCalcStage at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("START TestBestPossibleStateCalcStageCompatibility_testCustomModeCompatibility at "
+            + new Date(System.currentTimeMillis()));
 
     String[] resources = new String[] {
       "testResourceName"
     };
     setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.CUSTOMIZED);
     setupLiveInstances(5);
-    setupStateModel();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
 
-    Map<String, Resource> resourceMap = getResourceMap();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
 
-    ReadClusterDataStage stage1 = new ReadClusterDataStage();
+    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
     runStage(event, stage1);
-    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    BestPossibleStateOutput output =
+    NewBestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
-      Partition resource = new Partition("testResourceName_" + p);
-      AssertJUnit.assertNull(output.getInstanceStateMap("testResourceName", resource).get(
-          "localhost_" + (p + 1) % 5));
+      Map<ParticipantId, State> replicaMap =
+          output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+              Id.partition("testResourceName_" + p));
+      AssertJUnit.assertEquals(State.from("MASTER"),
+          replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
     }
-    System.out.println("END TestBestPossibleStateCalcStage at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("END TestBestPossibleStateCalcStageCompatibility_testCustomModeCompatibility at "
+            + new Date(System.currentTimeMillis()));
   }
 
   protected List<IdealState> setupIdealStateDeprecated(int nodes, String[] resources,

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 82c7b37..1a76615 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -22,14 +22,14 @@ package org.apache.helix.controller.stages;
 import java.util.Date;
 import java.util.Map;
 
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -45,24 +45,27 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
     };
     setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
     setupLiveInstances(5);
-    setupStateModel();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
 
-    Map<String, Resource> resourceMap = getResourceMap();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
 
-    ReadClusterDataStage stage1 = new ReadClusterDataStage();
+    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
     runStage(event, stage1);
-    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    BestPossibleStateOutput output =
+    NewBestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
-      Partition resource = new Partition("testResourceName_" + p);
-      AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
-          .get("localhost_" + (p + 1) % 5));
+      Map<ParticipantId, State> replicaMap =
+          output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+              Id.partition("testResourceName_" + p));
+      AssertJUnit.assertEquals(State.from("MASTER"),
+          replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
     }
     System.out.println("END TestBestPossibleStateCalcStage at "
         + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
index bce7c2d..47875fc 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
@@ -28,6 +28,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
 import org.testng.Assert;
@@ -64,6 +65,8 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
     LiveInstance liveInstance = new LiveInstance(record);
     liveInstance.setSessionId("session_0");
     accessor.setProperty(keyBuilder.liveInstance("localhost_0"), liveInstance);
+    InstanceConfig config = new InstanceConfig(liveInstance.getInstanceName());
+    accessor.setProperty(keyBuilder.instanceConfig(config.getInstanceName()), config);
 
     if (controllerVersion != null) {
       ((Mocks.MockManager) manager).setVersion(controllerVersion);
@@ -74,13 +77,13 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
           .put("minimum_supported_version.participant", minSupportedParticipantVersion);
     }
     event.addAttribute("helixmanager", manager);
-    runStage(event, new ReadClusterDataStage());
+    runStage(event, new NewReadClusterDataStage());
   }
 
   @Test
   public void testCompatible() {
     prepare("0.4.0", "0.4.0");
-    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
@@ -95,7 +98,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
   @Test
   public void testNullParticipantVersion() {
     prepare("0.4.0", null);
-    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
@@ -111,7 +114,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
   @Test
   public void testNullControllerVersion() {
     prepare(null, "0.4.0");
-    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
@@ -127,7 +130,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
   @Test
   public void testIncompatible() {
     prepare("0.6.1-incubating-SNAPSHOT", "0.3.4", "0.4");
-    CompatibilityCheckStage stage = new CompatibilityCheckStage();
+    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index ecad444..3f567ae 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -24,11 +24,11 @@ import java.util.Map;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Id;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -36,32 +36,32 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
 
   @Test
   public void testEmptyCS() {
-    Map<String, Resource> resourceMap = getResourceMap();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-    CurrentStateComputationStage stage = new CurrentStateComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    NewCurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     AssertJUnit.assertEquals(
-        output.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
-        0);
+        output.getCurrentStateMap(Id.resource("testResourceName"),
+            Id.partition("testResourceName_0")).size(), 0);
   }
 
   @Test
   public void testSimpleCS() {
     // setup resource
-    Map<String, Resource> resourceMap = getResourceMap();
+    Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
 
     setupLiveInstances(5);
 
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-    CurrentStateComputationStage stage = new CurrentStateComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    NewCurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     AssertJUnit.assertEquals(
-        output1.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
-        0);
+        output1.getCurrentStateMap(Id.resource("testResourceName"),
+            Id.partition("testResourceName_0")).size(), 0);
 
     // Add a state transition messages
     Message message = new Message(Message.MessageType.STATE_TRANSITION, Id.message("msg1"));
@@ -75,13 +75,13 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
 
-    runStage(event, new ReadClusterDataStage());
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    String pendingState =
-        output2.getPendingState("testResourceName", new Partition("testResourceName_1"),
-            "localhost_3");
-    AssertJUnit.assertEquals(pendingState, "SLAVE");
+    NewCurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    State pendingState =
+        output2.getPendingState(Id.resource("testResourceName"),
+            Id.partition("testResourceName_1"), Id.participant("localhost_3"));
+    AssertJUnit.assertEquals(pendingState, State.from("SLAVE"));
 
     ZNRecord record1 = new ZNRecord("testResourceName");
     // Add a current state that matches sessionId and one that does not match
@@ -100,13 +100,13 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     accessor.setProperty(
         keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
         stateWithDeadSession);
-    runStage(event, new ReadClusterDataStage());
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    String currentState =
-        output3.getCurrentState("testResourceName", new Partition("testResourceName_1"),
-            "localhost_3");
-    AssertJUnit.assertEquals(currentState, "OFFLINE");
+    NewCurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    State currentState =
+        output3.getCurrentState(Id.resource("testResourceName"),
+            Id.partition("testResourceName_1"), Id.participant("localhost_3"));
+    AssertJUnit.assertEquals(currentState, State.from("OFFLINE"));
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 26bbc20..bcd8f4a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -41,7 +41,6 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -56,7 +55,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
 
     // ideal state: node0 is MASTER, node1 is SLAVE
@@ -74,7 +73,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
 
-    MessageThrottleStage throttleStage = new MessageThrottleStage();
+    NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
     try {
       runStage(event, throttleStage);
       Assert.fail("Should throw exception since DATA_CACHE is null");
@@ -83,7 +82,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     }
 
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
     runPipeline(event, dataRefresh);
 
     try {
@@ -92,7 +91,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     } catch (Exception e) {
       // OK
     }
-    runStage(event, new ResourceComputationStage());
+    runStage(event, new NewResourceComputationStage());
 
     try {
       runStage(event, throttleStage);
@@ -100,22 +99,22 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     } catch (Exception e) {
       // OK
     }
-    MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+    NewMessageOutput msgSelectOutput = new NewMessageOutput();
     List<Message> selectMessages = new ArrayList<Message>();
     Message msg =
         createMessage(MessageType.STATE_TRANSITION, Id.message("msgId-001"), "OFFLINE", "SLAVE",
             "TestDB", "localhost_0");
     selectMessages.add(msg);
 
-    msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+    msgSelectOutput.setMessages(Id.resource("TestDB"), Id.partition("TestDB_0"), selectMessages);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
 
     runStage(event, throttleStage);
 
-    MessageThrottleStageOutput msgThrottleOutput =
+    NewMessageOutput msgThrottleOutput =
         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
-    Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(),
-        1);
+    Assert.assertEquals(
+        msgThrottleOutput.getMessages(Id.resource("TestDB"), Id.partition("TestDB_0")).size(), 1);
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -127,7 +126,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
 
     // ideal state: node0 is MASTER, node1 is SLAVE
@@ -212,7 +211,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     ClusterConstraints constraint =
         accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
 
-    MessageThrottleStage throttleStage = new MessageThrottleStage();
+    NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
 
     // test constraintSelection
     // message1: hit contraintSelection rule1 and rule2
@@ -262,10 +261,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     event.addAttribute("helixmanager", manager);
 
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
     runPipeline(event, dataRefresh);
-    runStage(event, new ResourceComputationStage());
-    MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+    runStage(event, new NewResourceComputationStage());
+    NewMessageOutput msgSelectOutput = new NewMessageOutput();
 
     Message msg3 =
         createMessage(MessageType.STATE_TRANSITION, Id.message("msgId-003"), "OFFLINE", "SLAVE",
@@ -291,15 +290,15 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     selectMessages.add(msg5); // should be throttled
     selectMessages.add(msg6); // should be throttled
 
-    msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+    msgSelectOutput.setMessages(Id.resource("TestDB"), Id.partition("TestDB_0"), selectMessages);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
 
     runStage(event, throttleStage);
 
-    MessageThrottleStageOutput msgThrottleOutput =
+    NewMessageOutput msgThrottleOutput =
         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     List<Message> throttleMessages =
-        msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
+        msgThrottleOutput.getMessages(Id.resource("TestDB"), Id.partition("TestDB_0"));
     Assert.assertEquals(throttleMessages.size(), 4);
     Assert.assertTrue(throttleMessages.contains(msg1));
     Assert.assertTrue(throttleMessages.contains(msg2));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 97d5ec1..825aa05 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -20,15 +20,25 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.HelixVersion;
 import org.apache.helix.api.Id;
-import org.apache.helix.controller.stages.MessageSelectionStage.Bounds;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.stages.NewMessageSelectionStage.Bounds;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -38,15 +48,27 @@ public class TestMsgSelectionStage {
   public void testMasterXfer() {
     System.out.println("START testMasterXfer at " + new Date(System.currentTimeMillis()));
 
-    Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
-    liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
-    liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
-    Map<String, String> currentStates = new HashMap<String, String>();
-    currentStates.put("localhost_0", "SLAVE");
-    currentStates.put("localhost_1", "MASTER");
-
-    Map<String, String> pendingStates = new HashMap<String, String>();
+    Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
+    Set<PartitionId> disabledPartitions = Collections.emptySet();
+    Set<String> tags = Collections.emptySet();
+    Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
+    Map<MessageId, Message> messageMap = Collections.emptyMap();
+    RunningInstance runningInstance0 =
+        new RunningInstance(Id.session("session_0"), HelixVersion.from("1.2.3.4"), Id.process("0"));
+    RunningInstance runningInstance1 =
+        new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
+    liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
+        "localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
+        messageMap));
+    liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_1"),
+        "localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
+        messageMap));
+
+    Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
+    currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));
+    currentStates.put(Id.participant("localhost_1"), State.from("MASTER"));
+
+    Map<ParticipantId, State> pendingStates = new HashMap<ParticipantId, State>();
 
     List<Message> messages = new ArrayList<Message>();
     messages.add(TestHelper.createMessage(Id.message("msgId_0"), "SLAVE", "MASTER", "localhost_0",
@@ -54,17 +76,17 @@ public class TestMsgSelectionStage {
     messages.add(TestHelper.createMessage(Id.message("msgId_1"), "MASTER", "SLAVE", "localhost_1",
         "TestDB", "TestDB_0"));
 
-    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
-    stateConstraints.put("MASTER", new Bounds(0, 1));
-    stateConstraints.put("SLAVE", new Bounds(0, 2));
+    Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+    stateConstraints.put(State.from("MASTER"), new Bounds(0, 1));
+    stateConstraints.put(State.from("SLAVE"), new Bounds(0, 2));
 
     Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
     stateTransitionPriorities.put("MASTER-SLAVE", 0);
     stateTransitionPriorities.put("SLAVE-MASTER", 1);
 
     List<Message> selectedMsg =
-        new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
-            messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+        new NewMessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+            messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
 
     Assert.assertEquals(selectedMsg.size(), 1);
     Assert.assertEquals(selectedMsg.get(0).getMsgId(), Id.message("msgId_1"));
@@ -76,32 +98,44 @@ public class TestMsgSelectionStage {
     System.out.println("START testMasterXferAfterMasterResume at "
         + new Date(System.currentTimeMillis()));
 
-    Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
-    liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
-    liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
-    Map<String, String> currentStates = new HashMap<String, String>();
-    currentStates.put("localhost_0", "SLAVE");
-    currentStates.put("localhost_1", "SLAVE");
-
-    Map<String, String> pendingStates = new HashMap<String, String>();
-    pendingStates.put("localhost_1", "MASTER");
+    Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
+    Set<PartitionId> disabledPartitions = Collections.emptySet();
+    Set<String> tags = Collections.emptySet();
+    Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
+    Map<MessageId, Message> messageMap = Collections.emptyMap();
+    RunningInstance runningInstance0 =
+        new RunningInstance(Id.session("session_0"), HelixVersion.from("1.2.3.4"), Id.process("0"));
+    RunningInstance runningInstance1 =
+        new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
+    liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
+        "localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
+        messageMap));
+    liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_1"),
+        "localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
+        messageMap));
+
+    Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
+    currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));
+    currentStates.put(Id.participant("localhost_1"), State.from("SLAVE"));
+
+    Map<ParticipantId, State> pendingStates = new HashMap<ParticipantId, State>();
+    pendingStates.put(Id.participant("localhost_1"), State.from("MASTER"));
 
     List<Message> messages = new ArrayList<Message>();
     messages.add(TestHelper.createMessage(Id.message("msgId_0"), "SLAVE", "MASTER", "localhost_0",
         "TestDB", "TestDB_0"));
 
-    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
-    stateConstraints.put("MASTER", new Bounds(0, 1));
-    stateConstraints.put("SLAVE", new Bounds(0, 2));
+    Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+    stateConstraints.put(State.from("MASTER"), new Bounds(0, 1));
+    stateConstraints.put(State.from("SLAVE"), new Bounds(0, 2));
 
     Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
     stateTransitionPriorities.put("MASTER-SLAVE", 0);
     stateTransitionPriorities.put("SLAVE-MASTER", 1);
 
     List<Message> selectedMsg =
-        new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
-            messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+        new NewMessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+            messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
 
     Assert.assertEquals(selectedMsg.size(), 0);
     System.out.println("END testMasterXferAfterMasterResume at "

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 7cd942e..a3f38ea 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -39,7 +39,6 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.model.Partition;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -76,17 +75,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
 
     // rebalance pipeline
     Pipeline rebalancePipeline = new Pipeline();
-    rebalancePipeline.addStage(new ResourceComputationStage());
-    rebalancePipeline.addStage(new CurrentStateComputationStage());
-    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new MessageGenerationPhase());
-    rebalancePipeline.addStage(new MessageSelectionStage());
-    rebalancePipeline.addStage(new MessageThrottleStage());
-    rebalancePipeline.addStage(new TaskAssignmentStage());
+    rebalancePipeline.addStage(new NewResourceComputationStage());
+    rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+    rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new NewMessageGenerationStage());
+    rebalancePipeline.addStage(new NewMessageSelectionStage());
+    rebalancePipeline.addStage(new NewMessageThrottleStage());
+    rebalancePipeline.addStage(new NewTaskAssignmentStage());
 
     // round1: set node0 currentState to OFFLINE and node1 currentState to OFFLINE
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
@@ -96,10 +95,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    MessageSelectionStageOutput msgSelOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     List<Message> messages =
-        msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
     Message message = messages.get(0);
     Assert.assertEquals(message.getFromState().toString(), "OFFLINE");
@@ -114,7 +112,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    messages =
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1");
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -220,17 +219,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
 
     // rebalance pipeline
     Pipeline rebalancePipeline = new Pipeline();
-    rebalancePipeline.addStage(new ResourceComputationStage());
-    rebalancePipeline.addStage(new CurrentStateComputationStage());
-    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new MessageGenerationPhase());
-    rebalancePipeline.addStage(new MessageSelectionStage());
-    rebalancePipeline.addStage(new MessageThrottleStage());
-    rebalancePipeline.addStage(new TaskAssignmentStage());
+    rebalancePipeline.addStage(new NewResourceComputationStage());
+    rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+    rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new NewMessageGenerationStage());
+    rebalancePipeline.addStage(new NewMessageSelectionStage());
+    rebalancePipeline.addStage(new NewMessageThrottleStage());
+    rebalancePipeline.addStage(new NewTaskAssignmentStage());
 
     // round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
@@ -240,10 +239,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    MessageSelectionStageOutput msgSelOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     List<Message> messages =
-        msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
     Message message = messages.get(0);
     Assert.assertEquals(message.getFromState().toString(), "OFFLINE");
@@ -258,7 +256,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    messages =
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1,
         "Should output only 1 message: OFFLINE->DROPPED for localhost_1");
 
@@ -275,7 +274,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    messages =
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1,
         "Should output 1 message: OFFLINE->DROPPED for localhost_0");
     message = messages.get(0);
@@ -315,17 +315,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new NewReadClusterDataStage());
 
     // rebalance pipeline
     Pipeline rebalancePipeline = new Pipeline();
-    rebalancePipeline.addStage(new ResourceComputationStage());
-    rebalancePipeline.addStage(new CurrentStateComputationStage());
-    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-    rebalancePipeline.addStage(new MessageGenerationPhase());
-    rebalancePipeline.addStage(new MessageSelectionStage());
-    rebalancePipeline.addStage(new MessageThrottleStage());
-    rebalancePipeline.addStage(new TaskAssignmentStage());
+    rebalancePipeline.addStage(new NewResourceComputationStage());
+    rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+    rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new NewMessageGenerationStage());
+    rebalancePipeline.addStage(new NewMessageSelectionStage());
+    rebalancePipeline.addStage(new NewMessageThrottleStage());
+    rebalancePipeline.addStage(new NewTaskAssignmentStage());
 
     // round1: set node1 currentState to SLAVE
     setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", "session_1",
@@ -333,10 +333,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
-    MessageSelectionStageOutput msgSelOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     List<Message> messages =
-        msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1, "Should output 1 message: SLAVE-MASTER for node1");
     Message message = messages.get(0);
     Assert.assertEquals(message.getFromState().toString(), "SLAVE");
@@ -354,7 +353,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+    messages =
+        msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0");
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 86bd060..d4f3de6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -28,6 +28,8 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Id;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
@@ -35,7 +37,6 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -63,19 +64,21 @@ public class TestResourceComputationStage extends BaseStageTest {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.idealState(resourceName), idealState);
-    ResourceComputationStage stage = new ResourceComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewResourceComputationStage stage = new NewResourceComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
 
-    Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resource =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     AssertJUnit.assertEquals(1, resource.size());
 
-    AssertJUnit.assertEquals(resource.keySet().iterator().next(), resourceName);
-    AssertJUnit.assertEquals(resource.values().iterator().next().getResourceName(), resourceName);
-    AssertJUnit.assertEquals(resource.values().iterator().next().getStateModelDefRef(),
-        idealState.getStateModelDefRef());
+    AssertJUnit.assertEquals(resource.keySet().iterator().next(), Id.resource(resourceName));
     AssertJUnit
-        .assertEquals(resource.values().iterator().next().getPartitions().size(), partitions);
+        .assertEquals(resource.values().iterator().next().getId(), Id.resource(resourceName));
+    AssertJUnit.assertEquals(resource.values().iterator().next().getRebalancerConfig()
+        .getStateModelDefId(), idealState.getStateModelDefId());
+    AssertJUnit.assertEquals(resource.values().iterator().next().getPartitionSet().size(),
+        partitions);
   }
 
   @Test
@@ -85,21 +88,23 @@ public class TestResourceComputationStage extends BaseStageTest {
         "testResource1", "testResource2"
     };
     List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
-    ResourceComputationStage stage = new ResourceComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewResourceComputationStage stage = new NewResourceComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
 
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     AssertJUnit.assertEquals(resources.length, resourceMap.size());
 
     for (int i = 0; i < resources.length; i++) {
       String resourceName = resources[i];
+      ResourceId resourceId = Id.resource(resourceName);
       IdealState idealState = idealStates.get(i);
       AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
-          idealState.getStateModelDefRef());
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
+          .getStateModelDefId(), idealState.getStateModelDefRef());
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
           idealState.getNumPartitions());
     }
   }
@@ -151,41 +156,47 @@ public class TestResourceComputationStage extends BaseStageTest {
     accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
         currentState);
 
-    ResourceComputationStage stage = new ResourceComputationStage();
-    runStage(event, new ReadClusterDataStage());
+    NewResourceComputationStage stage = new NewResourceComputationStage();
+    runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
 
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     // +1 because it will have one for current state
     AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());
 
     for (int i = 0; i < resources.length; i++) {
       String resourceName = resources[i];
+      ResourceId resourceId = Id.resource(resourceName);
       IdealState idealState = idealStates.get(i);
-      AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
-          idealState.getStateModelDefRef());
-      AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
+      AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
+          .getStateModelDefId(), idealState.getStateModelDefId());
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
           idealState.getNumPartitions());
     }
     // Test the data derived from CurrentState
-    AssertJUnit.assertTrue(resourceMap.containsKey(oldResource));
-    AssertJUnit.assertEquals(resourceMap.get(oldResource).getResourceName(), oldResource);
-    AssertJUnit.assertEquals(resourceMap.get(oldResource).getStateModelDefRef(),
-        currentState.getStateModelDefRef());
-    AssertJUnit.assertEquals(resourceMap.get(oldResource).getPartitions().size(), currentState
-        .getPartitionStateStringMap().size());
-    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_0"));
-    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1"));
-    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2"));
+    ResourceId oldResourceId = Id.resource(oldResource);
+    AssertJUnit.assertTrue(resourceMap.containsKey(oldResourceId));
+    AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getId(), oldResourceId);
+    AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getRebalancerConfig()
+        .getStateModelDefId(), currentState.getStateModelDefId());
+    AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getPartitionSet().size(), currentState
+        .getPartitionStateMap().size());
+    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+        Id.partition("testResourceOld_0")));
+    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+        Id.partition("testResourceOld_1")));
+    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+        Id.partition("testResourceOld_2")));
 
   }
 
   @Test
   public void testNull() {
     ClusterEvent event = new ClusterEvent("sampleEvent");
-    ResourceComputationStage stage = new ResourceComputationStage();
+    NewResourceComputationStage stage = new NewResourceComputationStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 747a185..d8afec5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -26,10 +26,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
 import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -176,7 +176,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
           TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
       _startCMResultMap.put(storageNodeName, resultx);
     }
-    Thread.sleep(1000);
+    Thread.sleep(5000);
     result =
         ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
             CLUSTER_NAME, TEST_DB));