You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/09/14 01:32:21 UTC

[1/3] git commit: [HELIX-242] Re-integrate the scheduler rebalancing into the new controller pipeline

Updated Branches:
  refs/heads/helix-logical-model d8ef5a2d7 -> 0a8baa12f


[HELIX-242] Re-integrate the scheduler rebalancing into the new controller pipeline


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/4e5e3e1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/4e5e3e1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/4e5e3e1d

Branch: refs/heads/helix-logical-model
Commit: 4e5e3e1d18f8f3180acab64b17e8a3cc566639a2
Parents: d8ef5a2
Author: zzhang <zz...@apache.org>
Authored: Fri Sep 13 16:21:44 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Sep 13 16:21:44 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/api/Resource.java     |  65 ++-
 .../apache/helix/api/SchedulerTaskConfig.java   |  61 ++-
 .../org/apache/helix/api/StateModelDefId.java   |   7 +
 .../stages/NewExternalViewComputeStage.java     |  77 ++--
 .../stages/NewMessageGenerationStage.java       |  23 +-
 .../java/org/apache/helix/model/Message.java    |  60 ++-
 .../statemachine/ScheduledTaskStateModel.java   |   1 +
 .../helix/integration/TestSchedulerMessage.java | 409 ++++---------------
 .../integration/TestSchedulerMsgContraints.java | 253 ++++++++++++
 .../integration/TestSchedulerMsgUsingQueue.java | 181 ++++++++
 10 files changed, 697 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index a33f0d6..2ee7a4f 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -47,9 +47,7 @@ public class Resource {
       ExternalView externalView, UserConfig userConfig,
       Map<PartitionId, UserConfig> partitionUserConfigs, int liveParticipantCount) {
     Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
-    Map<PartitionId, Map<String, String>> schedulerTaskConfigMap =
         new HashMap<PartitionId, Map<String, String>>();
-    Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
     Set<PartitionId> partitionSet = idealState.getPartitionSet();
     if (partitionSet.isEmpty() && idealState.getNumPartitions() > 0) {
       partitionSet = new HashSet<PartitionId>();
@@ -57,6 +55,7 @@ public class Resource {
         partitionSet.add(Id.partition(id, Integer.toString(i)));
       }
     }
+
     for (PartitionId partitionId : partitionSet) {
       UserConfig partitionUserConfig = partitionUserConfigs.get(partitionId);
       if (partitionUserConfig == null) {
@@ -64,27 +63,10 @@ public class Resource {
       }
       partitionMap.put(partitionId, new Partition(partitionId, partitionUserConfig));
 
-      // TODO refactor it
-      Map<String, String> taskConfigMap = idealState.getInstanceStateMap(partitionId.stringify());
-      if (taskConfigMap != null) {
-        schedulerTaskConfigMap.put(partitionId, taskConfigMap);
-      }
-
-      // TODO refactor it
-      for (String simpleKey : idealState.getRecord().getSimpleFields().keySet()) {
-        if (simpleKey.indexOf("_" + Message.Attributes.TIMEOUT) != -1) {
-          try {
-            String timeoutStr = idealState.getRecord().getSimpleField(simpleKey);
-            int timeout = Integer.parseInt(timeoutStr);
-            transitionTimeoutMap.put(simpleKey, timeout);
-          } catch (Exception e) {
-            // ignore
-          }
-        }
-      }
     }
-    SchedulerTaskConfig schedulerTaskConfig =
-        new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfigMap);
+
+    SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
+
     RebalancerConfig rebalancerConfig =
         new RebalancerConfig(partitionMap, idealState, resourceAssignment, liveParticipantCount);
 
@@ -95,6 +77,45 @@ public class Resource {
   }
 
   /**
+   * Extract scheduler-task config from ideal-state if state-model-def is SchedulerTaskQueue
+   * @param idealState
+   * @return scheduler-task config or null if state-model-def is not SchedulerTaskQueue
+   */
+  SchedulerTaskConfig schedulerTaskConfig(IdealState idealState) {
+    if (!idealState.getStateModelDefId().equalsIgnoreCase(StateModelDefId.SchedulerTaskQueue)) {
+      return null;
+    }
+
+    // TODO refactor get timeout
+    Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
+    for (String simpleKey : idealState.getRecord().getSimpleFields().keySet()) {
+      if (simpleKey.indexOf(Message.Attributes.TIMEOUT.name()) != -1) {
+        try {
+          String timeoutStr = idealState.getRecord().getSimpleField(simpleKey);
+          int timeout = Integer.parseInt(timeoutStr);
+          transitionTimeoutMap.put(simpleKey, timeout);
+        } catch (Exception e) {
+          // ignore
+        }
+      }
+    }
+
+    Map<PartitionId, Message> innerMsgMap = new HashMap<PartitionId, Message>();
+    for (PartitionId partitionId : idealState.getPartitionSet()) {
+      // TODO refactor: scheduler-task-queue state model uses map-field to store inner-messages
+      // this is different from all other state-models
+      Map<String, String> innerMsgStrMap =
+          idealState.getRecord().getMapField(partitionId.stringify());
+      if (innerMsgStrMap != null) {
+        Message innerMsg = Message.toMessage(innerMsgStrMap);
+        innerMsgMap.put(partitionId, innerMsg);
+      }
+    }
+
+    return new SchedulerTaskConfig(transitionTimeoutMap, innerMsgMap);
+  }
+
+  /**
    * Get the partitions of the resource
    * @return map of partition id to partition or empty map if none
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java b/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
index ac7cb3a..e7d0779 100644
--- a/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
@@ -1,6 +1,7 @@
 package org.apache.helix.api;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.model.Message;
 
@@ -10,38 +11,58 @@ public class SchedulerTaskConfig {
   // TODO refactor using Transition logical model
   private final Map<String, Integer> _transitionTimeoutMap;
 
-  // TODO refactor this when understand inner message format
-  private final Map<PartitionId, Map<String, String>> _schedulerTaskConfig;
+  private final Map<PartitionId, Message> _innerMessageMap;
 
   public SchedulerTaskConfig(Map<String, Integer> transitionTimeoutMap,
-      Map<PartitionId, Map<String, String>> schedulerTaskConfig) {
+      Map<PartitionId, Message> innerMsgMap) {
     _transitionTimeoutMap = ImmutableMap.copyOf(transitionTimeoutMap);
-    _schedulerTaskConfig = ImmutableMap.copyOf(schedulerTaskConfig);
+    _innerMessageMap = ImmutableMap.copyOf(innerMsgMap);
   }
 
-  public Map<String, String> getTaskConfig(PartitionId partitionId) {
-    return _schedulerTaskConfig.get(partitionId);
+  /**
+   * Get inner message for a partition
+   * @param partitionId
+   * @return inner message
+   */
+  public Message getInnerMessage(PartitionId partitionId) {
+    return _innerMessageMap.get(partitionId);
   }
 
-  public Integer getTransitionTimeout(String transition) {
-    return _transitionTimeoutMap.get(transition);
+  /**
+   * Get timeout for a transition
+   * @param transition
+   * @return timeout or -1 if not available
+   */
+  public int getTransitionTimeout(String transition) {
+    Integer timeout = _transitionTimeoutMap.get(transition);
+    if (timeout == null) {
+      return -1;
+    }
+
+    return timeout;
   }
 
-  public Integer getTimeout(String transition, PartitionId partitionId) {
+  /**
+   * Get timeout for an inner message
+   * @param transition
+   * @param partitionId
+   * @return timeout or -1 if not available
+   */
+  public int getTimeout(String transition, PartitionId partitionId) {
     Integer timeout = getTransitionTimeout(transition);
     if (timeout == null) {
-      Map<String, String> taskConfig = getTaskConfig(partitionId);
-      if (taskConfig != null) {
-        String timeoutStr = taskConfig.get(Message.Attributes.TIMEOUT.toString());
-        if (timeoutStr != null) {
-          try {
-            timeout = Integer.parseInt(timeoutStr);
-          } catch (Exception e) {
-            // ignore
-          }
-        }
-      }
+      Message innerMessage = getInnerMessage(partitionId);
+      timeout = innerMessage.getTimeout();
     }
+
     return timeout;
   }
+
+  /**
+   * Get partition-id set
+   * @return partition-id set
+   */
+  public Set<PartitionId> getPartitionSet() {
+    return _innerMessageMap.keySet();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java b/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java
index 0ac4cb9..f77cec8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelDefId.java
@@ -1,5 +1,7 @@
 package org.apache.helix.api;
 
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,6 +22,8 @@ package org.apache.helix.api;
  */
 
 public class StateModelDefId extends Id {
+  public static final StateModelDefId SchedulerTaskQueue = Id
+      .stateModelDef(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE);
   private final String _id;
 
   public StateModelDefId(String id) {
@@ -31,4 +35,7 @@ public class StateModelDefId extends Id {
     return _id;
   }
 
+  public boolean equalsIgnoreCase(StateModelDefId that) {
+    return _id.equalsIgnoreCase(that._id);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 4a037f3..6361dcf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -41,7 +41,9 @@ import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SchedulerTaskConfig;
 import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
@@ -54,12 +56,12 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 public class NewExternalViewComputeStage extends AbstractBaseStage {
-  private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
+  private static Logger LOG = Logger.getLogger(ExternalViewComputeStage.class);
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
-    log.info("START ExternalViewComputeStage.process()");
+    LOG.info("START ExternalViewComputeStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
     Map<ResourceId, ResourceConfig> resourceMap =
@@ -80,6 +82,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
     List<ExternalView> newExtViews = new ArrayList<ExternalView>();
     List<PropertyKey> keys = new ArrayList<PropertyKey>();
 
+    // TODO use external-view accessor
     Map<String, ExternalView> curExtViews =
         dataAccessor.getChildValuesMap(keyBuilder.externalViews());
 
@@ -90,6 +93,8 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
       // otherwise resource has been dropped, use bucket size from current state instead
       ResourceConfig resource = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+      SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
+
       if (resource.getBucketSize() > 0) {
         view.setBucketSize(resource.getBucketSize());
       } else {
@@ -137,10 +142,9 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
         // scheduler
         // message, and then remove the partitions from the ideal state
         if (rebalancerConfig != null
-            && rebalancerConfig.getStateModelDefId().stringify()
-                .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-          // TODO fix it
-          // updateScheduledTaskStatus(view, manager, idealState);
+            && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+                StateModelDefId.SchedulerTaskQueue)) {
+          updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
         }
       }
     }
@@ -160,13 +164,16 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
     }
 
     long endTime = System.currentTimeMillis();
-    log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
+    LOG.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
   }
 
   // TODO fix it
-  private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,
-      IdealState taskQueueIdealState) {
+  private void updateScheduledTaskStatus(ResourceId resourceId, ExternalView ev,
+      HelixManager manager,
+      SchedulerTaskConfig schedulerTaskConfig) {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
     ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
 
     // Place holder for finished partitions
@@ -177,23 +184,21 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
     Map<String, Map<String, String>> controllerMsgUpdates =
         new HashMap<String, Map<String, String>>();
 
-    Builder keyBuilder = accessor.keyBuilder();
-
     for (String taskPartitionName : ev.getPartitionStringSet()) {
       for (String taskState : ev.getStateMap(taskPartitionName).values()) {
         if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
             || taskState.equalsIgnoreCase("COMPLETED")) {
-          log.info(taskPartitionName + " finished as " + taskState);
-          finishedTasks.getListFields().put(taskPartitionName, emptyList);
-          finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
+          LOG.info(taskPartitionName + " finished as " + taskState);
+          finishedTasks.setListField(taskPartitionName, emptyList);
+          finishedTasks.setMapField(taskPartitionName, emptyMap);
 
           // Update original scheduler message status update
-          if (taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null) {
-            String controllerMsgId =
-                taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                    .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+          Message innerMessage =
+              schedulerTaskConfig.getInnerMessage(Id.partition(taskPartitionName));
+          if (innerMessage != null) {
+            String controllerMsgId = innerMessage.getControllerMessagId();
             if (controllerMsgId != null) {
-              log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
+              LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
               if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
                 controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
               }
@@ -204,16 +209,16 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
       }
     }
     // fill the controllerMsgIdCountMap
-    for (String taskId : taskQueueIdealState.getPartitionStringSet()) {
-      String controllerMsgId =
-          taskQueueIdealState.getRecord().getMapField(taskId)
-              .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+    for (PartitionId taskId : schedulerTaskConfig.getPartitionSet()) {
+      Message innerMessage = schedulerTaskConfig.getInnerMessage(taskId);
+      String controllerMsgId = innerMessage.getControllerMessagId();
+
       if (controllerMsgId != null) {
-        if (!controllerMsgIdCountMap.containsKey(controllerMsgId)) {
-          controllerMsgIdCountMap.put(controllerMsgId, 0);
+        Integer curCnt = controllerMsgIdCountMap.get(controllerMsgId);
+        if (curCnt == null) {
+          curCnt = 0;
         }
-        controllerMsgIdCountMap.put(controllerMsgId,
-            (controllerMsgIdCountMap.get(controllerMsgId) + 1));
+        controllerMsgIdCountMap.put(controllerMsgId, curCnt + 1);
       }
     }
 
@@ -223,18 +228,16 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
             keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
         StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
         for (String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet()) {
+          Message innerMessage =
+              schedulerTaskConfig.getInnerMessage(Id.partition(taskPartitionName));
+
           Map<String, String> result = new HashMap<String, String>();
           result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
           controllerStatusUpdate.getRecord().setMapField(
-              "MessageResult "
-                  + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                      .get(Message.Attributes.TGT_NAME.toString())
-                  + " "
-                  + taskPartitionName
-                  + " "
-                  + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                      .get(Message.Attributes.MSG_ID.toString()), result);
+              "MessageResult " + innerMessage.getTgtName() + " " + taskPartitionName + " "
+                  + innerMessage.getMsgId(), result);
         }
+
         // All done for the scheduled tasks that came from controllerMsgId, add summary for it
         if (controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(
             controllerMsgId).intValue()) {
@@ -266,12 +269,12 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
       ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
       List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
       deltaList.add(znDelta);
-      IdealState delta = new IdealState(taskQueueIdealState.getResourceName());
+      IdealState delta = new IdealState(resourceId);
       delta.setDeltaList(deltaList);
 
       // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
       keyBuilder = accessor.keyBuilder();
-      accessor.updateProperty(keyBuilder.idealState(taskQueueIdealState.getResourceName()), delta);
+      accessor.updateProperty(keyBuilder.idealState(resourceId.stringify()), delta);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index a9fc51d..0c9fe9a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -35,6 +35,7 @@ import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SchedulerTaskConfig;
 import org.apache.helix.api.SessionId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
@@ -139,29 +140,29 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
                     nextState, sessionId, new StateModelDefId(stateModelDef.getId()), resourceConfig
                         .getRebalancerConfig().getStateModelFactoryId(), bucketSize);
 
-            // TODO refactor set timeout logic, it's really messy
+            // TODO refactor get/set timeout/inner-message
             RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
             if (rebalancerConfig != null
-                && rebalancerConfig.getStateModelDefId().stringify()
-                    .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+                && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+                    StateModelDefId.SchedulerTaskQueue)) {
               if (resourceConfig.getPartitionMap().size() > 0) {
                 // TODO refactor it -- we need a way to read in scheduler tasks a priori
                 Resource activeResource = cluster.getResource(resourceId);
                 if (activeResource != null) {
-                  message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
-                      activeResource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+                  message.setInnerMessage(activeResource.getSchedulerTaskConfig().getInnerMessage(
+                      partitionId));
                 }
               }
             }
 
             // Set timeout if needed
             String stateTransition =
-                currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
-
-            if (resourceConfig.getSchedulerTaskConfig() != null) {
-              Integer timeout =
-                  resourceConfig.getSchedulerTaskConfig().getTimeout(stateTransition, partitionId);
-              if (timeout != null && timeout > 0) {
+                String.format("%s-%s_%s", currentState, nextState,
+                    Message.Attributes.TIMEOUT.name());
+            SchedulerTaskConfig schedulerTaskConfig = resourceConfig.getSchedulerTaskConfig();
+            if (schedulerTaskConfig != null) {
+              int timeout = schedulerTaskConfig.getTimeout(stateTransition, partitionId);
+              if (timeout > 0) {
                 message.setExecutionTimeout(timeout);
               }
             }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index b2b538d..68672e3 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -40,6 +40,7 @@ import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.SessionId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 
 import com.google.common.collect.ImmutableList;
 
@@ -162,6 +163,22 @@ public class Message extends HelixProperty {
   }
 
   /**
+   * Convert a string map to a message
+   * @param msgStrMap
+   * @return message
+   */
+  public static Message toMessage(Map<String, String> msgStrMap) {
+    String msgId = msgStrMap.get(Attributes.MSG_ID.name());
+    if (msgId == null) {
+      throw new IllegalArgumentException("Missing msgId in message string map: " + msgStrMap);
+    }
+
+    ZNRecord record = new ZNRecord(msgId);
+    record.getSimpleFields().putAll(msgStrMap);
+    return new Message(record);
+  }
+
+  /**
    * Set the time that the message was created
    * @param timestamp a UNIX timestamp
    */
@@ -647,16 +664,6 @@ public class Message extends HelixProperty {
     return builder.build();
   }
 
-  // public AtomicInteger getGroupMsgCountDown()
-  // {
-  // return _groupMsgCountDown;
-  // }
-  //
-  // public void setGroupMsgCountDown(AtomicInteger countDown)
-  // {
-  // _groupMsgCountDown = countDown;
-  // }
-
   /**
    * Check if this message is targetted for a controller
    * @return true if this is a controller message, false otherwise
@@ -679,6 +686,39 @@ public class Message extends HelixProperty {
     }
   }
 
+  /**
+   * Get timeout
+   * @return timeout or -1 if not available
+   */
+  public int getTimeout() {
+    String timeoutStr = _record.getSimpleField(Attributes.TIMEOUT.name());
+    int timeout = -1;
+    if (timeoutStr != null) {
+      try {
+        timeout = Integer.parseInt(timeoutStr);
+      } catch (Exception e) {
+        // ignore
+      }
+    }
+    return timeout;
+  }
+
+  /**
+   * Get controller message id, used for scheduler-task-queue state model only
+   * @return controller message id
+   */
+  public String getControllerMessagId() {
+    return _record.getSimpleField(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+  }
+
+  /**
+   * Set an inner message
+   * @param inner message
+   */
+  public void setInnerMessage(Message message) {
+    _record.setMapField(Attributes.INNER_MESSAGE.name(), message.getRecord().getSimpleFields());
+  }
+
   private boolean isNullOrEmpty(String data) {
     return data == null || data.length() == 0 || data.trim().length() == 0;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
index 8b6a02c..ca67d42 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -52,6 +52,7 @@ public class ScheduledTaskStateModel extends StateModel {
   public void onBecomeCompletedFromOffline(Message message, NotificationContext context)
       throws InterruptedException {
     logger.info(_partitionName + " onBecomeCompletedFromOffline");
+    // System.err.println("\t\t" + _partitionName + " onBecomeCompletedFromOffline");
 
     // Construct the inner task message from the mapfields of scheduledTaskQueue resource group
     Map<String, String> messageInfo =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 38a41be..6d66347 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,11 +35,13 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Id;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
@@ -48,6 +51,7 @@ import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
@@ -74,7 +78,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     @Override
     public void onTimeOut() {
       // TODO Auto-generated method stub
-
     }
 
     @Override
@@ -86,10 +89,17 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
 
   public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
+    int cnt;
     public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
 
+    public TestMessagingHandlerFactory() {
+      super();
+      cnt = 0;
+    }
+
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
+      // System.out.println("\t create-hdlr: " + message.getId());
       return new TestMessagingHandler(message, context);
     }
 
@@ -114,73 +124,20 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
       public HelixTaskResult handleMessage() throws InterruptedException {
         HelixTaskResult result = new HelixTaskResult();
         result.setSuccess(true);
-        String destName = _message.getTgtName();
-        result.getTaskResultMap().put("Message", _message.getMsgId().stringify());
-        synchronized (_results) {
-          if (!_results.containsKey(_message.getPartitionId().stringify())) {
-            _results
-                .put(_message.getPartitionId().stringify(), new ConcurrentSkipListSet<String>());
-          }
-        }
-        _results.get(_message.getPartitionId().stringify()).add(_message.getMsgId().stringify());
-        // System.err.println("Message " + _message.getMsgId() + " executed");
-        return result;
-      }
-
-      @Override
-      public void onError(Exception e, ErrorCode code, ErrorType type) {
-        // TODO Auto-generated method stub
-      }
-    }
-  }
-
-  public static class TestMessagingHandlerFactoryLatch implements MessageHandlerFactory {
-    public volatile CountDownLatch _latch = new CountDownLatch(1);
-    public int _messageCount = 0;
-    public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
-
-    @Override
-    public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
-      _messageCount++;
-      return new TestMessagingHandlerLatch(message, context);
-    }
-
-    public synchronized void signal() {
-      _latch.countDown();
-      _latch = new CountDownLatch(1);
-    }
-
-    @Override
-    public String getMessageType() {
-      return "TestMessagingHandlerLatch";
-    }
-
-    @Override
-    public void reset() {
-      // TODO Auto-generated method stub
-    }
-
-    public class TestMessagingHandlerLatch extends MessageHandler {
-      public TestMessagingHandlerLatch(Message message, NotificationContext context) {
-        super(message, context);
-        // TODO Auto-generated constructor stub
-      }
+        // String tgtName = _message.getTgtName();
+        String messageId = _message.getMsgId().stringify();
+        String partitionId = _message.getPartitionId().stringify();
 
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
-        _latch.await();
-        HelixTaskResult result = new HelixTaskResult();
-        result.setSuccess(true);
-        result.getTaskResultMap().put("Message", _message.getMsgId().stringify());
-        String destName = _message.getTgtName();
+        result.getTaskResultMap().put("Message", messageId);
         synchronized (_results) {
-          if (!_results.containsKey(_message.getPartitionId().stringify())) {
-            _results
-                .put(_message.getPartitionId().stringify(), new ConcurrentSkipListSet<String>());
+          if (!_results.containsKey(partitionId)) {
+            _results.put(partitionId, new HashSet<String>());
           }
+          _results.get(partitionId).add(messageId);
         }
-        _results.get(_message.getPartitionId().stringify()).add(destName);
-        // System.err.println("Message " + _message.getMsgId() + " executed");
+        cnt++;
+        // System.err.println(cnt + ": message " + messageId + ", tgtName: " + tgtName
+        // + ", partitionId: " + partitionId);
         return result;
       }
 
@@ -192,96 +149,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
   @Test()
-  public void TestSchedulerMsgUsingQueue() throws Exception {
-    Logger.getRootLogger().setLevel(Level.INFO);
-    _factory._results.clear();
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", Id.message(UUID.randomUUID().toString()));
-    schedulerMessage.setTgtSessionId(Id.session("*"));
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-    schedulerMessage.getRecord().setSimpleField(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
-    // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), Id.message("Template"));
-    msg.setTgtSessionId(Id.session("*"));
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_%");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    helixDataAccessor.createProperty(
-        keyBuilder.controllerMessage(schedulerMessage.getMsgId().stringify()), schedulerMessage);
-
-    for (int i = 0; i < 30; i++) {
-      Thread.sleep(2000);
-      if (_PARTITIONS == _factory._results.size()) {
-        break;
-      }
-    }
-
-    Assert.assertEquals(_PARTITIONS, _factory._results.size());
-    PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
-            .getMsgId().stringify());
-
-    int messageResultCount = 0;
-    for (int i = 0; i < 10; i++) {
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
-          .equals("" + (_PARTITIONS * 3)));
-      for (String key : statusUpdate.getMapFields().keySet()) {
-        if (key.startsWith("MessageResult ")) {
-          messageResultCount++;
-        }
-      }
-      if (messageResultCount == _PARTITIONS * 3) {
-        break;
-      } else {
-        Thread.sleep(2000);
-      }
-    }
-    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-    int count = 0;
-    for (Set<String> val : _factory._results.values()) {
-      count += val.size();
-    }
-    Assert.assertEquals(count, _PARTITIONS * 3);
-
-  }
-
-  @Test()
-  public void TestSchedulerMsg() throws Exception {
-    Logger.getRootLogger().setLevel(Level.INFO);
+  public void testSchedulerMsg() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
@@ -419,7 +288,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
   @Test()
-  public void TestSchedulerMsg2() throws Exception {
+  public void testSchedulerMsg2() throws Exception {
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
@@ -510,7 +379,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
   @Test()
-  public void TestSchedulerZeroMsg() throws Exception {
+  public void testSchedulerZeroMsg() throws Exception {
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
@@ -562,15 +431,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     Thread.sleep(3000);
 
     Assert.assertEquals(0, factory._results.size());
+
+    waitMessageUpdate("SentMessageCount", schedulerMessage.getMsgId().stringify(), helixDataAccessor);
     PropertyKey controllerTaskStatus =
         keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
             .getMsgId().stringify());
-    for (int i = 0; i < 10; i++) {
-      StatusUpdate update = helixDataAccessor.getProperty(controllerTaskStatus);
-      if (update == null || update.getRecord().getMapField("SentMessageCount") == null) {
-        Thread.sleep(1000);
-      }
-    }
+    waitMessageUpdate("SentMessageCount", schedulerMessage.getMsgId().stringify(),
+        helixDataAccessor);
     ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
     Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount").equals("0"));
     int count = 0;
@@ -581,14 +448,14 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
   @Test()
-  public void TestSchedulerMsg3() throws Exception {
+  public void testSchedulerMsg3() throws Exception {
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       String hostDest = "localhost_" + (START_PORT + i);
       _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      //
+
       _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
       manager = _startCMResultMap.get(hostDest)._manager;
@@ -667,15 +534,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
       HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
       Builder keyBuilder = helixDataAccessor.keyBuilder();
 
-      for (int j = 0; j < 100; j++) {
-        Thread.sleep(200);
-        PropertyKey controllerTaskStatus =
-            keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-        ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-        if (statusUpdate.getMapFields().containsKey("Summary")) {
-          break;
-        }
-      }
+      waitMessageUpdate("Summary", msgId, helixDataAccessor);
 
       PropertyKey controllerTaskStatus =
           keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
@@ -702,14 +561,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
   @Test()
-  public void TestSchedulerMsg4() throws Exception {
+  public void testSchedulerMsg4() throws Exception {
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       String hostDest = "localhost_" + (START_PORT + i);
       _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      //
       _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
       manager = _startCMResultMap.get(hostDest)._manager;
@@ -780,8 +638,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
         callback._message.getResultMap()
             .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
 
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
+    final HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    final Builder keyBuilder = helixDataAccessor.keyBuilder();
     ArrayList<String> msgIds = new ArrayList<String>();
     for (int i = 0; i < NODE_NR; i++) {
       callback = new MockAsyncCallback();
@@ -793,6 +651,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
       sw = new StringWriter();
       mapper.writeValue(sw, cr);
       schedulerMessage.setMsgId(Id.message(UUID.randomUUID().toString()));
+
+      // need to use a different name for scheduler_task_queue task resource
+      schedulerMessage.getRecord().setSimpleField(
+          DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg4_" + i);
+
       crString = sw.toString();
       schedulerMessage.getRecord().setSimpleField("Criteria", crString);
       manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
@@ -801,18 +664,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
               DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
       msgIds.add(msgId);
     }
+
     for (int i = 0; i < NODE_NR; i++) {
-      String msgId = msgIds.get(i);
-      for (int j = 0; j < 100; j++) {
-        Thread.sleep(200);
-        PropertyKey controllerTaskStatus =
-            keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-        ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-        if (statusUpdate.getMapFields().containsKey("Summary")) {
-          // System.err.println(msgId+" done");
-          break;
-        }
-      }
+      final String msgId = msgIds.get(i);
+
+      waitMessageUpdate("Summary", msgId, helixDataAccessor);
 
       PropertyKey controllerTaskStatus =
           keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
@@ -825,22 +681,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
           messageResultCount++;
         }
       }
-      if (messageResultCount != _PARTITIONS * 3 / 5) {
-        int x = 10;
-        x = x + messageResultCount;
-      }
       Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
     }
 
-    for (int j = 0; j < 100; j++) {
-      Thread.sleep(200);
-      PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgIdPrime);
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      if (statusUpdate.getMapFields().containsKey("Summary")) {
-        break;
-      }
-    }
+    waitMessageUpdate("Summary", msgIdPrime, helixDataAccessor);
+
     int count = 0;
     for (Set<String> val : _factory._results.values()) {
       // System.out.println(val);
@@ -850,147 +695,31 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     Assert.assertEquals(count, _PARTITIONS * 3 * 2);
   }
 
-  @Test
-  public void TestSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException,
-      IOException, InterruptedException {
-    TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
-      //
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", Id.message(UUID.randomUUID().toString()));
-    schedulerMessage.setTgtSessionId(Id.session("*"));
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-
-    // Template for the individual message sent to each participant
-    Message msg = new Message(factory.getMessageType(), Id.message("Template"));
-    msg.setTgtSessionId(Id.session("*"));
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_%");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
-    schedulerMessage.getRecord().setSimpleField(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
-
-    Criteria cr2 = new Criteria();
-    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
-    cr2.setInstanceName("*");
-    cr2.setSessionSpecific(false);
-
-    MockAsyncCallback callback = new MockAsyncCallback();
-    mapper = new ObjectMapper();
-    serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-
-    // Set contraints that only 1 msg per participant
-    Map<String, String> constraints = new TreeMap<String, String>();
-    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
-    constraints.put("TRANSITION", "OFFLINE-COMPLETED");
-    constraints.put("CONSTRAINT_VALUE", "1");
-    constraints.put("INSTANCE", ".*");
-    manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
-        ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
-
-    // Send scheduler message
-    crString = sw.toString();
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
-    String msgId =
-        callback._message.getResultMap()
-            .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
-
-    for (int j = 0; j < 10; j++) {
-      Thread.sleep(200);
-      PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
-        Assert.assertEquals(
-            statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
-                + (_PARTITIONS * 3));
-        break;
-      }
-    }
+  /**
+   * wait message summary to appear in controller-message-status-update
+   * @param msgId
+   * @param accessor
+   * @return
+   * @throws Exception
+   */
+  private boolean waitMessageUpdate(final String mapKey, final String msgId,
+      final HelixDataAccessor accessor) throws Exception {
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return TestHelper.verify(new TestHelper.Verifier() {
 
-    for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
-      for (int j = 0; j < 10; j++) {
-        Thread.sleep(300);
-        if (factory._messageCount == 5 * (i + 1))
-          break;
-      }
-      Thread.sleep(300);
-      Assert.assertEquals(factory._messageCount, 5 * (i + 1));
-      factory.signal();
-      // System.err.println(i);
-    }
-
-    for (int j = 0; j < 10; j++) {
-      Thread.sleep(200);
-      PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      if (statusUpdate.getMapFields().containsKey("Summary")) {
-        break;
-      }
-    }
+      @Override
+      public boolean verify() throws Exception {
+        PropertyKey key = keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
+        HelixProperty statusUpdate = accessor.getProperty(key);
+        if (statusUpdate == null) {
+          return false;
+        }
 
-    Assert.assertEquals(_PARTITIONS, factory._results.size());
-    PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
-        .equals("" + (_PARTITIONS * 3)));
-    int messageResultCount = 0;
-    for (String key : statusUpdate.getMapFields().keySet()) {
-      if (key.startsWith("MessageResult ")) {
-        messageResultCount++;
+        if (statusUpdate.getRecord().getMapField(mapKey) == null) {
+          return false;
+        }
+        return true;
       }
-    }
-    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-
-    int count = 0;
-    for (Set<String> val : factory._results.values()) {
-      count += val.size();
-    }
-    Assert.assertEquals(count, _PARTITIONS * 3);
-
-    manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
-        ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
-
+    }, 20 * 1000);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
new file mode 100644
index 0000000..b5c8d91
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
@@ -0,0 +1,253 @@
+package org.apache.helix.integration;
+
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSchedulerMsgContraints extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+
+  class MockAsyncCallback extends AsyncCallback {
+    Message _message;
+
+    public MockAsyncCallback() {
+    }
+
+    @Override
+    public void onTimeOut() {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void onReplyMessage(Message message) {
+      _message = message;
+    }
+  }
+
+  public static class TestMessagingHandlerFactoryLatch implements MessageHandlerFactory {
+    public volatile CountDownLatch _latch = new CountDownLatch(1);
+    public int _messageCount = 0;
+    public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+
+    @Override
+    public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
+      _messageCount++;
+      return new TestMessagingHandlerLatch(message, context);
+    }
+
+    public synchronized void signal() {
+      _latch.countDown();
+      _latch = new CountDownLatch(1);
+    }
+
+    @Override
+    public String getMessageType() {
+      return "TestMessagingHandlerLatch";
+    }
+
+    @Override
+    public void reset() {
+      // TODO Auto-generated method stub
+    }
+
+    public class TestMessagingHandlerLatch extends MessageHandler {
+      public TestMessagingHandlerLatch(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        _latch.await();
+        HelixTaskResult result = new HelixTaskResult();
+        result.setSuccess(true);
+        result.getTaskResultMap().put("Message", _message.getMsgId().stringify());
+        String destName = _message.getTgtName();
+        synchronized (_results) {
+          if (!_results.containsKey(_message.getPartitionId().stringify())) {
+            _results
+                .put(_message.getPartitionId().stringify(), new ConcurrentSkipListSet<String>());
+          }
+        }
+        _results.get(_message.getPartitionId().stringify()).add(destName);
+        // System.err.println("Message " + _message.getMsgId() + " executed");
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        // TODO Auto-generated method stub
+      }
+    }
+  }
+
+  @Test
+  public void testSchedulerMsgContraints() throws Exception {
+    TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++) {
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+          factory.getMessageType(), factory);
+      //
+      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+          factory.getMessageType(), factory);
+      manager = _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage =
+        new Message(MessageType.SCHEDULER_MSG + "", Id.message(UUID.randomUUID().toString()));
+    schedulerMessage.setTgtSessionId(Id.session("*"));
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(factory.getMessageType(), Id.message("Template"));
+    msg.setTgtSessionId(Id.session("*"));
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+    schedulerMessage.getRecord().setSimpleField(
+        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
+
+    Criteria cr2 = new Criteria();
+    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr2.setInstanceName("*");
+    cr2.setSessionSpecific(false);
+
+    MockAsyncCallback callback = new MockAsyncCallback();
+    mapper = new ObjectMapper();
+    serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+    // Set contraints that only 1 msg per participant
+    Map<String, String> constraints = new TreeMap<String, String>();
+    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+    constraints.put("TRANSITION", "OFFLINE-COMPLETED");
+    constraints.put("CONSTRAINT_VALUE", "1");
+    constraints.put("INSTANCE", ".*");
+    manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
+        ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
+
+    // Send scheduler message
+    crString = sw.toString();
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+    String msgId =
+        callback._message.getResultMap()
+            .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+    for (int j = 0; j < 10; j++) {
+      Thread.sleep(200);
+      PropertyKey controllerTaskStatus =
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
+        Assert.assertEquals(
+            statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
+                + (_PARTITIONS * 3));
+        break;
+      }
+    }
+
+    for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
+      for (int j = 0; j < 10; j++) {
+        Thread.sleep(300);
+        if (factory._messageCount == 5 * (i + 1))
+          break;
+      }
+      Thread.sleep(300);
+      Assert.assertEquals(factory._messageCount, 5 * (i + 1));
+      factory.signal();
+      // System.err.println(i);
+    }
+
+    for (int j = 0; j < 10; j++) {
+      Thread.sleep(200);
+      PropertyKey controllerTaskStatus =
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      if (statusUpdate.getMapFields().containsKey("Summary")) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(_PARTITIONS, factory._results.size());
+    PropertyKey controllerTaskStatus =
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+        .equals("" + (_PARTITIONS * 3)));
+    int messageResultCount = 0;
+    for (String key : statusUpdate.getMapFields().keySet()) {
+      if (key.startsWith("MessageResult ")) {
+        messageResultCount++;
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+
+    int count = 0;
+    for (Set<String> val : factory._results.values()) {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+
+    manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
+        ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4e5e3e1d/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
new file mode 100644
index 0000000..6175ce0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
@@ -0,0 +1,181 @@
+package org.apache.helix.integration;
+
+import java.io.StringWriter;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSchedulerMsgUsingQueue extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+  public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
+    int cnt;
+    public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+
+    public TestMessagingHandlerFactory() {
+      super();
+      cnt = 0;
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      // System.out.println("\t create-hdlr: " + message.getId());
+      return new TestMessagingHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      return "TestParticipant";
+    }
+
+    @Override
+    public void reset() {
+      // TODO Auto-generated method stub
+
+    }
+
+    public class TestMessagingHandler extends MessageHandler {
+      public TestMessagingHandler(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        HelixTaskResult result = new HelixTaskResult();
+        result.setSuccess(true);
+        // String tgtName = _message.getTgtName();
+        String messageId = _message.getMsgId().stringify();
+        String partitionId = _message.getPartitionId().stringify();
+
+        result.getTaskResultMap().put("Message", messageId);
+        synchronized (_results) {
+          if (!_results.containsKey(partitionId)) {
+            _results.put(partitionId, new HashSet<String>());
+          }
+          _results.get(partitionId).add(messageId);
+        }
+        cnt++;
+        // System.err.println(cnt + ": message " + messageId + ", tgtName: " + tgtName
+        // + ", partitionId: " + partitionId);
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        // TODO Auto-generated method stub
+      }
+    }
+  }
+
+  @Test()
+  public void testSchedulerMsgUsingQueue() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    // _factory._results.clear();
+    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++) {
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+          factory.getMessageType(), factory);
+      manager = _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage =
+        new Message(MessageType.SCHEDULER_MSG + "", Id.message(UUID.randomUUID().toString()));
+    schedulerMessage.setTgtSessionId(Id.session("*"));
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+    schedulerMessage.getRecord().setSimpleField(
+        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgUsingQueue");
+    // Template for the individual message sent to each participant
+    Message msg = new Message(factory.getMessageType(), Id.message("Template"));
+    msg.setTgtSessionId(Id.session("*"));
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+    helixDataAccessor.createProperty(
+        keyBuilder.controllerMessage(schedulerMessage.getMsgId().stringify()), schedulerMessage);
+
+    for (int i = 0; i < 30; i++) {
+      Thread.sleep(2000);
+      if (_PARTITIONS == factory._results.size()) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(_PARTITIONS, factory._results.size());
+    PropertyKey controllerTaskStatus =
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
+            .getMsgId().stringify());
+
+    int messageResultCount = 0;
+    for (int i = 0; i < 10; i++) {
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+          .equals("" + (_PARTITIONS * 3)));
+      for (String key : statusUpdate.getMapFields().keySet()) {
+        if (key.startsWith("MessageResult ")) {
+          messageResultCount++;
+        }
+      }
+      if (messageResultCount == _PARTITIONS * 3) {
+        break;
+      } else {
+        Thread.sleep(2000);
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+    int count = 0;
+    for (Set<String> val : factory._results.values()) {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+
+  }
+}


[3/3] git commit: [HELIX-244] Redesign rebalancers using rebalancer-specific configs

Posted by zz...@apache.org.
[HELIX-244] Redesign rebalancers using rebalancer-specific configs


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/0a8baa12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/0a8baa12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/0a8baa12

Branch: refs/heads/helix-logical-model
Commit: 0a8baa12f72b5ead84e92bdeb089fb20658df706
Parents: 4e5e3e1
Author: zzhang <zz...@apache.org>
Authored: Fri Sep 13 16:31:52 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Sep 13 16:31:52 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixProperty.java    |  10 +-
 .../main/java/org/apache/helix/api/Cluster.java |  20 +-
 .../org/apache/helix/api/ClusterAccessor.java   |  59 ++-
 .../org/apache/helix/api/ClusterConfig.java     |  34 +-
 .../helix/api/CustomRebalancerConfig.java       | 143 +++++++
 .../helix/api/FullAutoRebalancerConfig.java     |  86 ++++
 .../org/apache/helix/api/NamespacedConfig.java  |  59 ++-
 .../org/apache/helix/api/RebalancerConfig.java  | 426 +++++++++++--------
 .../org/apache/helix/api/RebalancerRef.java     |   7 +-
 .../java/org/apache/helix/api/Resource.java     |  57 ++-
 .../helix/api/SemiAutoRebalancerConfig.java     | 146 +++++++
 .../helix/api/UserDefinedRebalancerConfig.java  | 113 +++++
 .../rebalancer/NewAutoRebalancer.java           |  61 +--
 .../rebalancer/NewCustomRebalancer.java         |  23 +-
 .../controller/rebalancer/NewRebalancer.java    |  23 +-
 .../rebalancer/NewSemiAutoRebalancer.java       |  23 +-
 .../rebalancer/NewUserDefinedRebalancer.java    |  35 ++
 .../helix/controller/stages/AttributeName.java  |   3 +-
 .../stages/NewBestPossibleStateCalcStage.java   |  55 +--
 .../stages/NewCurrentStateComputationStage.java |   2 +-
 .../stages/NewCurrentStateOutput.java           | 255 -----------
 .../stages/NewExternalViewComputeStage.java     |   2 +-
 .../stages/NewMessageGenerationStage.java       |  11 +-
 .../stages/NewMessageSelectionStage.java        |   9 +-
 .../stages/NewReadClusterDataStage.java         |   9 -
 .../stages/NewResourceComputationStage.java     |  20 +-
 .../controller/stages/ResourceCurrentState.java | 255 +++++++++++
 .../strategy/AutoRebalanceStrategy.java         |  20 +-
 .../helix/model/ClusterConfiguration.java       |   2 +-
 .../java/org/apache/helix/model/Message.java    |  11 +
 .../helix/model/PartitionConfiguration.java     |   2 +-
 .../helix/model/ResourceConfiguration.java      |  68 ++-
 .../helix/tools/ClusterStateVerifier.java       |  14 +-
 .../apache/helix/api/TestNamespacedConfig.java  | 129 ++++++
 .../org/apache/helix/api/TestNewStages.java     |  76 +---
 .../org/apache/helix/api/TestUserConfig.java    |  86 ----
 .../helix/controller/stages/BaseStageTest.java  |  13 +-
 .../TestBestPossibleCalcStageCompatibility.java |  12 +-
 .../stages/TestBestPossibleStateCalcStage.java  |   7 +-
 .../TestCurrentStateComputationStage.java       |   8 +-
 .../TestCustomizedIdealStateRebalancer.java     |  19 +-
 .../apache/helix/examples/NewModelExample.java  |   9 +-
 42 files changed, 1606 insertions(+), 816 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index 9f1195f..f52d51c 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.api.UserConfig;
+import org.apache.helix.api.NamespacedConfig;
 
 /**
  * A wrapper class for ZNRecord. Used as a base class for IdealState, CurrentState, etc.
@@ -228,11 +228,11 @@ public class HelixProperty {
   }
 
   /**
-   * Add user-defined configuration properties to this property
-   * @param userConfig UserConfig properties
+   * Add namespaced configuration properties to this property
+   * @param namespacedConfig namespaced properties
    */
-  public void addUserConfig(UserConfig userConfig) {
-    UserConfig.addConfigToProperty(this, userConfig);
+  public void addNamespacedConfig(NamespacedConfig namespacedConfig) {
+    NamespacedConfig.addConfigToProperty(this, namespacedConfig);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index ab95936..005ae0d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.StateModelDefinition;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
@@ -70,11 +71,16 @@ public class Cluster {
    * @param participantMap
    * @param controllerMap
    * @param leaderId
+   * @param constraintMap
+   * @param stateModelMap
+   * @param userConfig
+   * @param isPaused
    */
   public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
       ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
-      UserConfig userConfig, boolean isPaused) {
+      Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+      boolean isPaused) {
 
     // build the config
     // Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -93,8 +99,8 @@ public class Cluster {
           }
         });
     _config =
-        new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap, userConfig,
-            isPaused);
+        new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap,
+            stateModelMap, userConfig, isPaused);
 
     _resourceMap = ImmutableMap.copyOf(resourceMap);
 
@@ -191,6 +197,14 @@ public class Cluster {
   }
 
   /**
+   * Get all the state model definitions on the cluster
+   * @return map of state model definition id to state model definition
+   */
+  public Map<StateModelDefId, StateModelDefinition> getStateModelMap() {
+    return _config.getStateModelMap();
+  }
+
+  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 59da8b7..4bfe780 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -20,6 +20,7 @@ package org.apache.helix.api;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,12 +36,14 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PartitionConfiguration;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
 public class ClusterAccessor {
@@ -84,6 +87,12 @@ public class ClusterAccessor {
     if (cluster.isPaused()) {
       pauseCluster();
     }
+    StateModelDefinitionAccessor stateModelDefAccessor =
+        new StateModelDefinitionAccessor(_accessor);
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
+    for (StateModelDefinition stateModelDef : stateModelDefs.values()) {
+      stateModelDefAccessor.addStateModelDefinition(stateModelDef);
+    }
 
     return true;
   }
@@ -199,7 +208,7 @@ public class ClusterAccessor {
         }
         resourceMap.put(resourceId,
             new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
-                userConfig, partitionUserConfigs, liveInstanceMap.size()));
+                userConfig, partitionUserConfigs));
       }
     }
 
@@ -241,8 +250,13 @@ public class ClusterAccessor {
     } else {
       userConfig = new UserConfig(_clusterId);
     }
+
+    StateModelDefinitionAccessor stateModelDefAccessor =
+        new StateModelDefinitionAccessor(_accessor);
+    Map<StateModelDefId, StateModelDefinition> stateModelMap =
+        stateModelDefAccessor.readStateModelDefinitions();
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap, userConfig, isPaused);
+        clusterConstraintMap, stateModelMap, userConfig, isPaused);
   }
 
   /**
@@ -264,6 +278,7 @@ public class ClusterAccessor {
    * @param resource
    */
   public void addResourceToCluster(ResourceConfig resource) {
+    // TODO: this belongs in ResourceAccessor
     StateModelDefId stateModelDefId = resource.getRebalancerConfig().getStateModelDefId();
     if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
       throw new HelixException("State model: " + stateModelDefId + " not found in cluster: "
@@ -278,7 +293,9 @@ public class ClusterAccessor {
 
     // Add resource user config
     if (resource.getUserConfig() != null) {
-      ResourceConfiguration configuration = ResourceConfiguration.from(resource.getUserConfig());
+      ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
+      configuration.addNamespacedConfig(resource.getUserConfig());
+      configuration.addRebalancerConfig(resource.getRebalancerConfig());
       _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     }
 
@@ -294,13 +311,24 @@ public class ClusterAccessor {
     }
     idealState.setStateModelDefId(rebalancerConfig.getStateModelDefId());
     for (PartitionId partitionId : resource.getPartitionSet()) {
-      List<ParticipantId> preferenceList = rebalancerConfig.getPreferenceList(partitionId);
-      Map<ParticipantId, State> preferenceMap = rebalancerConfig.getPreferenceMap(partitionId);
-      if (preferenceList != null) {
-        idealState.setPreferenceList(partitionId, preferenceList);
-      }
-      if (preferenceMap != null) {
-        idealState.setParticipantStateMap(partitionId, preferenceMap);
+      if (rebalancerConfig.getRebalancerMode() == RebalanceMode.SEMI_AUTO) {
+        SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(rebalancerConfig);
+        List<ParticipantId> preferenceList = config.getPreferenceList(partitionId);
+        if (preferenceList != null) {
+          idealState.setPreferenceList(partitionId, preferenceList);
+        }
+      } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.CUSTOMIZED) {
+        CustomRebalancerConfig config = CustomRebalancerConfig.from(rebalancerConfig);
+        Map<ParticipantId, State> preferenceMap = config.getPreferenceMap(partitionId);
+        if (preferenceMap != null) {
+          idealState.setParticipantStateMap(partitionId, preferenceMap);
+        }
+      } else {
+        // TODO: need these for as long as we use IdealState as the backing physical model
+        List<ParticipantId> emptyList = Collections.emptyList();
+        Map<ParticipantId, State> emptyMap = Collections.emptyMap();
+        idealState.setPreferenceList(partitionId, emptyList);
+        idealState.setParticipantStateMap(partitionId, emptyMap);
       }
       Partition partition = resource.getPartition(partitionId);
       if (partition.getUserConfig() != null) {
@@ -317,9 +345,12 @@ public class ClusterAccessor {
     if (groupTag != null) {
       idealState.setInstanceGroupTag(groupTag);
     }
-    RebalancerRef rebalancerRef = rebalancerConfig.getRebalancerRef();
-    if (rebalancerRef != null) {
-      idealState.setRebalancerRef(rebalancerRef);
+    if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED) {
+      UserDefinedRebalancerConfig config = UserDefinedRebalancerConfig.from(rebalancerConfig);
+      RebalancerRef rebalancerRef = config.getRebalancerRef();
+      if (rebalancerRef != null) {
+        idealState.setRebalancerRef(rebalancerRef);
+      }
     }
     StateModelFactoryId stateModelFactoryId = rebalancerConfig.getStateModelFactoryId();
     if (stateModelFactoryId != null) {
@@ -377,7 +408,7 @@ public class ClusterAccessor {
     instanceConfig.setPort(Integer.toString(participant.getPort()));
     instanceConfig.setInstanceEnabled(participant.isEnabled());
     UserConfig userConfig = participant.getUserConfig();
-    instanceConfig.addUserConfig(userConfig);
+    instanceConfig.addNamespacedConfig(userConfig);
     Set<String> tags = participant.getTags();
     for (String tag : tags) {
       instanceConfig.addTag(tag);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index 5e4a858..dd19096 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -6,6 +6,7 @@ import java.util.Map;
 
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.StateModelDefinition;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -36,6 +37,7 @@ public class ClusterConfig {
   private final Map<ResourceId, ResourceConfig> _resourceMap;
   private final Map<ParticipantId, ParticipantConfig> _participantMap;
   private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+  private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
   private final UserConfig _userConfig;
   private final boolean _isPaused;
 
@@ -44,17 +46,21 @@ public class ClusterConfig {
    * @param id cluster id
    * @param resourceMap map of resource id to resource config
    * @param participantMap map of participant id to participant config
-   * @param constraintMapmap of constraint type to all constraints of that type
+   * @param constraintMap map of constraint type to all constraints of that type
+   * @param stateModelMap map of state model id to state model definition
    * @param userConfig user-defined cluster properties
    * @param isPaused true if paused, false if active
    */
   public ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
       Map<ParticipantId, ParticipantConfig> participantMap,
-      Map<ConstraintType, ClusterConstraints> constraintMap, UserConfig userConfig, boolean isPaused) {
+      Map<ConstraintType, ClusterConstraints> constraintMap,
+      Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+      boolean isPaused) {
     _id = id;
     _resourceMap = ImmutableMap.copyOf(resourceMap);
     _participantMap = ImmutableMap.copyOf(participantMap);
     _constraintMap = ImmutableMap.copyOf(constraintMap);
+    _stateModelMap = ImmutableMap.copyOf(stateModelMap);
     _userConfig = userConfig;
     _isPaused = isPaused;
   }
@@ -92,6 +98,14 @@ public class ClusterConfig {
   }
 
   /**
+   * Get all the state model definitions on the cluster
+   * @return map of state model definition id to state model definition
+   */
+  public Map<StateModelDefId, StateModelDefinition> getStateModelMap() {
+    return _stateModelMap;
+  }
+
+  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */
@@ -115,6 +129,7 @@ public class ClusterConfig {
     private final Map<ResourceId, ResourceConfig> _resourceMap;
     private final Map<ParticipantId, ParticipantConfig> _participantMap;
     private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+    private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
     private UserConfig _userConfig;
     private boolean _isPaused;
 
@@ -127,6 +142,7 @@ public class ClusterConfig {
       _resourceMap = new HashMap<ResourceId, ResourceConfig>();
       _participantMap = new HashMap<ParticipantId, ParticipantConfig>();
       _constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
+      _stateModelMap = new HashMap<StateModelDefId, StateModelDefinition>();
       _isPaused = false;
       _userConfig = new UserConfig(id);
     }
@@ -198,6 +214,16 @@ public class ClusterConfig {
     }
 
     /**
+     * Add a constraint to the cluster
+     * @param constraint cluster constraint of a specific type
+     * @return Builder
+     */
+    public Builder addStateModelDefinition(StateModelDefinition stateModelDef) {
+      _stateModelMap.put(stateModelDef.getStateModelDefId(), stateModelDef);
+      return this;
+    }
+
+    /**
      * Set the paused status of the cluster
      * @param isPaused true if paused, false otherwise
      * @return Builder
@@ -222,8 +248,8 @@ public class ClusterConfig {
      * @return ClusterConfig
      */
     public ClusterConfig build() {
-      return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _userConfig,
-          _isPaused);
+      return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
+          _userConfig, _isPaused);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
new file mode 100644
index 0000000..97d4a45
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
@@ -0,0 +1,143 @@
+package org.apache.helix.api;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties for the CUSTOMIZED rebalancer
+ */
+public final class CustomRebalancerConfig extends RebalancerConfig {
+  /**
+   * Instantiate a new config for CUSTOMIZED
+   * @param resourceId the resource to rebalance
+   * @param stateModelDefId the state model that the resource follows
+   * @param partitionMap map of partition id to partition
+   */
+  public CustomRebalancerConfig(ResourceId resourceId, StateModelDefId stateModelDefId,
+      Map<PartitionId, Partition> partitionMap) {
+    super(resourceId, RebalanceMode.CUSTOMIZED, stateModelDefId, partitionMap);
+  }
+
+  /**
+   * Instantiate from a base RebalancerConfig
+   * @param config populated rebalancer config
+   */
+  private CustomRebalancerConfig(RebalancerConfig config) {
+    super(config);
+  }
+
+  /**
+   * Get the preference map for a partition
+   * @param partitionId the partition to look up
+   * @return preference map of participant to state for each replica
+   */
+  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+    Map<String, String> rawPreferenceMap = getMapField(partitionId.stringify());
+    if (rawPreferenceMap != null) {
+      return IdealState.participantStateMapFromStringMap(rawPreferenceMap);
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Set the preference map for a partition
+   * @param partitionId the partition to set
+   * @param preferenceMap map of participant to state for each replica
+   */
+  public void setPreferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+    setMapField(partitionId.toString(), IdealState.stringMapFromParticipantStateMap(preferenceMap));
+  }
+
+  /**
+   * Get all the preference maps for a partition
+   * @return map of partition id to map of participant id to state for each replica
+   */
+  public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
+    Map<String, Map<String, String>> rawPreferenceMaps = getMapFields();
+    return IdealState.participantStateMapsFromStringMaps(rawPreferenceMaps);
+  }
+
+  /**
+   * Set all the preference maps for a partition
+   * @param preferenceMaps map of partition id to map of participant id to state for each replica
+   */
+  public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
+    setMapFields(IdealState.stringMapsFromParticipantStateMaps(preferenceMaps));
+  }
+
+  /**
+   * Get a CustomRebalancerConfig from a RebalancerConfig
+   * @param config populated RebalancerConfig
+   * @return CustomRebalancerConfig
+   */
+  public static CustomRebalancerConfig from(RebalancerConfig config) {
+    return new CustomRebalancerConfig(config);
+  }
+
+  /**
+   * Assembler for a CUSTOMIZED configuration
+   */
+  public static class Builder extends RebalancerConfig.Builder<Builder> {
+    private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+    /**
+     * Build for a specific resource
+     * @param resourceId the resource to rebalance
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      _preferenceMaps = new HashMap<PartitionId, Map<ParticipantId, State>>();
+    }
+
+    /**
+     * Add a preference map of a partition
+     * @param partitionId the partition to set
+     * @param preferenceMap map of participant id to state
+     * @return Builder
+     */
+    public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+      _preferenceMaps.put(partitionId, preferenceMap);
+      return this;
+    }
+
+    @Override
+    public CustomRebalancerConfig build() {
+      if (_partitionMap.isEmpty()) {
+        addPartitions(1);
+      }
+      CustomRebalancerConfig config =
+          new CustomRebalancerConfig(_resourceId, _stateModelDefId, _partitionMap);
+      update(config);
+      config.setPreferenceMaps(_preferenceMaps);
+      return config;
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
new file mode 100644
index 0000000..3482d16
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
@@ -0,0 +1,86 @@
+package org.apache.helix.api;
+
+import java.util.Map;
+
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties for the FULL_AUTO rebalancer
+ */
+public final class FullAutoRebalancerConfig extends RebalancerConfig {
+  /**
+   * Instantiate a new config for FULL_AUTO
+   * @param resourceId the resource to rebalance
+   * @param stateModelDefId the state model that the resource follows
+   * @param partitionMap map of partition id to partition
+   */
+  public FullAutoRebalancerConfig(ResourceId resourceId, StateModelDefId stateModelDefId,
+      Map<PartitionId, Partition> partitionMap) {
+    super(resourceId, RebalanceMode.FULL_AUTO, stateModelDefId, partitionMap);
+  }
+
+  /**
+   * Instantiate from a base RebalancerConfig
+   * @param config populated rebalancer config
+   */
+  private FullAutoRebalancerConfig(RebalancerConfig config) {
+    super(config);
+  }
+
+  /**
+   * Get a FullAutoRebalancerConfig from a RebalancerConfig
+   * @param config populated RebalancerConfig
+   * @return FullAutoRebalancerConfig
+   */
+  public static FullAutoRebalancerConfig from(RebalancerConfig config) {
+    return new FullAutoRebalancerConfig(config);
+  }
+
+  /**
+   * Assembler for a FULL_AUTO configuration
+   */
+  public static class Builder extends RebalancerConfig.Builder<Builder> {
+    /**
+     * Build for a specific resource
+     * @param resourceId the resource to rebalance
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+    }
+
+    @Override
+    public FullAutoRebalancerConfig build() {
+      if (_partitionMap.isEmpty()) {
+        addPartitions(1);
+      }
+      FullAutoRebalancerConfig config =
+          new FullAutoRebalancerConfig(_resourceId, _stateModelDefId, _partitionMap);
+      update(config);
+      return config;
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java b/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
index 0a46fa7..f7656d7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
@@ -46,22 +46,29 @@ public abstract class NamespacedConfig extends ZNRecord {
   }
 
   /**
-   * Instantiate a UserConfig from an existing HelixProperty
+   * Instantiate a NamespacedConfig from an existing HelixProperty
    * @param property property wrapping a configuration
    */
   public NamespacedConfig(HelixProperty property, String prefix) {
     super(property.getRecord());
     _prefix = prefix + '_';
-    // filter out any configuration that isn't user-defined
-    Predicate<String> keyFilter = new Predicate<String>() {
-      @Override
-      public boolean apply(String key) {
-        return key.contains(_prefix);
-      }
-    };
-    super.setMapFields(Maps.filterKeys(super.getMapFields(), keyFilter));
-    super.setListFields(Maps.filterKeys(super.getListFields(), keyFilter));
-    super.setSimpleFields(Maps.filterKeys(super.getSimpleFields(), keyFilter));
+    filterNonPrefixedFields();
+  }
+
+  /**
+   * Instantiate a NamespacedConfig as a copy of another NamedspacedConfig
+   * @param config populated NamespacedConfig
+   */
+  public NamespacedConfig(NamespacedConfig config) {
+    super(config.getId());
+    _prefix = config.getPrefix() + '_';
+    if (config.getRawPayload() != null && config.getRawPayload().length > 0) {
+      setRawPayload(config.getRawPayload());
+      setPayloadSerializer(config.getPayloadSerializer());
+    }
+    super.setSimpleFields(config.getPrefixedSimpleFields());
+    super.setListFields(config.getPrefixedListFields());
+    super.setMapFields(config.getPrefixedMapFields());
   }
 
   @Override
@@ -77,7 +84,7 @@ public abstract class NamespacedConfig extends ZNRecord {
   @Override
   public void setMapFields(Map<String, Map<String, String>> mapFields) {
     for (String k : mapFields.keySet()) {
-      setMapField(_prefix + k, mapFields.get(k));
+      super.setMapField(_prefix + k, mapFields.get(k));
     }
   }
 
@@ -102,7 +109,7 @@ public abstract class NamespacedConfig extends ZNRecord {
   @Override
   public void setListFields(Map<String, List<String>> listFields) {
     for (String k : listFields.keySet()) {
-      setListField(_prefix + k, listFields.get(k));
+      super.setListField(_prefix + k, listFields.get(k));
     }
   }
 
@@ -140,6 +147,30 @@ public abstract class NamespacedConfig extends ZNRecord {
   }
 
   /**
+   * Get the prefix used to distinguish these config properties
+   * @return string prefix, not including the underscore
+   */
+  public String getPrefix() {
+    return _prefix.substring(0, _prefix.indexOf('_'));
+  }
+
+  /**
+   * Remove all fields from this config that are not prefixed
+   */
+  private void filterNonPrefixedFields() {
+    // filter out any configuration that isn't user-defined
+    Predicate<String> keyFilter = new Predicate<String>() {
+      @Override
+      public boolean apply(String key) {
+        return key.contains(_prefix);
+      }
+    };
+    super.setMapFields(Maps.filterKeys(super.getMapFields(), keyFilter));
+    super.setListFields(Maps.filterKeys(super.getListFields(), keyFilter));
+    super.setSimpleFields(Maps.filterKeys(super.getSimpleFields(), keyFilter));
+  }
+
+  /**
    * Get all map fields with prefixed keys
    * @return prefixed map fields
    */
@@ -187,7 +218,7 @@ public abstract class NamespacedConfig extends ZNRecord {
   private static <T> Map<String, T> convertToPrefixlessMap(Map<String, T> rawMap, String prefix) {
     Map<String, T> convertedMap = new HashMap<String, T>();
     for (String rawKey : rawMap.keySet()) {
-      String k = rawKey.substring(rawKey.indexOf(prefix) + 1);
+      String k = rawKey.substring(prefix.length());
       convertedMap.put(k, rawMap.get(rawKey));
     }
     return ImmutableMap.copyOf(convertedMap);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 1f401c8..1816d91 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,84 +19,106 @@ package org.apache.helix.api;
  * under the License.
  */
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.HelixConstants;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Functions;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * Captures the configuration properties necessary for rebalancing
  */
 public class RebalancerConfig extends NamespacedConfig {
-  private final RebalanceMode _rebalancerMode;
-  private final RebalancerRef _rebalancerRef;
-  private final StateModelDefId _stateModelDefId;
-  private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
-  private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-  private final ResourceAssignment _resourceAssignment;
-  private final int _replicaCount;
-  private final boolean _anyLiveParticipant;
-  private final String _participantGroupTag;
-  private final int _maxPartitionsPerParticipant;
-  private final StateModelFactoryId _stateModelFactoryId;
+
+  /**
+   * Fields used by the base RebalancerConfig
+   */
+  public enum Fields {
+    ANY_LIVE_PARTICIPANT,
+    MAX_PARTITIONS_PER_PARTICIPANT,
+    PARTICIPANT_GROUP_TAG,
+    REBALANCE_MODE,
+    REPLICA_COUNT,
+    STATE_MODEL_DEFINITION,
+    STATE_MODEL_FACTORY
+  }
+
+  private final ResourceId _resourceId;
+  private final Set<String> _fieldsSet;
   private final Map<PartitionId, Partition> _partitionMap;
 
   /**
-   * Instantiate the configuration of a rebalance task
-   * @param idealState the physical ideal state
-   * @param resourceAssignment last mapping of a resource
+   * Instantiate a RebalancerConfig.
+   * @param resourceId the resource to rebalance
+   * @param rebalancerMode the mode to rebalance with
+   * @param stateModelDefId the state model that the resource uses
+   * @param partitionMap partitions of the resource
    */
-  public RebalancerConfig(Map<PartitionId, Partition> partitionMap, IdealState idealState,
-      ResourceAssignment resourceAssignment, int liveParticipantCount) {
-    super(idealState.getResourceId(), RebalancerConfig.class.getSimpleName());
+  public RebalancerConfig(ResourceId resourceId, RebalanceMode rebalancerMode,
+      StateModelDefId stateModelDefId, Map<PartitionId, Partition> partitionMap) {
+    super(resourceId, RebalancerConfig.class.getSimpleName());
+    _resourceId = resourceId;
+    _fieldsSet =
+        ImmutableSet.copyOf(Lists.transform(Arrays.asList(Fields.values()),
+            Functions.toStringFunction()));
+    setEnumField(Fields.REBALANCE_MODE.toString(), rebalancerMode);
+    setSimpleField(Fields.STATE_MODEL_DEFINITION.toString(), stateModelDefId.stringify());
     _partitionMap = ImmutableMap.copyOf(partitionMap);
-    _rebalancerMode = idealState.getRebalanceMode();
-    _rebalancerRef = idealState.getRebalancerRef();
-    _stateModelDefId = idealState.getStateModelDefId();
-    String replicaCount = idealState.getReplicas();
-    if (replicaCount.equals(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())) {
-      _replicaCount = liveParticipantCount;
-      _anyLiveParticipant = true;
-    } else {
-      _replicaCount = Integer.parseInt(idealState.getReplicas());
-      _anyLiveParticipant = false;
-    }
-    _participantGroupTag = idealState.getInstanceGroupTag();
-    _maxPartitionsPerParticipant = idealState.getMaxPartitionsPerInstance();
-    _stateModelFactoryId = idealState.getStateModelFactoryId();
-
-    // Build preference lists and maps
-    ImmutableMap.Builder<PartitionId, List<ParticipantId>> preferenceLists =
-        new ImmutableMap.Builder<PartitionId, List<ParticipantId>>();
-    ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>> preferenceMaps =
-        new ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>>();
-    for (PartitionId partitionId : idealState.getPartitionSet()) {
-      List<ParticipantId> preferenceList = idealState.getPreferenceList(partitionId);
-      if (preferenceList != null) {
-        preferenceLists.put(partitionId, ImmutableList.copyOf(preferenceList));
-      }
-      Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
-      if (preferenceMap != null) {
-        preferenceMaps.put(partitionId, ImmutableMap.copyOf(preferenceMap));
-      }
+  }
+
+  /**
+   * Extract rebalancer-specific configuration from a physical resource config
+   * @param resourceConfiguration resource config
+   * @param partitionMap map of partition id to partition object
+   */
+  protected RebalancerConfig(ResourceConfiguration resourceConfiguration,
+      Map<PartitionId, Partition> partitionMap) {
+    super(resourceConfiguration, RebalancerConfig.class.getSimpleName());
+    _resourceId = resourceConfiguration.getResourceId();
+    _fieldsSet =
+        ImmutableSet.copyOf(Lists.transform(Arrays.asList(Fields.values()),
+            Functions.toStringFunction()));
+    _partitionMap = ImmutableMap.copyOf(partitionMap);
+  }
+
+  /**
+   * Copy a RebalancerConfig from an existing one
+   * @param config populated RebalancerConfig
+   */
+  protected RebalancerConfig(RebalancerConfig config) {
+    super(config.getResourceId(), RebalancerConfig.class.getSimpleName());
+    _resourceId = config.getResourceId();
+    _partitionMap = config.getPartitionMap();
+    _fieldsSet =
+        ImmutableSet.copyOf(Lists.transform(Arrays.asList(Fields.values()),
+            Functions.toStringFunction()));
+    super.setSimpleFields(config.getRawSimpleFields());
+    super.setListFields(config.getRawListFields());
+    super.setMapFields(config.getRawMapFields());
+    if (config.getRawPayload() != null && config.getRawPayload().length > 0) {
+      setRawPayload(config.getRawPayload());
+      setPayloadSerializer(config.getPayloadSerializer());
     }
-    _preferenceLists = preferenceLists.build();
-    _preferenceMaps = preferenceMaps.build();
+  }
 
-    // Leave the resource assignment as is
-    _resourceAssignment = resourceAssignment;
+  /**
+   * Get the resource id
+   * @return ResourceId
+   */
+  public ResourceId getResourceId() {
+    return _resourceId;
   }
 
   /**
@@ -131,15 +153,7 @@ public class RebalancerConfig extends NamespacedConfig {
    * @return rebalancer mode
    */
   public RebalanceMode getRebalancerMode() {
-    return _rebalancerMode;
-  }
-
-  /**
-   * Get the rebalancer class name
-   * @return rebalancer class name or null if not exist
-   */
-  public RebalancerRef getRebalancerRef() {
-    return _rebalancerRef;
+    return getEnumField(Fields.REBALANCE_MODE.toString(), RebalanceMode.class, RebalanceMode.NONE);
   }
 
   /**
@@ -147,63 +161,56 @@ public class RebalancerConfig extends NamespacedConfig {
    * @return state model definition
    */
   public StateModelDefId getStateModelDefId() {
-    return _stateModelDefId;
+    return Id.stateModelDef(getStringField(Fields.STATE_MODEL_DEFINITION.toString(), null));
   }
 
   /**
-   * Get the ideal node and state assignment of the resource
-   * @return resource assignment
+   * Get the number of replicas each partition should have. This function will return 0 if some
+   * policy overrides the replica count, for instance if any live participant can accept a replica.
+   * @return replica count
    */
-  public ResourceAssignment getResourceAssignment() {
-    return _resourceAssignment;
+  public int getReplicaCount() {
+    return getIntField(Fields.REPLICA_COUNT.toString(), 0);
   }
 
   /**
-   * Get the preference list of participants for a given partition
-   * @param partitionId the partition to look up
-   * @return the ordered preference list (early entries are more preferred)
+   * Set the number of replicas each partition should have.
+   * @param replicaCount replica count
    */
-  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
-    if (_preferenceLists.containsKey(partitionId)) {
-      return _preferenceLists.get(partitionId);
-    }
-    return Collections.emptyList();
+  public void setReplicaCount(int replicaCount) {
+    setIntField(Fields.REPLICA_COUNT.toString(), replicaCount);
   }
 
   /**
-   * Get the preference map of participants and states for a given partition
-   * @param partitionId the partition to look up
-   * @return a mapping of participant to state for each replica
+   * Get the number of partitions of this resource that a given participant can accept
+   * @return maximum number of partitions
    */
-  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
-    if (_preferenceMaps.containsKey(partitionId)) {
-      return _preferenceMaps.get(partitionId);
-    }
-    return Collections.emptyMap();
+  public int getMaxPartitionsPerParticipant() {
+    return getIntField(Fields.MAX_PARTITIONS_PER_PARTICIPANT.toString(), Integer.MAX_VALUE);
   }
 
   /**
-   * Get the number of replicas each partition should have
-   * @return replica count
+   * Set the number of partitions of this resource that a given participant can accept
+   * @param maxPartitionsPerParticipant maximum number of partitions
    */
-  public int getReplicaCount() {
-    return _replicaCount;
+  public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+    setIntField(Fields.MAX_PARTITIONS_PER_PARTICIPANT.toString(), maxPartitionsPerParticipant);
   }
 
   /**
-   * Get the number of partitions of this resource that a given participant can accept
-   * @return maximum number of partitions
+   * Get the tag, if any, which must be present on assignable instances
+   * @return group tag, or null if it is not present
    */
-  public int getMaxPartitionsPerParticipant() {
-    return _maxPartitionsPerParticipant;
+  public String getParticipantGroupTag() {
+    return getStringField(Fields.PARTICIPANT_GROUP_TAG.toString(), null);
   }
 
   /**
-   * Get the tag, if any, which must be present on assignable instances
-   * @return group tag
+   * Set the tag, if any, which must be present on assignable instances
+   * @param participantGroupTag group tag
    */
-  public String getParticipantGroupTag() {
-    return _participantGroupTag;
+  public void setParticipantGroupTag(String participantGroupTag) {
+    setSimpleField(Fields.PARTICIPANT_GROUP_TAG.toString(), participantGroupTag);
   }
 
   /**
@@ -211,7 +218,17 @@ public class RebalancerConfig extends NamespacedConfig {
    * @return state model factory id
    */
   public StateModelFactoryId getStateModelFactoryId() {
-    return _stateModelFactoryId;
+    return Id.stateModelFactory(getStringField(Fields.STATE_MODEL_FACTORY.toString(), null));
+  }
+
+  /**
+   * Set state model factory id
+   * @param factoryId state model factory id
+   */
+  public void setStateModelFactoryId(StateModelFactoryId factoryId) {
+    if (factoryId != null) {
+      setSimpleField(Fields.STATE_MODEL_FACTORY.toString(), factoryId.stringify());
+    }
   }
 
   /**
@@ -219,67 +236,129 @@ public class RebalancerConfig extends NamespacedConfig {
    * @return true if they can, false if they cannot
    */
   public boolean canAssignAnyLiveParticipant() {
-    return _anyLiveParticipant;
+    return getBooleanField(Fields.ANY_LIVE_PARTICIPANT.toString(), false);
+  }
+
+  /**
+   * Specify if replicas can be assigned to any live participant
+   * @param canAssignAny true if they can, false if they cannot
+   */
+  public void setCanAssignAnyLiveParticipant(boolean canAssignAny) {
+    setBooleanField(Fields.ANY_LIVE_PARTICIPANT.toString(), canAssignAny);
+  }
+
+  /*
+   * Override: removes from view fields set by RebalancerConfig
+   */
+  @Override
+  public Map<String, String> getSimpleFields() {
+    return Maps.filterKeys(super.getSimpleFields(), filterBaseConfigFields());
+  }
+
+  /*
+   * Override: makes sure that updated simple fields include those from this class
+   */
+  @Override
+  public void setSimpleFields(Map<String, String> simpleFields) {
+    Map<String, String> copySimpleFields = new HashMap<String, String>();
+    copySimpleFields.putAll(simpleFields);
+    for (String field : _fieldsSet) {
+      String value = getStringField(field, null);
+      if (value != null) {
+        copySimpleFields.put(field, value);
+      }
+    }
+    super.setSimpleFields(copySimpleFields);
+  }
+
+  /**
+   * Get a predicate that can checks if a key is used by this config
+   * @return Guava Predicate
+   */
+  private Predicate<String> filterBaseConfigFields() {
+    return new Predicate<String>() {
+      @Override
+      public boolean apply(String key) {
+        return !_fieldsSet.contains(key);
+      }
+    };
+  }
+
+  /**
+   * Get simple fields without filtering out base fields
+   * @return simple fields
+   */
+  private Map<String, String> getRawSimpleFields() {
+    return super.getSimpleFields();
+  }
+
+  /**
+   * Get list fields without filtering out base fields
+   * @return list fields
+   */
+  private Map<String, List<String>> getRawListFields() {
+    return super.getListFields();
+  }
+
+  /**
+   * Get map fields without filtering out base fields
+   * @return map fields
+   */
+  private Map<String, Map<String, String>> getRawMapFields() {
+    return super.getMapFields();
+  }
+
+  /**
+   * Get a RebalancerConfig from a physical resource configuration
+   * @param config resource configuration
+   * @return RebalancerConfig
+   */
+  public static RebalancerConfig from(ResourceConfiguration config,
+      Map<PartitionId, UserConfig> partitionConfigs) {
+    Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
+    for (PartitionId partitionId : config.getPartitionIds()) {
+      if (partitionConfigs.containsKey(partitionId)) {
+        partitionMap
+            .put(partitionId, new Partition(partitionId, partitionConfigs.get(partitionId)));
+      } else {
+        partitionMap.put(partitionId, new Partition(partitionId));
+      }
+    }
+    return new RebalancerConfig(config, partitionMap);
   }
 
   /**
    * Assembles a RebalancerConfig
    */
-  public static class Builder {
-    private final ResourceId _id;
-    private final IdealState _idealState;
+  public static abstract class Builder<T extends Builder<T>> {
+    protected final ResourceId _resourceId;
+    protected final Map<PartitionId, Partition> _partitionMap;
+    protected StateModelDefId _stateModelDefId;
+    private StateModelFactoryId _stateModelFactoryId;
     private boolean _anyLiveParticipant;
-    private ResourceAssignment _resourceAssignment;
-    private final Map<PartitionId, Partition> _partitionMap;
+    private int _replicaCount;
+    private int _maxPartitionsPerParticipant;
 
     /**
      * Configure the rebalancer for a resource
      * @param resourceId the resource to rebalance
      */
     public Builder(ResourceId resourceId) {
-      _id = resourceId;
-      _idealState = new IdealState(resourceId);
+      _resourceId = resourceId;
       _anyLiveParticipant = false;
+      _replicaCount = 0;
+      _maxPartitionsPerParticipant = Integer.MAX_VALUE;
       _partitionMap = new HashMap<PartitionId, Partition>();
     }
 
     /**
-     * Set the rebalancer mode
-     * @param mode {@link RebalanceMode}
-     */
-    public Builder rebalancerMode(RebalanceMode mode) {
-      _idealState.setRebalanceMode(mode);
-      return this;
-    }
-
-    /**
-     * Set a user-defined rebalancer
-     * @param rebalancerRef a reference to the rebalancer
-     * @return Builder
-     */
-    public Builder rebalancer(RebalancerRef rebalancerRef) {
-      _idealState.setRebalancerRef(rebalancerRef);
-      return this;
-    }
-
-    /**
      * Set the state model definition
      * @param stateModelDefId state model identifier
      * @return Builder
      */
-    public Builder stateModelDef(StateModelDefId stateModelDefId) {
-      _idealState.setStateModelDefId(stateModelDefId);
-      return this;
-    }
-
-    /**
-     * Set the full assignment of partitions to nodes and corresponding states
-     * @param resourceAssignment resource assignment
-     * @return Builder
-     */
-    public Builder resourceAssignment(ResourceAssignment resourceAssignment) {
-      _resourceAssignment = resourceAssignment;
-      return this;
+    public T stateModelDef(StateModelDefId stateModelDefId) {
+      _stateModelDefId = stateModelDefId;
+      return self();
     }
 
     /**
@@ -287,9 +366,9 @@ public class RebalancerConfig extends NamespacedConfig {
      * @param replicaCount number of replicas
      * @return Builder
      */
-    public Builder replicaCount(int replicaCount) {
-      _idealState.setReplicas(Integer.toString(replicaCount));
-      return this;
+    public T replicaCount(int replicaCount) {
+      _replicaCount = replicaCount;
+      return self();
     }
 
     /**
@@ -297,9 +376,9 @@ public class RebalancerConfig extends NamespacedConfig {
      * @param maxPartitions
      * @return Builder
      */
-    public Builder maxPartitionsPerParticipant(int maxPartitions) {
-      _idealState.setMaxPartitionsPerInstance(maxPartitions);
-      return this;
+    public T maxPartitionsPerParticipant(int maxPartitions) {
+      _maxPartitionsPerParticipant = maxPartitions;
+      return self();
     }
 
     /**
@@ -307,19 +386,19 @@ public class RebalancerConfig extends NamespacedConfig {
      * @param stateModelFactoryId
      * @return Builder
      */
-    public Builder stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-      _idealState.setStateModelFactoryId(stateModelFactoryId);
-      return this;
+    public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+      _stateModelFactoryId = stateModelFactoryId;
+      return self();
     }
 
     /**
      * Set whether any live participant should be used in rebalancing
      * @param useAnyParticipant true if any live participant can be used, false otherwise
-     * @return
+     * @return Builder
      */
-    public Builder anyLiveParticipant(boolean useAnyParticipant) {
-      _anyLiveParticipant = true;
-      return this;
+    public T anyLiveParticipant(boolean useAnyParticipant) {
+      _anyLiveParticipant = useAnyParticipant;
+      return self();
     }
 
     /**
@@ -327,9 +406,9 @@ public class RebalancerConfig extends NamespacedConfig {
      * @param partition fully-qualified partition
      * @return Builder
      */
-    public Builder addPartition(Partition partition) {
+    public T addPartition(Partition partition) {
       _partitionMap.put(partition.getId(), partition);
-      return this;
+      return self();
     }
 
     /**
@@ -337,11 +416,11 @@ public class RebalancerConfig extends NamespacedConfig {
      * @param partitions
      * @return Builder
      */
-    public Builder addPartitions(Collection<Partition> partitions) {
+    public T addPartitions(Collection<Partition> partitions) {
       for (Partition partition : partitions) {
         addPartition(partition);
       }
-      return this;
+      return self();
     }
 
     /**
@@ -351,28 +430,35 @@ public class RebalancerConfig extends NamespacedConfig {
      * @param partitionCount number of partitions to add
      * @return Builder
      */
-    public Builder addPartitions(int partitionCount) {
+    public T addPartitions(int partitionCount) {
       for (int i = 0; i < partitionCount; i++) {
-        addPartition(new Partition(Id.partition(_id, Integer.toString(i)), null));
+        addPartition(new Partition(Id.partition(_resourceId, Integer.toString(i)), null));
       }
-      return this;
+      return self();
     }
 
     /**
-     * Assemble a RebalancerConfig
-     * @return a fully defined rebalancer configuration
+     * Update a RebalancerConfig with built fields
      */
-    public RebalancerConfig build() {
-      // add a single partition if one hasn't been added yet since 1 partition is default
-      if (_partitionMap.isEmpty()) {
-        addPartitions(1);
-      }
-      if (_anyLiveParticipant) {
-        return new RebalancerConfig(_partitionMap, _idealState, _resourceAssignment,
-            Integer.parseInt(_idealState.getReplicas()));
-      } else {
-        return new RebalancerConfig(_partitionMap, _idealState, _resourceAssignment, -1);
+    protected void update(RebalancerConfig rebalancerConfig) {
+      rebalancerConfig.setReplicaCount(_replicaCount);
+      rebalancerConfig.setCanAssignAnyLiveParticipant(_anyLiveParticipant);
+      rebalancerConfig.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
+      if (_stateModelFactoryId != null) {
+        rebalancerConfig.setStateModelFactoryId(_stateModelFactoryId);
       }
     }
+
+    /**
+     * Get a reference to the actual builder class
+     * @return Builder
+     */
+    protected abstract T self();
+
+    /**
+     * Get a fully-initialized RebalancerConfig instance
+     * @return RebalancerConfig based on what was built
+     */
+    public abstract RebalancerConfig build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
index 5c7ce56..f610585 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
@@ -19,7 +19,7 @@ package org.apache.helix.api;
  * under the License.
  */
 
-import org.apache.helix.controller.rebalancer.NewRebalancer;
+import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
@@ -35,9 +35,10 @@ public class RebalancerRef {
   /**
    * @return
    */
-  public NewRebalancer getRebalancer() {
+  public NewUserDefinedRebalancer getRebalancer() {
     try {
-      return (NewRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+      return (NewUserDefinedRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName)
+          .newInstance());
     } catch (Exception e) {
       LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 2ee7a4f..c598550 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -24,8 +24,10 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceAssignment;
 
@@ -45,7 +47,7 @@ public class Resource {
    */
   public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment,
       ExternalView externalView, UserConfig userConfig,
-      Map<PartitionId, UserConfig> partitionUserConfigs, int liveParticipantCount) {
+      Map<PartitionId, UserConfig> partitionUserConfigs) {
     Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
         new HashMap<PartitionId, Map<String, String>>();
     Set<PartitionId> partitionSet = idealState.getPartitionSet();
@@ -65,10 +67,57 @@ public class Resource {
 
     }
 
-    SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
+    String replicas = idealState.getReplicas();
+    boolean anyLiveParticipant = false;
+    int replicaCount = 0;
+    if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
+      anyLiveParticipant = true;
+    } else {
+      replicaCount = Integer.parseInt(replicas);
+    }
 
-    RebalancerConfig rebalancerConfig =
-        new RebalancerConfig(partitionMap, idealState, resourceAssignment, liveParticipantCount);
+    // Build a RebalancerConfig specific to the mode
+    RebalancerConfig rebalancerConfig = null;
+    if (idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+      rebalancerConfig =
+          new FullAutoRebalancerConfig.Builder(id).addPartitions(partitionMap.values())
+              .anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+              .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+              .stateModelDef(idealState.getStateModelDefId())
+              .stateModelFactoryId(idealState.getStateModelFactoryId()).build();
+    } else if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+      SemiAutoRebalancerConfig semiAutoConfig =
+          new SemiAutoRebalancerConfig.Builder(id).addPartitions(partitionMap.values())
+              .anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+              .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+              .stateModelDef(idealState.getStateModelDefId())
+              .stateModelFactoryId(idealState.getStateModelFactoryId()).build();
+      for (PartitionId partitionId : partitionMap.keySet()) {
+        semiAutoConfig.setPreferenceList(partitionId, idealState.getPreferenceList(partitionId));
+      }
+      rebalancerConfig = semiAutoConfig;
+    } else if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+      CustomRebalancerConfig customConfig =
+          new CustomRebalancerConfig.Builder(id).addPartitions(partitionMap.values())
+              .anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+              .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+              .stateModelDef(idealState.getStateModelDefId())
+              .stateModelFactoryId(idealState.getStateModelFactoryId()).build();
+      for (PartitionId partitionId : partitionMap.keySet()) {
+        customConfig.setPreferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
+      }
+      rebalancerConfig = customConfig;
+    } else if (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+      rebalancerConfig =
+          new UserDefinedRebalancerConfig.Builder(id).addPartitions(partitionMap.values())
+              .anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+              .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+              .stateModelDef(idealState.getStateModelDefId())
+              .stateModelFactoryId(idealState.getStateModelFactoryId())
+              .rebalancerRef(idealState.getRebalancerRef()).build();
+    }
+
+    SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
 
     _config =
         new ResourceConfig(id, schedulerTaskConfig, rebalancerConfig, userConfig,

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
new file mode 100644
index 0000000..a5284a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
@@ -0,0 +1,146 @@
+package org.apache.helix.api;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties for the SEMI_AUTO rebalancer
+ */
+public final class SemiAutoRebalancerConfig extends RebalancerConfig {
+  /**
+   * Instantiate a new config for SEMI_AUTO
+   * @param resourceId the resource to rebalance
+   * @param stateModelDefId the state model that the resource follows
+   * @param partitionMap map of partition id to partition
+   */
+  public SemiAutoRebalancerConfig(ResourceId resourceId, StateModelDefId stateModelDefId,
+      Map<PartitionId, Partition> partitionMap) {
+    super(resourceId, RebalanceMode.SEMI_AUTO, stateModelDefId, partitionMap);
+  }
+
+  /**
+   * Instantiate from a base RebalancerConfig
+   * @param config populated rebalancer config
+   */
+  private SemiAutoRebalancerConfig(RebalancerConfig config) {
+    super(config);
+  }
+
+  /**
+   * Get the preference list for a partition
+   * @param partitionId the partition to look up
+   * @return preference list ordered from most preferred to least preferred
+   */
+  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+    List<String> rawPreferenceList = getListField(partitionId.stringify());
+    if (rawPreferenceList != null) {
+      return IdealState.preferenceListFromStringList(rawPreferenceList);
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Set the preference list for a partition
+   * @param partitionId the partition to set
+   * @param preferenceList preference list ordered from most preferred to least preferred
+   */
+  public void setPreferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+    setListField(partitionId.toString(), IdealState.stringListFromPreferenceList(preferenceList));
+  }
+
+  /**
+   * Get all the preference lists for a partition
+   * @return map of partition id to list of participants ordered from most preferred to least
+   *         preferred
+   */
+  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
+    Map<String, List<String>> rawPreferenceLists = getListFields();
+    return IdealState.preferenceListsFromStringLists(rawPreferenceLists);
+  }
+
+  /**
+   * Set all the preference lists for a partition
+   * @param preferenceLists map of partition id to list of participants ordered from most preferred
+   *          to least preferred
+   */
+  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
+    setListFields(IdealState.stringListsFromPreferenceLists(preferenceLists));
+  }
+
+  /**
+   * Get a SemiAutoRebalancerConfig from a RebalancerConfig
+   * @param config populated RebalancerConfig
+   * @return SemiAutoRebalancerConfig
+   */
+  public static SemiAutoRebalancerConfig from(RebalancerConfig config) {
+    return new SemiAutoRebalancerConfig(config);
+  }
+
+  /**
+   * Assembler for a SEMI_AUTO configuration
+   */
+  public static class Builder extends RebalancerConfig.Builder<Builder> {
+    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+    /**
+     * Build for a specific resource
+     * @param resourceId the resource to rebalance
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      _preferenceLists = new HashMap<PartitionId, List<ParticipantId>>();
+    }
+
+    /**
+     * Add a preference list of a partition
+     * @param partitionId the partition to set
+     * @param preferenceList list of participant ids, most preferred first
+     * @return Builder
+     */
+    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+      _preferenceLists.put(partitionId, preferenceList);
+      return this;
+    }
+
+    @Override
+    public SemiAutoRebalancerConfig build() {
+      if (_partitionMap.isEmpty()) {
+        addPartitions(1);
+      }
+      SemiAutoRebalancerConfig config =
+          new SemiAutoRebalancerConfig(_resourceId, _stateModelDefId, _partitionMap);
+      update(config);
+      config.setPreferenceLists(_preferenceLists);
+      return config;
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
new file mode 100644
index 0000000..345c80b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
@@ -0,0 +1,113 @@
+package org.apache.helix.api;
+
+import java.util.Map;
+
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties for the USER_DEFINED rebalancer. If additional fields are necessary, all
+ * getters and setters for simple, list, and map fields are available.
+ */
+public final class UserDefinedRebalancerConfig extends RebalancerConfig {
+  public enum Fields {
+    REBALANCER_CLASS_NAME
+  }
+
+  /**
+   * Instantiate a new config for USER_DEFINED
+   * @param resourceId the resource to rebalance
+   * @param stateModelDefId the state model that the resource follows
+   * @param partitionMap map of partition id to partition
+   * @param rebalancerRef instantiated rebalancer reference
+   */
+  public UserDefinedRebalancerConfig(ResourceId resourceId, StateModelDefId stateModelDefId,
+      Map<PartitionId, Partition> partitionMap, RebalancerRef rebalancerRef) {
+    super(resourceId, RebalanceMode.USER_DEFINED, stateModelDefId, partitionMap);
+    setSimpleField(Fields.REBALANCER_CLASS_NAME.toString(), rebalancerRef.toString());
+  }
+
+  /**
+   * Instantiate from a base RebalancerConfig
+   * @param config populated rebalancer config
+   */
+  private UserDefinedRebalancerConfig(RebalancerConfig config) {
+    super(config);
+  }
+
+  /**
+   * Get a reference to the class used to rebalance this resource
+   * @return RebalancerRef, or null if none set
+   */
+  public RebalancerRef getRebalancerRef() {
+    String rebalancerClassName = getStringField(Fields.REBALANCER_CLASS_NAME.toString(), null);
+    if (rebalancerClassName != null) {
+      return RebalancerRef.from(rebalancerClassName);
+    }
+    return null;
+  }
+
+  /**
+   * Get a UserDefinedRebalancerConfig from a RebalancerConfig
+   * @param config populated RebalancerConfig
+   * @return UserDefinedRebalancerConfig
+   */
+  public static UserDefinedRebalancerConfig from(RebalancerConfig config) {
+    return new UserDefinedRebalancerConfig(config);
+  }
+
+  /**
+   * Assembler for a USER_DEFINED configuration
+   */
+  public static class Builder extends RebalancerConfig.Builder<Builder> {
+    private RebalancerRef _rebalancerRef;
+
+    /**
+     * Build for a specific resource
+     * @param resourceId the resource to rebalance
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+    }
+
+    public Builder rebalancerRef(RebalancerRef rebalancerRef) {
+      _rebalancerRef = rebalancerRef;
+      return this;
+    }
+
+    @Override
+    public UserDefinedRebalancerConfig build() {
+      if (_partitionMap.isEmpty()) {
+        addPartitions(1);
+      }
+      UserDefinedRebalancerConfig config =
+          new UserDefinedRebalancerConfig(_resourceId, _stateModelDefId, _partitionMap,
+              _rebalancerRef);
+      update(config);
+      return config;
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 6e13a14..798e883 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -29,17 +30,15 @@ import java.util.Set;
 
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.FullAutoRebalancerConfig;
 import org.apache.helix.api.Id;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
@@ -61,19 +60,20 @@ import com.google.common.collect.Lists;
  * The output is a preference list and a mapping based on that preference list, i.e. partition p
  * has a replica on node k with state s.
  */
-public class NewAutoRebalancer implements NewRebalancer {
+public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig> {
   // These should be final, but are initialized in init rather than a constructor
   private AutoRebalanceStrategy _algorithm;
 
   private static final Logger LOG = Logger.getLogger(NewAutoRebalancer.class);
 
   @Override
-  public ResourceAssignment computeResourceMapping(ResourceConfig resourceConfig, Cluster cluster,
-      StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+  public ResourceAssignment computeResourceMapping(FullAutoRebalancerConfig config,
+      Cluster cluster, ResourceCurrentState currentState) {
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
     // Compute a preference list based on the current ideal state
-    List<PartitionId> partitions = new ArrayList<PartitionId>(resourceConfig.getPartitionSet());
+    List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
     List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
-    RebalancerConfig config = resourceConfig.getRebalancerConfig();
     Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
     Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
     int replicas = -1;
@@ -91,7 +91,7 @@ public class NewAutoRebalancer implements NewRebalancer {
         new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
     List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
     Map<PartitionId, Map<ParticipantId, State>> currentMapping =
-        currentMapping(resourceConfig, currentStateOutput, stateCountMap);
+        currentMapping(config, currentState, stateCountMap);
 
     // If there are nodes tagged with resource, use only those nodes
     Set<String> taggedNodes = new HashSet<String>();
@@ -104,7 +104,8 @@ public class NewAutoRebalancer implements NewRebalancer {
     }
     if (taggedNodes.size() > 0) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("found the following instances with tag " + resourceConfig.getId() + " " + taggedNodes);
+        LOG.info("found the following instances with tag " + config.getResourceId() + " "
+            + taggedNodes);
       }
       liveNodes = new ArrayList<String>(taggedNodes);
     }
@@ -121,8 +122,8 @@ public class NewAutoRebalancer implements NewRebalancer {
     }
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     _algorithm =
-        new AutoRebalanceStrategy(resourceConfig.getId().toString(), partitionNames, stateCountMap,
-            maxPartition, placementScheme);
+        new AutoRebalanceStrategy(config.getResourceId().stringify(), partitionNames,
+            stateCountMap, maxPartition, placementScheme);
     ZNRecord newMapping =
         _algorithm.computePartitionAssignment(liveNodes,
             ResourceAssignment.stringMapsFromReplicaMaps(currentMapping), allNodes);
@@ -133,40 +134,44 @@ public class NewAutoRebalancer implements NewRebalancer {
 
     // compute a full partition mapping for the resource
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + resourceConfig.getId());
+      LOG.debug("Processing resource:" + config.getResourceId());
     }
-    ResourceAssignment partitionMapping = new ResourceAssignment(resourceConfig.getId());
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
     for (PartitionId partition : partitions) {
       Set<ParticipantId> disabledParticipantsForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
+      List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
+      if (rawPreferenceList == null) {
+        rawPreferenceList = Collections.emptyList();
+      }
       List<ParticipantId> preferenceList =
-          Lists.transform(newMapping.getListField(partition.stringify()),
-              new Function<String, ParticipantId>() {
-                @Override
-                public ParticipantId apply(String participantName) {
-                  return Id.participant(participantName);
-                }
-              });
+          Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
+            @Override
+            public ParticipantId apply(String participantName) {
+              return Id.participant(participantName);
+            }
+          });
       preferenceList =
           NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
       Map<ParticipantId, State> bestStateForPartition =
           NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
               stateModelDef, preferenceList,
-              currentStateOutput.getCurrentStateMap(resourceConfig.getId(), partition),
+              currentState.getCurrentStateMap(config.getResourceId(), partition),
               disabledParticipantsForPartition);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }
 
-  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(ResourceConfig resourceConfig,
-      NewCurrentStateOutput currentStateOutput, Map<String, Integer> stateCountMap) {
+  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
+      FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
+      Map<String, Integer> stateCountMap) {
     Map<PartitionId, Map<ParticipantId, State>> map =
         new HashMap<PartitionId, Map<ParticipantId, State>>();
 
-    for (PartitionId partition : resourceConfig.getPartitionSet()) {
+    for (PartitionId partition : config.getPartitionSet()) {
       Map<ParticipantId, State> curStateMap =
-          currentStateOutput.getCurrentStateMap(resourceConfig.getId(), partition);
+          currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
       map.put(partition, new HashMap<ParticipantId, State>());
       for (ParticipantId node : curStateMap.keySet()) {
         State state = curStateMap.get(node);
@@ -176,7 +181,7 @@ public class NewAutoRebalancer implements NewRebalancer {
       }
 
       Map<ParticipantId, State> pendingStateMap =
-          currentStateOutput.getPendingStateMap(resourceConfig.getId(), partition);
+          currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
       for (ParticipantId node : pendingStateMap.keySet()) {
         State state = pendingStateMap.get(node);
         if (stateCountMap.containsKey(state)) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
index 5725ca9..5570c36 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
@@ -25,15 +25,13 @@ import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.CustomRebalancerConfig;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -47,21 +45,22 @@ import org.apache.log4j.Logger;
  * The output is a verified mapping based on that preference list, i.e. partition p has a replica
  * on node k with state s, where s may be a dropped or error state if necessary.
  */
-public class NewCustomRebalancer implements NewRebalancer {
+public class NewCustomRebalancer implements NewRebalancer<CustomRebalancerConfig> {
 
   private static final Logger LOG = Logger.getLogger(NewCustomRebalancer.class);
 
   @Override
-  public ResourceAssignment computeResourceMapping(ResourceConfig resourceConfig, Cluster cluster,
-      StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+  public ResourceAssignment computeResourceMapping(CustomRebalancerConfig config, Cluster cluster,
+      ResourceCurrentState currentState) {
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + resourceConfig.getId());
+      LOG.debug("Processing resource:" + config.getResourceId());
     }
-    ResourceAssignment partitionMapping = new ResourceAssignment(resourceConfig.getId());
-    RebalancerConfig config = resourceConfig.getRebalancerConfig();
-    for (PartitionId partition : resourceConfig.getPartitionSet()) {
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : config.getPartitionSet()) {
       Map<ParticipantId, State> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resourceConfig.getId(), partition);
+          currentState.getCurrentStateMap(config.getResourceId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
               partition);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
index feac955..4792877 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
@@ -20,28 +20,23 @@ package org.apache.helix.controller.rebalancer;
  */
 
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
 
 /**
- * Allows one to come up with custom implementation of a rebalancer.<br/>
- * This will be invoked on all changes that happen in the cluster.<br/>
- * Simply return the newIdealState for a resource in this method.<br/>
+ * Arbitrary configurable rebalancer interface.
+ * @see {@link NewUserDefinedRebalancer} for an interface with a plugged-in class
  */
-public interface NewRebalancer {
+interface NewRebalancer<T extends RebalancerConfig> {
 
   /**
    * Given a resource, existing mapping, and liveness of resources, compute a new mapping of
    * resources.
-   * @param resource the resource for which a mapping will be computed
+   * @param rebalancerConfig resource properties used by the rebalancer
    * @param cluster a snapshot of the entire cluster state
-   * @param stateModelDef the state model for which to rebalance the resource
-   * @param currentStateOutput a combination of the current states and pending current states
+   * @param currentState a combination of the current states and pending current states
    */
-  ResourceAssignment computeResourceMapping(final ResourceConfig resourceConfig,
-      final Cluster cluster, final StateModelDefinition stateModelDef,
-      final NewCurrentStateOutput currentStateOutput);
+  ResourceAssignment computeResourceMapping(final T rebalancerConfig, final Cluster cluster,
+      final ResourceCurrentState currentState);
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index 6a5072c..cb061ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -26,12 +26,10 @@ import java.util.Set;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.SemiAutoRebalancerConfig;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -45,21 +43,22 @@ import org.apache.log4j.Logger;
  * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
  * with state s.
  */
-public class NewSemiAutoRebalancer implements NewRebalancer {
+public class NewSemiAutoRebalancer implements NewRebalancer<SemiAutoRebalancerConfig> {
 
   private static final Logger LOG = Logger.getLogger(NewSemiAutoRebalancer.class);
 
   @Override
-  public ResourceAssignment computeResourceMapping(ResourceConfig resourceConfig, Cluster cluster,
-      StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+  public ResourceAssignment computeResourceMapping(SemiAutoRebalancerConfig config,
+      Cluster cluster, ResourceCurrentState currentState) {
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + resourceConfig.getId());
+      LOG.debug("Processing resource:" + config.getResourceId());
     }
-    ResourceAssignment partitionMapping = new ResourceAssignment(resourceConfig.getId());
-    RebalancerConfig config = resourceConfig.getRebalancerConfig();
-    for (PartitionId partition : resourceConfig.getPartitionSet()) {
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : config.getPartitionSet()) {
       Map<ParticipantId, State> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resourceConfig.getId(), partition);
+          currentState.getCurrentStateMap(config.getResourceId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
               partition);


[2/3] [HELIX-244] Redesign rebalancers using rebalancer-specific configs

Posted by zz...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java
new file mode 100644
index 0000000..ae7bba4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java
@@ -0,0 +1,35 @@
+package org.apache.helix.controller.rebalancer;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.UserDefinedRebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Allows one to come up with a user-defined implementation of a rebalancer.<br/>
+ * This will be invoked on all changes that happen in the cluster.<br/>
+ * Simply return the resourceMapping for a resource in this method.<br/>
+ */
+public interface NewUserDefinedRebalancer extends NewRebalancer<UserDefinedRebalancerConfig> {
+  @Override
+  ResourceAssignment computeResourceMapping(final UserDefinedRebalancerConfig rebalancerConfig,
+      final Cluster cluster, final ResourceCurrentState currentState);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 9abf67c..ae0278b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -26,6 +26,5 @@ public enum AttributeName {
   MESSAGES_ALL,
   MESSAGES_SELECTED,
   MESSAGES_THROTTLE,
-  LOCAL_STATE,
-  STATE_MODEL_DEFINITIONS
+  LOCAL_STATE
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index bc88b73..7c4341b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -23,19 +23,22 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.CustomRebalancerConfig;
+import org.apache.helix.api.FullAutoRebalancerConfig;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SemiAutoRebalancerConfig;
 import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.UserDefinedRebalancerConfig;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
 import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
-import org.apache.helix.controller.rebalancer.NewRebalancer;
 import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
@@ -56,7 +59,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
       LOG.info("START BestPossibleStateCalcStage.process()");
     }
 
-    NewCurrentStateOutput currentStateOutput =
+    ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
@@ -86,7 +89,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
    * @return assignment for the dropped resource
    */
   private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
-      NewCurrentStateOutput currentStateOutput, StateModelDefinition stateModelDef) {
+      ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
     ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
     Set<PartitionId> mappedPartitions =
         currentStateOutput.getCurrentStateMappedPartitions(resourceId);
@@ -105,12 +108,10 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
     return partitionMapping;
   }
 
-  // TODO check this
   private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
-      Map<ResourceId, ResourceConfig> resourceMap, NewCurrentStateOutput currentStateOutput) {
+      Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
     NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
-    Map<StateModelDefId, StateModelDefinition> stateModelDefs =
-        event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
 
     for (ResourceId resourceId : resourceMap.keySet()) {
       LOG.debug("Processing resource:" + resourceId);
@@ -132,27 +133,33 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
 
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
-      NewRebalancer rebalancer = null;
-      if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
-          && rebalancerConfig.getRebalancerRef() != null) {
-        rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
-      }
-      if (rebalancer == null) {
+      ResourceAssignment resourceAssignment = null;
+      if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED) {
+        UserDefinedRebalancerConfig config = UserDefinedRebalancerConfig.from(rebalancerConfig);
+        if (config.getRebalancerRef() != null) {
+          NewUserDefinedRebalancer rebalancer = config.getRebalancerRef().getRebalancer();
+          resourceAssignment =
+              rebalancer.computeResourceMapping(config, cluster, currentStateOutput);
+        }
+      } else {
         if (rebalancerConfig.getRebalancerMode() == RebalanceMode.FULL_AUTO) {
-          rebalancer = new NewAutoRebalancer();
+          FullAutoRebalancerConfig config = FullAutoRebalancerConfig.from(rebalancerConfig);
+          resourceAssignment =
+              new NewAutoRebalancer().computeResourceMapping(config, cluster, currentStateOutput);
         } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.SEMI_AUTO) {
-          rebalancer = new NewSemiAutoRebalancer();
-        } else {
-          rebalancer = new NewCustomRebalancer();
+          SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(rebalancerConfig);
+          resourceAssignment =
+              new NewSemiAutoRebalancer().computeResourceMapping(config, cluster,
+                  currentStateOutput);
+        } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.CUSTOMIZED) {
+          CustomRebalancerConfig config = CustomRebalancerConfig.from(rebalancerConfig);
+          resourceAssignment =
+              new NewCustomRebalancer().computeResourceMapping(config, cluster, currentStateOutput);
         }
       }
-
-      StateModelDefinition stateModelDef =
-          stateModelDefs.get(rebalancerConfig.getStateModelDefId());
-      ResourceAssignment resourceAssignment =
-          rebalancer.computeResourceMapping(resourceConfig, cluster, stateModelDef,
-              currentStateOutput);
       if (resourceAssignment == null) {
+        StateModelDefinition stateModelDef =
+            stateModelDefs.get(rebalancerConfig.getStateModelDefId());
         resourceAssignment =
             mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
index e0cd22f..873419c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -56,7 +56,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
           + ". Requires DataCache|RESOURCE");
     }
 
-    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+    ResourceCurrentState currentStateOutput = new ResourceCurrentState();
 
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
       ParticipantId participantId = liveParticipant.getId();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
deleted file mode 100644
index d8bbfe3..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
+++ /dev/null
@@ -1,255 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.State;
-import org.apache.helix.api.StateModelDefId;
-import org.apache.helix.model.CurrentState;
-
-public class NewCurrentStateOutput {
-  /**
-   * map of resource-id to map of partition-id to map of participant-id to state
-   * represent current-state for the participant
-   */
-  private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _currentStateMap;
-
-  /**
-   * map of resource-id to map of partition-id to map of participant-id to state
-   * represent pending messages for the participant
-   */
-  private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _pendingStateMap;
-
-  /**
-   * map of resource-id to state model definition id
-   */
-  private final Map<ResourceId, StateModelDefId> _resourceStateModelMap;
-
-  /**
-   * map of resource-id to current-state config's
-   */
-  private final Map<ResourceId, CurrentState> _curStateMetaMap;
-
-  /**
-   * construct
-   */
-  public NewCurrentStateOutput() {
-    _currentStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
-    _pendingStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
-    _resourceStateModelMap = new HashMap<ResourceId, StateModelDefId>();
-    _curStateMetaMap = new HashMap<ResourceId, CurrentState>();
-
-  }
-
-  /**
-   * @param resourceId
-   * @param stateModelDefId
-   */
-  public void setResourceStateModelDef(ResourceId resourceId, StateModelDefId stateModelDefId) {
-    _resourceStateModelMap.put(resourceId, stateModelDefId);
-  }
-
-  /**
-   * @param resourceId
-   * @return
-   */
-  public StateModelDefId getResourceStateModelDef(ResourceId resourceId) {
-    return _resourceStateModelMap.get(resourceId);
-  }
-
-  /**
-   * @param resourceId
-   * @param bucketSize
-   */
-  public void setBucketSize(ResourceId resourceId, int bucketSize) {
-    CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
-    if (curStateMeta == null) {
-      curStateMeta = new CurrentState(resourceId);
-      _curStateMetaMap.put(resourceId, curStateMeta);
-    }
-    curStateMeta.setBucketSize(bucketSize);
-  }
-
-  /**
-   * @param resourceId
-   * @return
-   */
-  public int getBucketSize(ResourceId resourceId) {
-    int bucketSize = 0;
-    CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
-    if (curStateMeta != null) {
-      bucketSize = curStateMeta.getBucketSize();
-    }
-
-    return bucketSize;
-  }
-
-  /**
-   * @param stateMap
-   * @param resourceId
-   * @param partitionId
-   * @param participantId
-   * @param state
-   */
-  static void setStateMap(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
-      ResourceId resourceId, PartitionId partitionId, ParticipantId participantId, State state) {
-    if (!stateMap.containsKey(resourceId)) {
-      stateMap.put(resourceId, new HashMap<PartitionId, Map<ParticipantId, State>>());
-    }
-
-    if (!stateMap.get(resourceId).containsKey(partitionId)) {
-      stateMap.get(resourceId).put(partitionId, new HashMap<ParticipantId, State>());
-    }
-    stateMap.get(resourceId).get(partitionId).put(participantId, state);
-  }
-
-  /**
-   * @param stateMap
-   * @param resourceId
-   * @param partitionId
-   * @param participantId
-   * @return state
-   */
-  static State getState(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
-      ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
-    Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
-    if (map != null) {
-      Map<ParticipantId, State> instanceStateMap = map.get(partitionId);
-      if (instanceStateMap != null) {
-        return instanceStateMap.get(participantId);
-      }
-    }
-    return null;
-
-  }
-
-  /**
-   * @param stateMap
-   * @param resourceId
-   * @param partitionId
-   * @return
-   */
-  static Map<ParticipantId, State> getStateMap(
-      Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap, ResourceId resourceId,
-      PartitionId partitionId) {
-    if (stateMap.containsKey(resourceId)) {
-      Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
-      if (map.containsKey(partitionId)) {
-        return map.get(partitionId);
-      }
-    }
-    return Collections.emptyMap();
-  }
-
-  /**
-   * @param resourceId
-   * @param partitionId
-   * @param participantId
-   * @param state
-   */
-  public void setCurrentState(ResourceId resourceId, PartitionId partitionId,
-      ParticipantId participantId, State state) {
-    setStateMap(_currentStateMap, resourceId, partitionId, participantId, state);
-  }
-
-  /**
-   * @param resourceId
-   * @param partitionId
-   * @param participantId
-   * @param state
-   */
-  public void setPendingState(ResourceId resourceId, PartitionId partitionId,
-      ParticipantId participantId, State state) {
-    setStateMap(_pendingStateMap, resourceId, partitionId, participantId, state);
-  }
-
-  /**
-   * given (resource, partition, instance), returns currentState
-   * @param resourceName
-   * @param partition
-   * @param instanceName
-   * @return
-   */
-  public State getCurrentState(ResourceId resourceId, PartitionId partitionId,
-      ParticipantId participantId) {
-    return getState(_currentStateMap, resourceId, partitionId, participantId);
-  }
-
-  /**
-   * given (resource, partition, instance), returns toState
-   * @param resourceName
-   * @param partition
-   * @param instanceName
-   * @return
-   */
-  public State getPendingState(ResourceId resourceId, PartitionId partitionId,
-      ParticipantId participantId) {
-    return getState(_pendingStateMap, resourceId, partitionId, participantId);
-  }
-
-  /**
-   * @param resourceId
-   * @param partitionId
-   * @return
-   */
-  public Map<ParticipantId, State> getCurrentStateMap(ResourceId resourceId, PartitionId partitionId) {
-    return getStateMap(_currentStateMap, resourceId, partitionId);
-  }
-
-  /**
-   * Get the partitions mapped in the current state
-   * @param resourceId resource to look up
-   * @return set of mapped partitions, or empty set if there are none
-   */
-  public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
-    Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
-    if (currentStateMap != null) {
-      return currentStateMap.keySet();
-    }
-    return Collections.emptySet();
-  }
-
-  /**
-   * @param resourceId
-   * @param partitionId
-   * @return
-   */
-  public Map<ParticipantId, State> getPendingStateMap(ResourceId resourceId, PartitionId partitionId) {
-    return getStateMap(_pendingStateMap, resourceId, partitionId);
-
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("current state= ").append(_currentStateMap);
-    sb.append(", pending state= ").append(_pendingStateMap);
-    return sb.toString();
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 6361dcf..8c151e3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -76,7 +76,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
 
-    NewCurrentStateOutput currentStateOutput =
+    ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
 
     List<ExternalView> newExtViews = new ArrayList<ExternalView>();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index 0c9fe9a..c74d577 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -60,11 +60,10 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
   public void process(ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute("helixmanager");
     Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
-        event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
-    NewCurrentStateOutput currentStateOutput =
+    ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
     NewBestPossibleStateOutput bestPossibleStateOutput =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
@@ -137,8 +136,8 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
                     .getSessionId();
             Message message =
                 createMessage(manager, resourceId, partitionId, participantId, currentState,
-                    nextState, sessionId, new StateModelDefId(stateModelDef.getId()), resourceConfig
-                        .getRebalancerConfig().getStateModelFactoryId(), bucketSize);
+                    nextState, sessionId, new StateModelDefId(stateModelDef.getId()),
+                    resourceConfig.getRebalancerConfig().getStateModelFactoryId(), bucketSize);
 
             // TODO refactor get/set timeout/inner-message
             RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
@@ -207,7 +206,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
     message.setTgtSessionId(participantSessionId);
     message.setSrcSessionId(Id.session(manager.getSessionId()));
     message.setStateModelDef(stateModelDefId);
-    message.setStateModelFactoryName(stateModelFactoryId.stringify());
+    message.setStateModelFactoryId(stateModelFactoryId);
     message.setBucketSize(bucketSize);
 
     return message;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 0ebbe74..1ec1a50 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -28,7 +28,6 @@ import java.util.TreeMap;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
-import org.apache.helix.api.ParticipantConfig;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
@@ -39,10 +38,7 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
@@ -91,11 +87,10 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
-        event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
-    NewCurrentStateOutput currentStateOutput =
+    ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
     NewMessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
     if (cluster == null || resourceMap == null || currentStateOutput == null

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
index 88b2d99..4200fba 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
@@ -19,19 +19,14 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.Map;
-
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.ClusterAccessor;
 import org.apache.helix.api.ClusterId;
 import org.apache.helix.api.Id;
-import org.apache.helix.api.StateModelDefId;
-import org.apache.helix.api.StateModelDefinitionAccessor;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
@@ -50,11 +45,8 @@ public class NewReadClusterDataStage extends AbstractBaseStage {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     ClusterId clusterId = Id.cluster(manager.getClusterName());
     ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
-    StateModelDefinitionAccessor stateModelDefAccessor = new StateModelDefinitionAccessor(accessor);
 
     Cluster cluster = clusterAccessor.readCluster();
-    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
-        stateModelDefAccessor.readStateModelDefinitions();
 
     ClusterStatusMonitor clusterStatusMonitor =
         (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
@@ -75,7 +67,6 @@ public class NewReadClusterDataStage extends AbstractBaseStage {
     }
 
     event.addAttribute("ClusterDataCache", cluster);
-    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefMap);
 
     long endTime = System.currentTimeMillis();
     LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index 50a825b..8e3006f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -19,10 +19,12 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
@@ -30,10 +32,10 @@ import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.log4j.Logger;
 
 /**
@@ -82,17 +84,17 @@ public class NewResourceComputationStage extends AbstractBaseStage {
 
         // don't overwrite ideal state configs
         if (!resourceBuilderMap.containsKey(resourceId)) {
-          RebalancerConfig.Builder rebalancerConfigBuilder =
-              new RebalancerConfig.Builder(resourceId);
-          rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
-          rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState
-              .getStateModelFactoryName()));
+          Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
           for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
-            rebalancerConfigBuilder.addPartition(new Partition(partitionId));
+            partitionMap.put(partitionId, new Partition(partitionId));
           }
-
+          RebalancerConfig rebalancerConfig =
+              new RebalancerConfig(resourceId, RebalanceMode.NONE,
+                  currentState.getStateModelDefId(), partitionMap);
+          rebalancerConfig.setStateModelFactoryId(Id.stateModelFactory(currentState
+              .getStateModelFactoryName()));
           ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
-          resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
+          resourceBuilder.rebalancerConfig(rebalancerConfig);
           resourceBuilder.bucketSize(currentState.getBucketSize());
           resourceBuilder.batchMessageMode(currentState.getBatchMessageMode());
           resourceBuilderMap.put(resourceId, resourceBuilder);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
new file mode 100644
index 0000000..db33a4f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -0,0 +1,255 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.model.CurrentState;
+
+public class ResourceCurrentState {
+  /**
+   * map of resource-id to map of partition-id to map of participant-id to state
+   * represent current-state for the participant
+   */
+  private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _currentStateMap;
+
+  /**
+   * map of resource-id to map of partition-id to map of participant-id to state
+   * represent pending messages for the participant
+   */
+  private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _pendingStateMap;
+
+  /**
+   * map of resource-id to state model definition id
+   */
+  private final Map<ResourceId, StateModelDefId> _resourceStateModelMap;
+
+  /**
+   * map of resource-id to current-state config's
+   */
+  private final Map<ResourceId, CurrentState> _curStateMetaMap;
+
+  /**
+   * construct
+   */
+  public ResourceCurrentState() {
+    _currentStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+    _pendingStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+    _resourceStateModelMap = new HashMap<ResourceId, StateModelDefId>();
+    _curStateMetaMap = new HashMap<ResourceId, CurrentState>();
+
+  }
+
+  /**
+   * @param resourceId
+   * @param stateModelDefId
+   */
+  public void setResourceStateModelDef(ResourceId resourceId, StateModelDefId stateModelDefId) {
+    _resourceStateModelMap.put(resourceId, stateModelDefId);
+  }
+
+  /**
+   * @param resourceId
+   * @return
+   */
+  public StateModelDefId getResourceStateModelDef(ResourceId resourceId) {
+    return _resourceStateModelMap.get(resourceId);
+  }
+
+  /**
+   * @param resourceId
+   * @param bucketSize
+   */
+  public void setBucketSize(ResourceId resourceId, int bucketSize) {
+    CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
+    if (curStateMeta == null) {
+      curStateMeta = new CurrentState(resourceId);
+      _curStateMetaMap.put(resourceId, curStateMeta);
+    }
+    curStateMeta.setBucketSize(bucketSize);
+  }
+
+  /**
+   * @param resourceId
+   * @return
+   */
+  public int getBucketSize(ResourceId resourceId) {
+    int bucketSize = 0;
+    CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
+    if (curStateMeta != null) {
+      bucketSize = curStateMeta.getBucketSize();
+    }
+
+    return bucketSize;
+  }
+
+  /**
+   * @param stateMap
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @param state
+   */
+  static void setStateMap(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
+      ResourceId resourceId, PartitionId partitionId, ParticipantId participantId, State state) {
+    if (!stateMap.containsKey(resourceId)) {
+      stateMap.put(resourceId, new HashMap<PartitionId, Map<ParticipantId, State>>());
+    }
+
+    if (!stateMap.get(resourceId).containsKey(partitionId)) {
+      stateMap.get(resourceId).put(partitionId, new HashMap<ParticipantId, State>());
+    }
+    stateMap.get(resourceId).get(partitionId).put(participantId, state);
+  }
+
+  /**
+   * @param stateMap
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @return state
+   */
+  static State getState(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
+      ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
+    Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
+    if (map != null) {
+      Map<ParticipantId, State> instanceStateMap = map.get(partitionId);
+      if (instanceStateMap != null) {
+        return instanceStateMap.get(participantId);
+      }
+    }
+    return null;
+
+  }
+
+  /**
+   * @param stateMap
+   * @param resourceId
+   * @param partitionId
+   * @return
+   */
+  static Map<ParticipantId, State> getStateMap(
+      Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap, ResourceId resourceId,
+      PartitionId partitionId) {
+    if (stateMap.containsKey(resourceId)) {
+      Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
+      if (map.containsKey(partitionId)) {
+        return map.get(partitionId);
+      }
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @param state
+   */
+  public void setCurrentState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId, State state) {
+    setStateMap(_currentStateMap, resourceId, partitionId, participantId, state);
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @param state
+   */
+  public void setPendingState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId, State state) {
+    setStateMap(_pendingStateMap, resourceId, partitionId, participantId, state);
+  }
+
+  /**
+   * given (resource, partition, instance), returns currentState
+   * @param resourceName
+   * @param partition
+   * @param instanceName
+   * @return
+   */
+  public State getCurrentState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId) {
+    return getState(_currentStateMap, resourceId, partitionId, participantId);
+  }
+
+  /**
+   * given (resource, partition, instance), returns toState
+   * @param resourceName
+   * @param partition
+   * @param instanceName
+   * @return
+   */
+  public State getPendingState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId) {
+    return getState(_pendingStateMap, resourceId, partitionId, participantId);
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @return
+   */
+  public Map<ParticipantId, State> getCurrentStateMap(ResourceId resourceId, PartitionId partitionId) {
+    return getStateMap(_currentStateMap, resourceId, partitionId);
+  }
+
+  /**
+   * Get the partitions mapped in the current state
+   * @param resourceId resource to look up
+   * @return set of mapped partitions, or empty set if there are none
+   */
+  public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
+    Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
+    if (currentStateMap != null) {
+      return currentStateMap.keySet();
+    }
+    return Collections.emptySet();
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @return
+   */
+  public Map<ParticipantId, State> getPendingStateMap(ResourceId resourceId, PartitionId partitionId) {
+    return getStateMap(_pendingStateMap, resourceId, partitionId);
+
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("current state= ").append(_currentStateMap);
+    sb.append(", pending state= ").append(_pendingStateMap);
+    return sb.toString();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 72046bf..a522352 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -27,10 +27,10 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.Map.Entry;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
@@ -96,23 +96,27 @@ public class AutoRebalanceStrategy {
    */
   public ZNRecord computePartitionAssignment(final List<String> liveNodes,
       final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
+    List<String> sortedLiveNodes = new ArrayList<String>(liveNodes);
+    Collections.sort(sortedLiveNodes);
+    List<String> sortedAllNodes = new ArrayList<String>(allNodes);
+    Collections.sort(sortedAllNodes);
     int numReplicas = countStateReplicas();
     ZNRecord znRecord = new ZNRecord(_resourceName);
-    if (liveNodes.size() == 0) {
+    if (sortedLiveNodes.size() == 0) {
       return znRecord;
     }
-    int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
-    int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
+    int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
+    int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
     _nodeMap = new HashMap<String, Node>();
     _liveNodesList = new ArrayList<Node>();
 
-    for (String id : allNodes) {
+    for (String id : sortedAllNodes) {
       Node node = new Node(id);
       node.capacity = 0;
       node.hasCeilingCapacity = false;
       _nodeMap.put(id, node);
     }
-    for (int i = 0; i < liveNodes.size(); i++) {
+    for (int i = 0; i < sortedLiveNodes.size(); i++) {
       boolean usingCeiling = false;
       int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
       if (distRemainder > 0 && targetSize < _maximumPerNode) {
@@ -120,7 +124,7 @@ public class AutoRebalanceStrategy {
         distRemainder = distRemainder - 1;
         usingCeiling = true;
       }
-      Node node = _nodeMap.get(liveNodes.get(i));
+      Node node = _nodeMap.get(sortedLiveNodes.get(i));
       node.isAlive = true;
       node.capacity = targetSize;
       node.hasCeilingCapacity = usingCeiling;
@@ -131,7 +135,7 @@ public class AutoRebalanceStrategy {
     _stateMap = generateStateMap();
 
     // compute the preferred mapping if all nodes were up
-    _preferredAssignment = computePreferredPlacement(allNodes);
+    _preferredAssignment = computePreferredPlacement(sortedAllNodes);
 
     // logger.info("preferred mapping:"+ preferredAssignment);
     // from current mapping derive the ones in preferred location

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index c7fef8d..5bd1e86 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -53,7 +53,7 @@ public class ClusterConfiguration extends HelixProperty {
   public static ClusterConfiguration from(UserConfig userConfig) {
     ClusterConfiguration clusterConfiguration =
         new ClusterConfiguration(Id.cluster(userConfig.getId()));
-    clusterConfiguration.addUserConfig(userConfig);
+    clusterConfiguration.addNamespacedConfig(userConfig);
     return clusterConfiguration;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 68672e3..189c239 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -40,6 +40,7 @@ import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.SessionId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 
 import com.google.common.collect.ImmutableList;
@@ -574,6 +575,16 @@ public class Message extends HelixProperty {
     _record.setSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString(), factoryName);
   }
 
+  /**
+   * Set the state model factory associated with this message
+   * @param factoryName the name of the factory
+   */
+  public void setStateModelFactoryId(StateModelFactoryId factoryId) {
+    if (factoryId != null) {
+      setStateModelFactoryName(factoryId.stringify());
+    }
+  }
+
   // TODO: remove this. impl in HelixProperty
   @Override
   public int getBucketSize() {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
index 44d6ac4..d5b9627 100644
--- a/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
@@ -53,7 +53,7 @@ public class PartitionConfiguration extends HelixProperty {
   public static PartitionConfiguration from(UserConfig userConfig) {
     PartitionConfiguration partitionConfiguration =
         new PartitionConfiguration(Id.partition(userConfig.getId()));
-    partitionConfiguration.addUserConfig(userConfig);
+    partitionConfiguration.addNamespacedConfig(userConfig);
     return partitionConfiguration;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 20c05a4..307ab0f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -1,10 +1,19 @@
 package org.apache.helix.model;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Id;
+import org.apache.helix.api.NamespacedConfig;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.UserConfig;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -29,6 +38,10 @@ import org.apache.helix.api.UserConfig;
  * Persisted configuration properties for a resource
  */
 public class ResourceConfiguration extends HelixProperty {
+  public enum Fields {
+    PARTITION_LIST
+  }
+
   /**
    * Instantiate for an id
    * @param id resource id
@@ -38,6 +51,14 @@ public class ResourceConfiguration extends HelixProperty {
   }
 
   /**
+   * Get the resource that is rebalanced
+   * @return resource id
+   */
+  public ResourceId getResourceId() {
+    return Id.resource(getId());
+  }
+
+  /**
    * Instantiate from a record
    * @param record configuration properties
    */
@@ -46,14 +67,49 @@ public class ResourceConfiguration extends HelixProperty {
   }
 
   /**
-   * Create a new ResourceConfiguration from a UserConfig
-   * @param userConfig user-defined configuration properties
+   * Set the partitions for this resource
+   * @param partitionIds list of partition ids
+   */
+  public void setPartitionIds(List<PartitionId> partitionIds) {
+    _record.setListField(Fields.PARTITION_LIST.toString(),
+        Lists.transform(partitionIds, Functions.toStringFunction()));
+  }
+
+  /**
+   * Get the partitions for this resource
+   * @return list of partition ids
+   */
+  public List<PartitionId> getPartitionIds() {
+    List<String> partitionNames = _record.getListField(Fields.PARTITION_LIST.toString());
+    if (partitionNames != null) {
+      return Lists.transform(partitionNames, new Function<String, PartitionId>() {
+        @Override
+        public PartitionId apply(String partitionName) {
+          return Id.partition(partitionName);
+        }
+      });
+    }
+    return null;
+  }
+
+  /**
+   * Add a rebalancer config to this resource
+   * @param config populated rebalancer config
+   */
+  public void addRebalancerConfig(RebalancerConfig config) {
+    addNamespacedConfig(config);
+    setPartitionIds(new ArrayList<PartitionId>(config.getPartitionSet()));
+  }
+
+  /**
+   * Create a new ResourceConfiguration from a NamespacedConfig
+   * @param namespacedConfig namespaced configuration properties
    * @return ResourceConfiguration
    */
-  public static ResourceConfiguration from(UserConfig userConfig) {
+  public static ResourceConfiguration from(NamespacedConfig namespacedConfig) {
     ResourceConfiguration resourceConfiguration =
-        new ResourceConfiguration(Id.resource(userConfig.getId()));
-    resourceConfiguration.addUserConfig(userConfig);
+        new ResourceConfiguration(Id.resource(namespacedConfig.getId()));
+    resourceConfiguration.addNamespacedConfig(namespacedConfig);
     return resourceConfiguration;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 1defc9f..c407941 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -266,18 +266,10 @@ public class ClusterStateVerifier {
         }
       }
 
-      Map<String, StateModelDefinition> stateModelDefs =
-          accessor.getChildValuesMap(keyBuilder.stateModelDefs());
-      Map<StateModelDefId, StateModelDefinition> convertedDefs =
-          new HashMap<StateModelDefId, StateModelDefinition>();
-      for (String defName : stateModelDefs.keySet()) {
-        convertedDefs.put(Id.stateModelDef(defName), stateModelDefs.get(defName));
-      }
       ClusterAccessor clusterAccessor = new ClusterAccessor(Id.cluster(clusterName), accessor);
       Cluster cluster = clusterAccessor.readCluster();
       // calculate best possible state
-      NewBestPossibleStateOutput bestPossOutput =
-          ClusterStateVerifier.calcBestPossState(cluster, convertedDefs);
+      NewBestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
 
       // set error states
       if (errStates != null) {
@@ -443,11 +435,9 @@ public class ClusterStateVerifier {
    * @throws Exception
    */
 
-  static NewBestPossibleStateOutput calcBestPossState(Cluster cluster,
-      Map<StateModelDefId, StateModelDefinition> convertedDefs) throws Exception {
+  static NewBestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
     ClusterEvent event = new ClusterEvent("sampleEvent");
     event.addAttribute("ClusterDataCache", cluster);
-    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), convertedDefs);
 
     NewResourceComputationStage rcState = new NewResourceComputationStage();
     NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
new file mode 100644
index 0000000..380d99e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
@@ -0,0 +1,129 @@
+package org.apache.helix.api;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.model.ResourceConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A user config is a namespaced subset in the physical model and a separate entity in the logical
+ * model. These tests ensure that that separation is honored.
+ */
+public class TestNamespacedConfig {
+  /**
+   * Ensure that user configs are separated from helix configs in properties that hold both
+   */
+  @Test
+  public void testUserConfigUpdates() {
+    final String testKey = "testKey";
+    final String prefixedKey = UserConfig.class.getSimpleName() + "_testKey";
+    final String testSimpleValue = "testValue";
+    final List<String> testListValue = ImmutableList.of("testValue");
+    final Map<String, String> testMapValue = ImmutableMap.of("testInnerKey", "testValue");
+
+    // first, add Helix configuration to an InstanceConfig
+    ParticipantId participantId = Id.participant("testParticipant");
+    InstanceConfig instanceConfig = new InstanceConfig(participantId);
+    instanceConfig.setHostName("localhost");
+
+    // now, add user configuration
+    UserConfig userConfig = new UserConfig(participantId);
+    userConfig.setSimpleField(testKey, testSimpleValue);
+    userConfig.setListField(testKey, testListValue);
+    userConfig.setMapField(testKey, testMapValue);
+
+    // add the user configuration to the Helix configuration
+    instanceConfig.addNamespacedConfig(userConfig);
+
+    // get the user configuration back from the property
+    UserConfig retrievedConfig = UserConfig.from(instanceConfig);
+
+    // check that the property still has the host name
+    Assert.assertTrue(instanceConfig.getHostName().equals("localhost"));
+
+    // check that the retrieved config does not contain the host name
+    Assert.assertEquals(retrievedConfig.getStringField(
+        InstanceConfigProperty.HELIX_HOST.toString(), "not localhost"), "not localhost");
+
+    // check that both the retrieved config and the original config have the added properties
+    Assert.assertEquals(userConfig.getSimpleField(testKey), testSimpleValue);
+    Assert.assertEquals(userConfig.getListField(testKey), testListValue);
+    Assert.assertEquals(userConfig.getMapField(testKey), testMapValue);
+    Assert.assertEquals(retrievedConfig.getSimpleField(testKey), testSimpleValue);
+    Assert.assertEquals(retrievedConfig.getListField(testKey), testListValue);
+    Assert.assertEquals(retrievedConfig.getMapField(testKey), testMapValue);
+
+    // test that the property has the user config, but prefixed
+    Assert.assertEquals(instanceConfig.getRecord().getSimpleField(prefixedKey), testSimpleValue);
+    Assert.assertEquals(instanceConfig.getRecord().getListField(prefixedKey), testListValue);
+    Assert.assertEquals(instanceConfig.getRecord().getMapField(prefixedKey), testMapValue);
+  }
+
+  @Test
+  public void testConfiguredResource() {
+    // Set up the namespaced configs
+    String userKey = "userKey";
+    String userValue = "userValue";
+    ResourceId resourceId = Id.resource("testResource");
+    UserConfig userConfig = new UserConfig(resourceId);
+    userConfig.setSimpleField(userKey, userValue);
+    PartitionId partitionId = Id.partition(resourceId, "0");
+    Partition partition = new Partition(partitionId);
+    Map<ParticipantId, State> preferenceMap = new HashMap<ParticipantId, State>();
+    ParticipantId participantId = Id.participant("participant");
+    preferenceMap.put(participantId, State.from("ONLINE"));
+    CustomRebalancerConfig rebalancerConfig =
+        new CustomRebalancerConfig.Builder(resourceId).replicaCount(1).addPartition(partition)
+            .stateModelDef(Id.stateModelDef("OnlineOffline"))
+            .preferenceMap(partitionId, preferenceMap).build();
+
+    // copy in the configs
+    ResourceConfiguration config = new ResourceConfiguration(resourceId);
+    config.addNamespacedConfig(userConfig);
+    config.addRebalancerConfig(rebalancerConfig);
+
+    // recreate the configs and check the fields
+    UserConfig retrievedUserConfig = UserConfig.from(config);
+    Assert.assertEquals(retrievedUserConfig.getSimpleField(userKey), userValue);
+    Map<PartitionId, UserConfig> partitionConfigs = Collections.emptyMap();
+    RebalancerConfig retrievedRebalancerConfig = RebalancerConfig.from(config, partitionConfigs);
+    Assert.assertEquals(retrievedRebalancerConfig.getReplicaCount(),
+        rebalancerConfig.getReplicaCount());
+    Assert.assertEquals(retrievedRebalancerConfig.getStateModelDefId(),
+        rebalancerConfig.getStateModelDefId());
+    Assert.assertTrue(retrievedRebalancerConfig.getPartitionMap().containsKey(partitionId));
+    Assert.assertEquals(retrievedRebalancerConfig.getPartitionSet().size(), rebalancerConfig
+        .getPartitionSet().size());
+    CustomRebalancerConfig customConfig = CustomRebalancerConfig.from(retrievedRebalancerConfig);
+    Assert.assertEquals(customConfig.getPreferenceMap(partitionId).get(participantId),
+        State.from("ONLINE"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 0561814..643875c 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -20,7 +20,6 @@ package org.apache.helix.api;
  */
 
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -28,14 +27,12 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
-import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
 import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.mock.controller.ClusterController;
@@ -43,10 +40,8 @@ import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -117,7 +112,7 @@ public class TestNewStages extends ZkUnitTestBase {
     ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
     Cluster cluster = clusterAccessor.readCluster();
     ClusterEvent event = new ClusterEvent(testName);
-    event.addAttribute(AttributeName.CURRENT_STATE.toString(), new NewCurrentStateOutput());
+    event.addAttribute(AttributeName.CURRENT_STATE.toString(), new ResourceCurrentState());
     Map<ResourceId, ResourceConfig> resourceConfigMap =
         Maps.transformValues(cluster.getResourceMap(), new Function<Resource, ResourceConfig>() {
           @Override
@@ -127,11 +122,6 @@ public class TestNewStages extends ZkUnitTestBase {
         });
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceConfigMap);
     event.addAttribute("ClusterDataCache", cluster);
-    Map<StateModelDefId, StateModelDefinition> stateModelMap =
-        new HashMap<StateModelDefId, StateModelDefinition>();
-    stateModelMap.put(Id.stateModelDef("MasterSlave"), new StateModelDefinition(
-        StateModelConfigGenerator.generateConfigForMasterSlave()));
-    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelMap);
 
     // Run the stage
     try {
@@ -166,57 +156,25 @@ public class TestNewStages extends ZkUnitTestBase {
 
     ResourceId resourceId = new ResourceId("TestDB0");
     Resource resource = cluster.getResource(resourceId);
-    StateModelDefinition masterSlave =
-        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
-    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
-    ResourceAssignment fullAutoResult =
-        new NewAutoRebalancer().computeResourceMapping(resource.getConfig(), cluster, masterSlave,
-            currentStateOutput);
-    verifyFullAutoRebalance(resource, fullAutoResult);
+    ResourceCurrentState currentStateOutput = new ResourceCurrentState();
+    SemiAutoRebalancerConfig semiAutoConfig =
+        SemiAutoRebalancerConfig.from(resource.getRebalancerConfig());
     ResourceAssignment semiAutoResult =
-        new NewSemiAutoRebalancer().computeResourceMapping(resource.getConfig(), cluster,
-            masterSlave,
+        new NewSemiAutoRebalancer().computeResourceMapping(semiAutoConfig, cluster,
             currentStateOutput);
     verifySemiAutoRebalance(resource, semiAutoResult);
-    ResourceAssignment customResult =
-        new NewCustomRebalancer().computeResourceMapping(resource.getConfig(), cluster,
-            masterSlave,
-            currentStateOutput);
-    verifyCustomRebalance(resource, customResult);
 
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
   }
 
   /**
-   * Check that a full auto rebalance is run, and at least one replica per partition is mapped
-   * @param resource the resource to verify
-   * @param assignment the assignment to verify
-   */
-  private void verifyFullAutoRebalance(Resource resource, ResourceAssignment assignment) {
-    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
-    for (PartitionId partitionId : assignment.getMappedPartitions()) {
-      Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
-      Assert.assertTrue(replicaMap.size() <= r);
-      Assert.assertTrue(replicaMap.size() > 0);
-      boolean hasMaster = false;
-      for (State state : replicaMap.values()) {
-        if (state.equals(State.from("MASTER"))) {
-          Assert.assertFalse(hasMaster);
-          hasMaster = true;
-        }
-      }
-      Assert.assertTrue(hasMaster);
-    }
-  }
-
-  /**
    * Check that a semi auto rebalance is run, and all partitions are mapped by preference list
    * @param resource the resource to verify
    * @param assignment the assignment to verify
    */
   private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
     Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
-    RebalancerConfig config = resource.getRebalancerConfig();
+    SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(resource.getRebalancerConfig());
     for (PartitionId partitionId : assignment.getMappedPartitions()) {
       List<ParticipantId> preferenceList = config.getPreferenceList(partitionId);
       Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
@@ -235,26 +193,6 @@ public class TestNewStages extends ZkUnitTestBase {
     }
   }
 
-  /**
-   * For vanilla customized rebalancing, the resource assignment should match the preference map
-   * @param resource the resource to verify
-   * @param assignment the assignment to verify
-   */
-  private void verifyCustomRebalance(Resource resource, ResourceAssignment assignment) {
-    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
-    RebalancerConfig config = resource.getRebalancerConfig();
-    for (PartitionId partitionId : assignment.getMappedPartitions()) {
-      Map<ParticipantId, State> preferenceMap = config.getPreferenceMap(partitionId);
-      Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
-      Assert.assertEquals(replicaMap.size(), preferenceMap.size());
-      Assert.assertEquals(replicaMap.size(), r);
-      for (ParticipantId participant : preferenceMap.keySet()) {
-        Assert.assertTrue(replicaMap.containsKey(participant));
-        Assert.assertEquals(replicaMap.get(participant), preferenceMap.get(participant));
-      }
-    }
-  }
-
   @BeforeClass
   public void beforeClass() throws Exception {
     // set up a running class

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java
deleted file mode 100644
index 36cbf61..0000000
--- a/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.apache.helix.api;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * A user config is a namespaced subset in the physical model and a separate entity in the logical
- * model. These tests ensure that that separation is honored.
- */
-public class TestUserConfig {
-  /**
-   * Ensure that user configs are separated from helix configs in properties that hold both
-   */
-  @Test
-  public void testUserConfigUpdates() {
-    final String testKey = "testKey";
-    final String prefixedKey = UserConfig.class.getSimpleName() + "_testKey";
-    final String testSimpleValue = "testValue";
-    final List<String> testListValue = ImmutableList.of("testValue");
-    final Map<String, String> testMapValue = ImmutableMap.of("testInnerKey", "testValue");
-
-    // first, add Helix configuration to an InstanceConfig
-    ParticipantId participantId = Id.participant("testParticipant");
-    InstanceConfig instanceConfig = new InstanceConfig(participantId);
-    instanceConfig.setHostName("localhost");
-
-    // now, add user configuration
-    UserConfig userConfig = new UserConfig(participantId);
-    userConfig.setSimpleField(testKey, testSimpleValue);
-    userConfig.setListField(testKey, testListValue);
-    userConfig.setMapField(testKey, testMapValue);
-
-    // add the user configuration to the Helix configuration
-    instanceConfig.addUserConfig(userConfig);
-
-    // get the user configuration back from the property
-    UserConfig retrievedConfig = UserConfig.from(instanceConfig);
-
-    // check that the property still has the host name
-    Assert.assertTrue(instanceConfig.getHostName().equals("localhost"));
-
-    // check that the retrieved config does not contain the host name
-    Assert.assertEquals(retrievedConfig.getStringField(
-        InstanceConfigProperty.HELIX_HOST.toString(), "not localhost"), "not localhost");
-
-    // check that both the retrieved config and the original config have the added properties
-    Assert.assertEquals(userConfig.getSimpleField(testKey), testSimpleValue);
-    Assert.assertEquals(userConfig.getListField(testKey), testListValue);
-    Assert.assertEquals(userConfig.getMapField(testKey), testMapValue);
-    Assert.assertEquals(retrievedConfig.getSimpleField(testKey), testSimpleValue);
-    Assert.assertEquals(retrievedConfig.getListField(testKey), testListValue);
-    Assert.assertEquals(retrievedConfig.getMapField(testKey), testMapValue);
-
-    // test that the property has the user config, but prefixed
-    Assert.assertEquals(instanceConfig.getRecord().getSimpleField(prefixedKey), testSimpleValue);
-    Assert.assertEquals(instanceConfig.getRecord().getListField(prefixedKey), testListValue);
-    Assert.assertEquals(instanceConfig.getRecord().getMapField(prefixedKey), testMapValue);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index f8494bd..b680844 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -34,10 +35,11 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Id;
 import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.UserConfig;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.model.IdealState;
@@ -170,10 +172,11 @@ public class BaseStageTest {
       for (PartitionId partitionId : idealState.getPartitionSet()) {
         partitionMap.put(partitionId, new Partition(partitionId));
       }
-      RebalancerConfig rebalancerConfig = new RebalancerConfig(partitionMap, idealState, null, 0);
-      ResourceConfig resourceConfig =
-          new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerConfig).build();
-      resourceMap.put(resourceId, resourceConfig);
+      Map<PartitionId, UserConfig> partitionConfigMap = Collections.emptyMap();
+      Resource resource =
+          new Resource(resourceId, idealState, null, null, new UserConfig(resourceId),
+              partitionConfigMap);
+      resourceMap.put(resourceId, resource.getConfig());
     }
 
     return resourceMap;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 7dd2d45..8873a61 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -31,10 +31,8 @@ import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
-import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.IdealStateModeProperty;
-import org.apache.helix.model.StateModelDefinition;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -56,13 +54,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
     List<IdealState> idealStates =
         setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
     setupLiveInstances(5);
-    Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
+    setupStateModel();
 
     Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
-    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+    ResourceCurrentState currentStateOutput = new ResourceCurrentState();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
-    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
 
     NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
     runStage(event, stage1);
@@ -95,13 +92,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
     List<IdealState> idealStates =
         setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.CUSTOMIZED);
     setupLiveInstances(5);
-    Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
+    setupStateModel();
 
     Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
-    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+    ResourceCurrentState currentStateOutput = new ResourceCurrentState();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
-    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
 
     NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
     runStage(event, stage1);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 234c441..9f7ea82 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -28,10 +28,8 @@ import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
-import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -47,13 +45,12 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
     };
     List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
     setupLiveInstances(5);
-    Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
+    setupStateModel();
 
     Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
-    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+    ResourceCurrentState currentStateOutput = new ResourceCurrentState();
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
-    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
 
     NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
     runStage(event, stage1);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 489537f..837eee7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -48,7 +48,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
     runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    NewCurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    ResourceCurrentState output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     AssertJUnit.assertEquals(
         output.getCurrentStateMap(Id.resource("testResourceName"),
             Id.partition("testResourceName_0")).size(), 0);
@@ -69,7 +69,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
     runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    NewCurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    ResourceCurrentState output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     AssertJUnit.assertEquals(
         output1.getCurrentStateMap(Id.resource("testResourceName"),
             Id.partition("testResourceName_0")).size(), 0);
@@ -88,7 +88,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
 
     runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    NewCurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    ResourceCurrentState output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     State pendingState =
         output2.getPendingState(Id.resource("testResourceName"),
             Id.partition("testResourceName_1"), Id.participant("localhost_3"));
@@ -113,7 +113,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
         stateWithDeadSession);
     runStage(event, new NewReadClusterDataStage());
     runStage(event, stage);
-    NewCurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    ResourceCurrentState output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     State currentState =
         output3.getCurrentState(Id.resource("testResourceName"),
             Id.partition("testResourceName_1"), Id.participant("localhost_3"));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index d8e7955..1b0a0b4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -30,12 +30,11 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.rebalancer.NewRebalancer;
+import org.apache.helix.api.UserDefinedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
 import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.NewCurrentStateOutput;
+import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
@@ -55,20 +54,22 @@ public class TestCustomizedIdealStateRebalancer extends
   String db2 = TEST_DB + "2";
   static boolean testRebalancerInvoked = false;
 
-  public static class TestRebalancer implements NewRebalancer {
+  public static class TestRebalancer implements NewUserDefinedRebalancer {
 
     /**
      * Very basic mapping that evenly assigns one replica of each partition to live nodes, each of
      * which is in the highest-priority state.
      */
     @Override
-    public ResourceAssignment computeResourceMapping(ResourceConfig resourceConfig, Cluster cluster,
-        StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+    public ResourceAssignment computeResourceMapping(UserDefinedRebalancerConfig config,
+        Cluster cluster, ResourceCurrentState currentState) {
+      StateModelDefinition stateModelDef =
+          cluster.getStateModelMap().get(config.getStateModelDefId());
       List<ParticipantId> liveParticipants =
           new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
-      ResourceAssignment resourceMapping = new ResourceAssignment(resourceConfig.getId());
+      ResourceAssignment resourceMapping = new ResourceAssignment(config.getResourceId());
       int i = 0;
-      for (PartitionId partitionId : resourceConfig.getPartitionSet()) {
+      for (PartitionId partitionId : config.getPartitionSet()) {
         int nodeIndex = i % liveParticipants.size();
         Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
         replicaMap.put(liveParticipants.get(nodeIndex), stateModelDef.getStatesPriorityList()

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0a8baa12/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index 642a8a3..e08394c 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -8,10 +8,10 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.ClusterAccessor;
 import org.apache.helix.api.ClusterConfig;
 import org.apache.helix.api.ClusterId;
+import org.apache.helix.api.FullAutoRebalancerConfig;
 import org.apache.helix.api.Id;
 import org.apache.helix.api.ParticipantConfig;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
@@ -22,7 +22,6 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
@@ -90,9 +89,9 @@ public class NewModelExample {
 
   private static ResourceConfig getResource(StateModelDefinition stateModelDef) {
     ResourceId resourceId = Id.resource("exampleResource");
-    RebalancerConfig.Builder rebalanceConfigBuilder =
-        new RebalancerConfig.Builder(resourceId).rebalancerMode(RebalanceMode.FULL_AUTO)
-            .replicaCount(3).addPartitions(5).stateModelDef(stateModelDef.getStateModelDefId());
+    FullAutoRebalancerConfig.Builder rebalanceConfigBuilder =
+        new FullAutoRebalancerConfig.Builder(resourceId).replicaCount(3).addPartitions(5)
+            .stateModelDef(stateModelDef.getStateModelDefId());
     ResourceConfig.Builder resourceBuilder =
         new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalanceConfigBuilder.build());
     return resourceBuilder.build();