You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:53 UTC

[45/53] [abbrv] [HELIX-209] Shuffling around rebalancer code to allow for compatibility

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
deleted file mode 100644
index dfea7fc..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ /dev/null
@@ -1,198 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
-import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.ClusterConstraints.ConstraintValue;
-import org.apache.helix.model.ConstraintItem;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public class NewMessageThrottleStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewMessageThrottleStage.class.getName());
-
-  int valueOf(String valueStr) {
-    int value = Integer.MAX_VALUE;
-
-    try {
-      ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
-      switch (valueToken) {
-      case ANY:
-        value = Integer.MAX_VALUE;
-        break;
-      default:
-        LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
-            + Integer.MAX_VALUE);
-        break;
-      }
-    } catch (Exception e) {
-      try {
-        value = Integer.parseInt(valueStr);
-      } catch (NumberFormatException ne) {
-        LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
-            + Integer.MAX_VALUE);
-      }
-    }
-    return value;
-  }
-
-  /**
-   * constraints are selected in the order of the following rules: 1) don't select
-   * constraints with CONSTRAINT_VALUE=ANY; 2) if one constraint is more specific than the
-   * other, select the most specific one 3) if a message matches multiple constraints of
-   * incomparable specificity, select the one with the minimum value 4) if a message
-   * matches multiple constraints of incomparable specificity, and they all have the same
-   * value, select the first in alphabetic order
-   */
-  Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
-      Map<ConstraintAttribute, String> attributes) {
-    Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
-    for (ConstraintItem item : items) {
-      // don't select constraints with CONSTRAINT_VALUE=ANY
-      if (item.getConstraintValue().equals(ConstraintValue.ANY.toString())) {
-        continue;
-      }
-
-      String key = item.filter(attributes).toString();
-      if (!selectedItems.containsKey(key)) {
-        selectedItems.put(key, item);
-      } else {
-        ConstraintItem existingItem = selectedItems.get(key);
-        if (existingItem.match(item.getAttributes())) {
-          // item is more specific than existingItem
-          selectedItems.put(key, item);
-        } else if (!item.match(existingItem.getAttributes())) {
-          // existingItem and item are of incomparable specificity
-          int value = valueOf(item.getConstraintValue());
-          int existingValue = valueOf(existingItem.getConstraintValue());
-          if (value < existingValue) {
-            // item's constraint value is less than that of existingItem
-            selectedItems.put(key, item);
-          } else if (value == existingValue) {
-            if (item.toString().compareTo(existingItem.toString()) < 0) {
-              // item is ahead of existingItem in alphabetic order
-              selectedItems.put(key, item);
-            }
-          }
-        }
-      }
-    }
-    return new HashSet<ConstraintItem>(selectedItems.values());
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    NewMessageOutput msgSelectionOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-
-    if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
-      throw new StageException("Missing attributes in event: " + event
-          + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
-    }
-
-    NewMessageOutput output = new NewMessageOutput();
-
-    // TODO fix it
-    ClusterConstraints constraint = cluster.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
-    Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
-
-    if (constraint != null) {
-      // go through all pending messages, they should be counted but not throttled
-      for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
-        Participant liveParticipant = cluster.getLiveParticipantMap().get(participantId);
-        throttle(throttleCounterMap, constraint, new ArrayList<Message>(liveParticipant
-            .getMessageMap().values()), false);
-      }
-    }
-
-    // go through all new messages, throttle if necessary
-    // assume messages should be sorted by state transition priority in messageSelection stage
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      ResourceConfig resource = resourceMap.get(resourceId);
-      // TODO fix it
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
-        List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
-        if (constraint != null && messages != null && messages.size() > 0) {
-          messages = throttle(throttleCounterMap, constraint, messages, true);
-        }
-        output.setMessages(resourceId, partitionId, messages);
-      }
-    }
-
-    event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
-  }
-
-  private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,
-      List<Message> messages, final boolean needThrottle) {
-
-    List<Message> throttleOutputMsgs = new ArrayList<Message>();
-    for (Message message : messages) {
-      Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
-
-      Set<ConstraintItem> matches = constraint.match(msgAttr);
-      matches = selectConstraints(matches, msgAttr);
-
-      boolean msgThrottled = false;
-      for (ConstraintItem item : matches) {
-        String key = item.filter(msgAttr).toString();
-        if (!throttleMap.containsKey(key)) {
-          throttleMap.put(key, valueOf(item.getConstraintValue()));
-        }
-        int value = throttleMap.get(key);
-        throttleMap.put(key, --value);
-
-        if (needThrottle && value < 0) {
-          msgThrottled = true;
-
-          if (LOG.isDebugEnabled()) {
-            // TODO: printout constraint item that throttles the message
-            LOG.debug("message: " + message + " is throttled by constraint: " + item);
-          }
-        }
-      }
-
-      if (!msgThrottled) {
-        throttleOutputMsgs.add(message);
-      }
-    }
-
-    return throttleOutputMsgs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
deleted file mode 100644
index 26050f8..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.accessor.ClusterAccessor;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.log4j.Logger;
-
-public class NewReadClusterDataStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewReadClusterDataStage.class.getName());
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    long startTime = System.currentTimeMillis();
-    LOG.info("START ReadClusterDataStage.process()");
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    if (manager == null) {
-      throw new StageException("HelixManager attribute value is null");
-    }
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    ClusterId clusterId = ClusterId.from(manager.getClusterName());
-    ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
-
-    Cluster cluster = clusterAccessor.readCluster();
-
-    ClusterStatusMonitor clusterStatusMonitor =
-        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-    if (clusterStatusMonitor != null) {
-      // TODO fix it
-      // int disabledInstances = 0;
-      // int disabledPartitions = 0;
-      // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
-      // if (config.getInstanceEnabled() == false) {
-      // disabledInstances++;
-      // }
-      // if (config.getDisabledPartitions() != null) {
-      // disabledPartitions += config.getDisabledPartitions().size();
-      // }
-      // }
-      // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
-      // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
-    }
-
-    event.addAttribute("ClusterDataCache", cluster);
-
-    long endTime = System.currentTimeMillis();
-    LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
deleted file mode 100644
index b531bd7..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.CurrentState;
-import org.apache.log4j.Logger;
-
-/**
- * This stage computes all the resources in a cluster. The resources are
- * computed from IdealStates -> this gives all the resources currently active
- * CurrentState for liveInstance-> Helps in finding resources that are inactive
- * and needs to be dropped
- */
-public class NewResourceComputationStage extends AbstractBaseStage {
-  private static Logger LOG = Logger.getLogger(NewResourceComputationStage.class);
-
-  @Override
-  public void process(ClusterEvent event) throws StageException {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    if (cluster == null) {
-      throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
-    }
-
-    Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
-    Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
-
-    // ideal-state may be removed, add all resource config in current-state but not in ideal-state
-    for (ResourceId resourceId : csResCfgMap.keySet()) {
-      if (!cluster.getResourceMap().keySet().contains(resourceId)) {
-        resCfgMap.put(resourceId, csResCfgMap.get(resourceId));
-      }
-    }
-
-    for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
-      Resource resource = cluster.getResource(resourceId);
-      RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
-
-      ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
-      resCfgBuilder.bucketSize(resource.getBucketSize());
-      resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
-      resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
-      resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
-      resCfgMap.put(resourceId, resCfgBuilder.build());
-    }
-
-    event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
-  }
-
-  /**
-   * Get resource config's from current-state
-   * @param cluster
-   * @return resource config map or empty map if not available
-   * @throws StageException
-   */
-  Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
-    Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
-        new HashMap<ResourceId, ResourceConfig.Builder>();
-
-    Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
-        new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
-
-    for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
-      for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
-        CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
-
-        if (currentState.getStateModelDefRef() == null) {
-          LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
-              + ", partitions: " + currentState.getPartitionStateMap().keySet()
-              + ", states: " + currentState.getPartitionStateMap().values());
-          throw new StageException("State model def is null for resource:"
-              + currentState.getResourceId());
-        }
-
-        if (!resCfgBuilderMap.containsKey(resourceId)) {
-          PartitionedRebalancerContext.Builder rebCtxBuilder =
-              new PartitionedRebalancerContext.Builder(resourceId);
-          rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
-          rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
-              .getStateModelFactoryName()));
-          rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
-
-          ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
-          resCfgBuilder.bucketSize(currentState.getBucketSize());
-          resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
-          resCfgBuilderMap.put(resourceId, resCfgBuilder);
-        }
-
-        PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
-        for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
-          rebCtxBuilder.addPartition(new Partition(partitionId));
-        }
-      }
-    }
-
-    Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
-    for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
-      ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
-      PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
-      resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
-      resCfgMap.put(resourceId, resCfgBuilder.build());
-    }
-
-    return resCfgMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
deleted file mode 100644
index 51c9284..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public class NewTaskAssignmentStage extends AbstractBaseStage {
-  private static Logger logger = Logger.getLogger(NewTaskAssignmentStage.class);
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    long startTime = System.currentTimeMillis();
-    logger.info("START TaskAssignmentStage.process()");
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    NewMessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
-
-    if (manager == null || resourceMap == null || messageOutput == null || cluster == null
-        || liveParticipantMap == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
-    }
-
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    List<Message> messagesToSend = new ArrayList<Message>();
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      ResourceConfig resource = resourceMap.get(resourceId);
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
-        List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
-        messagesToSend.addAll(messages);
-      }
-    }
-
-    List<Message> outputMessages =
-        batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
-            manager.getProperties());
-    sendMessages(dataAccessor, outputMessages);
-
-    long endTime = System.currentTimeMillis();
-    logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
-
-  }
-
-  List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
-      Map<ResourceId, ResourceConfig> resourceMap,
-      Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
-    // group messages by its CurrentState path + "/" + fromState + "/" + toState
-    Map<String, Message> batchMessages = new HashMap<String, Message>();
-    List<Message> outputMessages = new ArrayList<Message>();
-
-    Iterator<Message> iter = messages.iterator();
-    while (iter.hasNext()) {
-      Message message = iter.next();
-      ResourceId resourceId = message.getResourceId();
-      ResourceConfig resource = resourceMap.get(resourceId);
-
-      ParticipantId participantId = ParticipantId.from(message.getTgtName());
-      Participant liveParticipant = liveParticipantMap.get(participantId);
-      String participantVersion = null;
-      if (liveParticipant != null) {
-        participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
-      }
-
-      if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
-          || !properties.isFeatureSupported("batch_message", participantVersion)) {
-        outputMessages.add(message);
-        continue;
-      }
-
-      String key =
-          keyBuilder.currentState(message.getTgtName(), message.getTypedTgtSessionId().stringify(),
-              message.getResourceId().stringify()).getPath()
-              + "/" + message.getTypedFromState() + "/" + message.getTypedToState();
-
-      if (!batchMessages.containsKey(key)) {
-        Message batchMessage = new Message(message.getRecord());
-        batchMessage.setBatchMessageMode(true);
-        outputMessages.add(batchMessage);
-        batchMessages.put(key, batchMessage);
-      }
-      batchMessages.get(key).addPartitionName(message.getPartitionId().stringify());
-    }
-
-    return outputMessages;
-  }
-
-  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
-    if (messages == null || messages.isEmpty()) {
-      return;
-    }
-
-    Builder keyBuilder = dataAccessor.keyBuilder();
-
-    List<PropertyKey> keys = new ArrayList<PropertyKey>();
-    for (Message message : messages) {
-      logger.info("Sending Message " + message.getMessageId() + " to " + message.getTgtName()
-          + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
-          + message.getTypedFromState() + " to:" + message.getTypedToState());
-
-      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
-      // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
-      // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
-      // + message.getToState());
-
-      keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
-    }
-
-    dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index be0b7f0..31dbb08 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -35,7 +35,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
     HelixManager helixManager = event.getAttribute("helixmanager");
     HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
     ResourceAccessor resourceAccessor = new ResourceAccessor(accessor);
-    NewBestPossibleStateOutput assignments =
+    BestPossibleStateOutput assignments =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (ResourceId resourceId : assignments.getAssignedResources()) {
       ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index ce81f1f..44fddb6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -21,53 +21,53 @@ package org.apache.helix.controller.stages;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
-@Deprecated
 public class ReadClusterDataStage extends AbstractBaseStage {
-  private static final Logger logger = Logger.getLogger(ReadClusterDataStage.class.getName());
-  ClusterDataCache _cache;
-
-  public ReadClusterDataStage() {
-    _cache = new ClusterDataCache();
-  }
+  private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
-    logger.info("START ReadClusterDataStage.process()");
+    LOG.info("START ReadClusterDataStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
     if (manager == null) {
       throw new StageException("HelixManager attribute value is null");
     }
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    _cache.refresh(dataAccessor);
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    ClusterId clusterId = ClusterId.from(manager.getClusterName());
+    ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
+
+    Cluster cluster = clusterAccessor.readCluster();
 
     ClusterStatusMonitor clusterStatusMonitor =
         (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
     if (clusterStatusMonitor != null) {
-      int disabledInstances = 0;
-      int disabledPartitions = 0;
-      for (InstanceConfig config : _cache._instanceConfigMap.values()) {
-        if (config.getInstanceEnabled() == false) {
-          disabledInstances++;
-        }
-        if (config.getDisabledPartitions() != null) {
-          disabledPartitions += config.getDisabledPartitions().size();
-        }
-      }
-      clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
-          _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
+      // TODO fix it
+      // int disabledInstances = 0;
+      // int disabledPartitions = 0;
+      // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
+      // if (config.getInstanceEnabled() == false) {
+      // disabledInstances++;
+      // }
+      // if (config.getDisabledPartitions() != null) {
+      // disabledPartitions += config.getDisabledPartitions().size();
+      // }
+      // }
+      // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+      // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
     }
 
-    event.addAttribute("ClusterDataCache", _cache);
+    event.addAttribute("ClusterDataCache", cluster);
 
     long endTime = System.currentTimeMillis();
-    logger.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
+    LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
index ae873c7..859c1d0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
@@ -23,10 +23,8 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.log4j.Logger;
 
 public class ReadHealthDataStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(ReadHealthDataStage.class.getName());
   HealthDataCache _cache;
 
   public ReadHealthDataStage() {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
deleted file mode 100644
index 949cfca..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Check and invoke custom implementation idealstate rebalancers.<br/>
- * If the resourceConfig has specified className of the customized rebalancer, <br/>
- * the rebalancer will be invoked to re-write the idealstate of the resource<br/>
- */
-@Deprecated
-public class RebalanceIdealStateStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(RebalanceIdealStateStage.class.getName());
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    HelixManager manager = event.getAttribute("helixmanager");
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, IdealState> idealStateMap = cache.getIdealStates();
-    CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
-    Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
-    for (String resourceName : idealStateMap.keySet()) {
-      IdealState currentIdealState = idealStateMap.get(resourceName);
-      if (currentIdealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
-          && currentIdealState.getRebalancerRef() != null) {
-        String rebalancerClassName = currentIdealState.getRebalancerRef().toString();
-        LOG.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
-        try {
-          Rebalancer balancer =
-              (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
-          balancer.init(manager);
-          Resource resource = new Resource(resourceName);
-          for (String partitionName : currentIdealState.getPartitionSet()) {
-            resource.addPartition(partitionName);
-          }
-          ResourceAssignment resourceAssignment =
-              balancer.computeResourceMapping(resource, currentIdealState, currentStateOutput,
-                  cache);
-          StateModelDefinition stateModelDef =
-              cache.getStateModelDef(currentIdealState.getStateModelDefRef());
-          currentIdealState.updateFromAssignment(resourceAssignment, stateModelDef);
-          updatedIdealStates.put(resourceName, currentIdealState);
-        } catch (Exception e) {
-          LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
-        }
-      }
-    }
-    if (updatedIdealStates.size() > 0) {
-      cache.getIdealStates().putAll(updatedIdealStates);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index da38ee2..1fdd892 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -19,16 +19,23 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
 import org.apache.log4j.Logger;
 
 /**
@@ -37,102 +44,95 @@ import org.apache.log4j.Logger;
  * CurrentState for liveInstance-> Helps in finding resources that are inactive
  * and needs to be dropped
  */
-@Deprecated
 public class ResourceComputationStage extends AbstractBaseStage {
   private static Logger LOG = Logger.getLogger(ResourceComputationStage.class);
 
   @Override
-  public void process(ClusterEvent event) throws Exception {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    if (cache == null) {
-      throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+  public void process(ClusterEvent event) throws StageException {
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    if (cluster == null) {
+      throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
     }
 
-    Map<String, IdealState> idealStates = cache.getIdealStates();
+    Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+    Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
 
-    Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
+    // ideal-state may be removed, add all resource config in current-state but not in ideal-state
+    for (ResourceId resourceId : csResCfgMap.keySet()) {
+      if (!cluster.getResourceMap().keySet().contains(resourceId)) {
+        resCfgMap.put(resourceId, csResCfgMap.get(resourceId));
+      }
+    }
 
-    if (idealStates != null && idealStates.size() > 0) {
-      for (IdealState idealState : idealStates.values()) {
-        Set<String> partitionSet = idealState.getPartitionSet();
-        String resourceName = idealState.getResourceName();
+    for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+      Resource resource = cluster.getResource(resourceId);
+      RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
 
-        for (String partition : partitionSet) {
-          addPartition(partition, resourceName, resourceMap);
-          Resource resource = resourceMap.get(resourceName);
-          resource.setStateModelDefRef(idealState.getStateModelDefRef());
-          resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
-          resource.setBucketSize(idealState.getBucketSize());
-          resource.setBatchMessageMode(idealState.getBatchMessageMode());
-        }
-      }
+      ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+      resCfgBuilder.bucketSize(resource.getBucketSize());
+      resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
+      resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
+      resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
+      resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 
-    // It's important to get partitions from CurrentState as well since the
-    // idealState might be removed.
-    Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
+    event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
+  }
 
-    if (availableInstances != null && availableInstances.size() > 0) {
-      for (LiveInstance instance : availableInstances.values()) {
-        String instanceName = instance.getInstanceName();
-        String clientSessionId = instance.getTypedSessionId().stringify();
+  /**
+   * Get resource config's from current-state
+   * @param cluster
+   * @return resource config map or empty map if not available
+   * @throws StageException
+   */
+  Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
+    Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
+        new HashMap<ResourceId, ResourceConfig.Builder>();
+
+    Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
+        new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
+
+    for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+      for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
+        CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
+
+        if (currentState.getStateModelDefRef() == null) {
+          LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
+              + ", partitions: " + currentState.getPartitionStateMap().keySet()
+              + ", states: " + currentState.getPartitionStateMap().values());
+          throw new StageException("State model def is null for resource:"
+              + currentState.getResourceId());
+        }
 
-        Map<String, CurrentState> currentStateMap =
-            cache.getCurrentState(instanceName, clientSessionId);
-        if (currentStateMap == null || currentStateMap.size() == 0) {
-          continue;
+        if (!resCfgBuilderMap.containsKey(resourceId)) {
+          PartitionedRebalancerContext.Builder rebCtxBuilder =
+              new PartitionedRebalancerContext.Builder(resourceId);
+          rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
+          rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
+              .getStateModelFactoryName()));
+          rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
+
+          ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+          resCfgBuilder.bucketSize(currentState.getBucketSize());
+          resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
+          resCfgBuilderMap.put(resourceId, resCfgBuilder);
         }
-        for (CurrentState currentState : currentStateMap.values()) {
-
-          String resourceName = currentState.getResourceName();
-          Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
-
-          // don't overwrite ideal state settings
-          if (!resourceMap.containsKey(resourceName)) {
-            addResource(resourceName, resourceMap);
-            Resource resource = resourceMap.get(resourceName);
-            resource.setStateModelDefRef(currentState.getStateModelDefRef());
-            resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
-            resource.setBucketSize(currentState.getBucketSize());
-            resource.setBatchMessageMode(currentState.getBatchMessageMode());
-          }
-
-          if (currentState.getStateModelDefRef() == null) {
-            LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
-                + ", partitions: " + currentState.getPartitionStateMap().keySet()
-                + ", states: " + currentState.getPartitionStateMap().values());
-            throw new StageException("State model def is null for resource:"
-                + currentState.getResourceName());
-          }
-
-          for (String partition : resourceStateMap.keySet()) {
-            addPartition(partition, resourceName, resourceMap);
-          }
+
+        PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+        for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
+          rebCtxBuilder.addPartition(new Partition(partitionId));
         }
       }
     }
 
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-  }
-
-  private void addResource(String resource, Map<String, Resource> resourceMap) {
-    if (resource == null || resourceMap == null) {
-      return;
-    }
-    if (!resourceMap.containsKey(resource)) {
-      resourceMap.put(resource, new Resource(resource));
-    }
-  }
-
-  private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap) {
-    if (resourceName == null || partition == null || resourceMap == null) {
-      return;
-    }
-    if (!resourceMap.containsKey(resourceName)) {
-      resourceMap.put(resourceName, new Resource(resourceName));
+    Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+    for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
+      ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
+      PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+      resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
+      resCfgMap.put(resourceId, resCfgBuilder.build());
     }
-    Resource resource = resourceMap.get(resourceName);
-    resource.addPartition(partition);
 
+    return resCfgMap;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c942db9..02188be 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -30,16 +30,17 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.log4j.Logger;
 
-@Deprecated
 public class TaskAssignmentStage extends AbstractBaseStage {
   private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
 
@@ -49,30 +50,30 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     logger.info("START TaskAssignmentStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    MessageThrottleStageOutput messageOutput =
-        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
-
-    if (manager == null || resourceMap == null || messageOutput == null || cache == null
-        || liveInstanceMap == null) {
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
+
+    if (manager == null || resourceMap == null || messageOutput == null || cluster == null
+        || liveParticipantMap == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     List<Message> messagesToSend = new ArrayList<Message>();
-    for (String resourceName : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceName);
-      for (Partition partition : resource.getPartitions()) {
-        List<Message> messages = messageOutput.getMessages(resourceName, partition);
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      ResourceConfig resource = resourceMap.get(resourceId);
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+        List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
         messagesToSend.addAll(messages);
       }
     }
 
     List<Message> outputMessages =
-        batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
+        batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
             manager.getProperties());
     sendMessages(dataAccessor, outputMessages);
 
@@ -82,8 +83,8 @@ public class TaskAssignmentStage extends AbstractBaseStage {
   }
 
   List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
-      Map<String, Resource> resourceMap, Map<String, LiveInstance> liveInstanceMap,
-      HelixManagerProperties properties) {
+      Map<ResourceId, ResourceConfig> resourceMap,
+      Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
     // group messages by its CurrentState path + "/" + fromState + "/" + toState
     Map<String, Message> batchMessages = new HashMap<String, Message>();
     List<Message> outputMessages = new ArrayList<Message>();
@@ -92,13 +93,13 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     while (iter.hasNext()) {
       Message message = iter.next();
       ResourceId resourceId = message.getResourceId();
-      Resource resource = resourceMap.get(resourceId.stringify());
+      ResourceConfig resource = resourceMap.get(resourceId);
 
-      String instanceName = message.getTgtName();
-      LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+      ParticipantId participantId = ParticipantId.from(message.getTgtName());
+      Participant liveParticipant = liveParticipantMap.get(participantId);
       String participantVersion = null;
-      if (liveInstance != null) {
-        participantVersion = liveInstance.getTypedHelixVersion().toString();
+      if (liveParticipant != null) {
+        participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
       }
 
       if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
@@ -137,10 +138,10 @@ public class TaskAssignmentStage extends AbstractBaseStage {
           + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
           + message.getTypedFromState() + " to:" + message.getTypedToState());
 
-      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
-      // message.getTgtName()
-      // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
-      // + " from: " + message.getFromState() + " to: " + message.getToState());
+      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
+      // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
+      // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
+      // + message.getToState());
 
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 7d84258..09aed50 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -38,7 +38,8 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.context.RebalancerRef;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Function;
@@ -169,7 +170,7 @@ public class IdealState extends HelixProperty {
   }
 
   /**
-   * Define a custom rebalancer that implements {@link Rebalancer}
+   * Define a custom rebalancer that implements {@link HelixRebalancer}
    * @param rebalancerClassName the name of the custom rebalancing class
    */
   public void setRebalancerClassName(String rebalancerClassName) {
@@ -310,6 +311,15 @@ public class IdealState extends HelixProperty {
 
   /**
    * Set the current mapping of a partition
+   * @param partitionName the partition to set
+   * @param instanceStateMap (participant name, state) pairs
+   */
+  public void setInstanceStateMap(String partitionName, Map<String, String> instanceStateMap) {
+    _record.setMapField(partitionName, instanceStateMap);
+  }
+
+  /**
+   * Set the current mapping of a partition
    * @param partitionId the partition to set
    * @param participantStateMap (participant id, state) pairs
    */
@@ -385,6 +395,15 @@ public class IdealState extends HelixProperty {
 
   /**
    * Set the preference list of a partition
+   * @param partitionName the name of the partition to set
+   * @param preferenceList a list of participants that can serve replicas of the partition
+   */
+  public void setPreferenceList(String partitionName, List<String> preferenceList) {
+    _record.setListField(partitionName, preferenceList);
+  }
+
+  /**
+   * Set the preference list of a partition
    * @param partitionId the id of the partition to set
    * @param preferenceList a list of participants that can serve replicas of the partition
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 2bd313a..d465a80 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -910,7 +910,7 @@ public class Message extends HelixProperty {
    * Get controller message id, used for scheduler-task-queue state model only
    * @return controller message id
    */
-  public String getControllerMessagId() {
+  public String getControllerMessageId() {
     return _record.getSimpleField(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 96d0ca7..8672e7e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -33,7 +33,6 @@ import org.apache.helix.api.id.ResourceId;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -48,10 +47,9 @@ import com.google.common.collect.Maps;
  * can be in s1.
  */
 public class ResourceAssignment extends HelixProperty {
-
   /**
    * Initialize an empty mapping
-   * @param resourceName the resource being mapped
+   * @param resourceId the resource being mapped
    */
   public ResourceAssignment(ResourceId resourceId) {
     super(resourceId.stringify());
@@ -86,14 +84,6 @@ public class ResourceAssignment extends HelixProperty {
   }
 
   /**
-   * Get the currently mapped partitions
-   * @return list of Partition objects (immutable)
-   */
-  public List<String> getMappedPartitions() {
-    return Lists.newArrayList(_record.getMapFields().keySet());
-  }
-
-  /**
    * Get the entire map of a resource
    * @return map of partition to participant to state
    */
@@ -121,19 +111,6 @@ public class ResourceAssignment extends HelixProperty {
   }
 
   /**
-   * Get the participant, state pairs for a partition
-   * @param partition the Partition to look up
-   * @return map of (participant id, state)
-   */
-  public Map<String, String> getReplicaMap(String partitionId) {
-    Map<String, String> rawReplicaMap = _record.getMapField(partitionId);
-    if (rawReplicaMap == null) {
-      return Collections.emptyMap();
-    }
-    return rawReplicaMap;
-  }
-
-  /**
    * Add participant, state pairs for a partition
    * @param partitionId the partition to set
    * @param replicaMap map of (participant name, state)
@@ -147,15 +124,6 @@ public class ResourceAssignment extends HelixProperty {
   }
 
   /**
-   * Add participant, state pairs for a partition
-   * @param partitionId the partition to set
-   * @param replicaMap map of (participant name, state)
-   */
-  public void addReplicaMap(String partitionId, Map<String, String> replicaMap) {
-    _record.setMapField(partitionId, replicaMap);
-  }
-
-  /**
    * Helper for converting a map of strings to a concrete replica map
    * @param rawMap map of participant name to state name
    * @return map of participant id to state

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index a9a6e49..2e759e6 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -153,7 +153,7 @@ public class StateModelDefinition extends HelixProperty {
    * Get an ordered priority list of transitions
    * @return transitions in the form SRC-DEST, the first of which is highest priority
    */
-  public List<String> getStateTransitionPriorityStringList() {
+  public List<String> getStateTransitionPriorityList() {
     return _stateTransitionPriorityList;
   }
 
@@ -161,9 +161,9 @@ public class StateModelDefinition extends HelixProperty {
    * Get an ordered priority list of transitions
    * @return Transition objects, the first of which is highest priority (immutable)
    */
-  public List<Transition> getStateTransitionPriorityList() {
+  public List<Transition> getTypedStateTransitionPriorityList() {
     ImmutableList.Builder<Transition> builder = new ImmutableList.Builder<Transition>();
-    for (String transition : getStateTransitionPriorityStringList()) {
+    for (String transition : getStateTransitionPriorityList()) {
       String fromState = transition.substring(0, transition.indexOf('-'));
       String toState = transition.substring(transition.indexOf('-') + 1);
       builder.add(Transition.from(State.from(fromState), State.from(toState)));
@@ -283,8 +283,8 @@ public class StateModelDefinition extends HelixProperty {
     Map<String, String> stateConstraintMap;
 
     /**
-     * Start building a state model with a name
-     * @param name state model name
+     * Start building a state model with a id
+     * @param stateModelDefId state model id
      */
     public Builder(StateModelDefId stateModelDefId) {
       this._statemodelName = stateModelDefId.stringify();
@@ -294,9 +294,17 @@ public class StateModelDefinition extends HelixProperty {
     }
 
     /**
+     * Start building a state model with a name
+     * @param stateModelDefId state model name
+     */
+    public Builder(String stateModelName) {
+      this(StateModelDefId.from(stateModelName));
+    }
+
+    /**
      * initial state of a replica when it starts, most commonly used initial
      * state is OFFLINE
-     * @param state
+     * @param initialState
      */
     public Builder initialState(State initialState) {
       return initialState(initialState.toString());
@@ -305,7 +313,7 @@ public class StateModelDefinition extends HelixProperty {
     /**
      * initial state of a replica when it starts, most commonly used initial
      * state is OFFLINE
-     * @param state
+     * @param initialState
      */
     public Builder initialState(String initialState) {
       this.initialState = initialState;
@@ -318,7 +326,8 @@ public class StateModelDefinition extends HelixProperty {
      * STATE2 has a constraint of 3 but only one node is up then Helix will uses
      * the priority to see STATE constraint has to be given higher preference <br/>
      * Use -1 to indicates states with no constraints, like OFFLINE
-     * @param states
+     * @param state the state to add
+     * @param priority the state priority, lower number is higher priority
      */
     public Builder addState(State state, int priority) {
       return addState(state.toString(), priority);
@@ -330,7 +339,8 @@ public class StateModelDefinition extends HelixProperty {
      * STATE2 has a constraint of 3 but only one node is up then Helix will uses
      * the priority to see STATE constraint has to be given higher preference <br/>
      * Use -1 to indicates states with no constraints, like OFFLINE
-     * @param states
+     * @param state the state to add
+     * @param priority the state priority, lower number is higher priority
      */
     public Builder addState(String state, int priority) {
       statesMap.put(state, priority);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java
index 72b2bc9..cda2f9e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java
@@ -22,24 +22,84 @@ package org.apache.helix.model.builder;
 import java.util.ArrayList;
 import java.util.Arrays;
 
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.model.IdealState.RebalanceMode;
 
+/**
+ * IdealState builder for SEMI_AUTO mode
+ */
 public class AutoModeISBuilder extends IdealStateBuilder {
+  /**
+   * Start building a SEMI_AUTO IdealState
+   * @param resourceName the resource
+   */
   public AutoModeISBuilder(String resourceName) {
     super(resourceName);
     setRebalancerMode(RebalanceMode.SEMI_AUTO);
   }
 
-  public void add(String partitionName) {
+  /**
+   * Start building a SEMI_AUTO IdealState
+   * @param resourceId the resource
+   */
+  public AutoModeISBuilder(ResourceId resourceId) {
+    this(resourceId.stringify());
+  }
+
+  /**
+   * Add a partition; Helix will assign replicas of the partition according to preference lists
+   * @param partitionName the name of the new partition
+   * @return AutoModeISBuilder
+   */
+  public AutoModeISBuilder add(String partitionName) {
     if (_record.getListField(partitionName) == null) {
       _record.setListField(partitionName, new ArrayList<String>());
     }
+    return this;
+  }
+
+  /**
+   * Add a partition; Helix will assign replicas of the partition according to preference lists
+   * @param partitionId the id of the new partition
+   * @return AutoModeISBuilder
+   */
+  public AutoModeISBuilder add(PartitionId partitionId) {
+    if (partitionId != null) {
+      add(partitionId.stringify());
+    }
+    return this;
   }
 
+  /**
+   * Define where replicas of a partition should live
+   * @param partitionName the partition
+   * @param instanceNames ordered list of participant names
+   * @return AutoModeISBuilder
+   */
   public AutoModeISBuilder assignPreferenceList(String partitionName, String... instanceNames) {
     add(partitionName);
     _record.getListField(partitionName).addAll(Arrays.asList(instanceNames));
     return this;
   }
 
+  /**
+   * Define where replicas of a partition should live
+   * @param partitionId the partition
+   * @param participantIds ordered list of participant ids
+   * @return AutoModeISBuilder
+   */
+  public AutoModeISBuilder assignPreferenceList(PartitionId partitionId,
+      ParticipantId... participantIds) {
+    if (partitionId != null) {
+      String[] participantNames = new String[participantIds.length];
+      for (int i = 0; i < participantIds.length; i++) {
+        participantNames[i] = participantIds[i].stringify();
+      }
+      assignPreferenceList(partitionId.stringify(), participantNames);
+    }
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java
index bfb958d..8ac3b82 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java
@@ -21,24 +21,54 @@ package org.apache.helix.model.builder;
 
 import java.util.ArrayList;
 
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.model.IdealState.RebalanceMode;
 
+/**
+ * IdealState builder for FULL_AUTO mode
+ */
 public class AutoRebalanceModeISBuilder extends IdealStateBuilder {
+  /**
+   * Start building a SEMI_AUTO IdealState
+   * @param resourceName the resource
+   */
   public AutoRebalanceModeISBuilder(String resourceName) {
     super(resourceName);
     setRebalancerMode(RebalanceMode.FULL_AUTO);
   }
 
   /**
+   * Start building a SEMI_AUTO IdealState
+   * @param resourceId the resource
+   */
+  public AutoRebalanceModeISBuilder(ResourceId resourceId) {
+    this(resourceId.stringify());
+  }
+
+  /**
    * Add a partition, Helix will automatically assign the placement and state
    * for this partition at runtime.
-   * @param partitionName
+   * @param partitionName the partition to add
+   * @return AutoRebalanceModeISBuilder
    */
   public AutoRebalanceModeISBuilder add(String partitionName) {
     if (_record.getListField(partitionName) == null) {
       _record.setListField(partitionName, new ArrayList<String>());
     }
+    return this;
+  }
 
+  /**
+   * Add a partition, Helix will automatically assign the placement and state
+   * for this partition at runtime.
+   * @param partitionId the partition to add
+   * @return AutoRebalanceModeISBuilder
+   */
+  public AutoRebalanceModeISBuilder add(PartitionId partitionId) {
+    if (partitionId != null) {
+      add(partitionId.stringify());
+    }
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java
index f329daa..13b2a7e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java
@@ -25,11 +25,8 @@ import java.util.Map;
 import org.apache.helix.api.id.ConstraintId;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.log4j.Logger;
 
 public class ClusterConstraintsBuilder {
-  private static Logger LOG = Logger.getLogger(ClusterConstraintsBuilder.class);
-
   final private ConstraintType _constraintType;
 
   /**
@@ -61,6 +58,11 @@ public class ClusterConstraintsBuilder {
     return this;
   }
 
+  public ClusterConstraintsBuilder addConstraintAttribute(String constraintId, String attribute,
+      String value) {
+    return addConstraintAttribute(ConstraintId.from(constraintId), attribute, value);
+  }
+
   public ClusterConstraints build() {
     ClusterConstraints constraints = new ClusterConstraints(_constraintType);
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java
index 0519979..fb6235f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java
@@ -9,6 +9,7 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.CurrentState.CurrentStateProperty;
 
@@ -39,7 +40,7 @@ public class CurrentStateBuilder {
   private final Map<PartitionId, State> _partitionStateMap;
   private SessionId _sessionId;
   private StateModelDefId _stateModelDefId;
-  private String _stateModelFactoryName;
+  private StateModelFactoryId _stateModelFactoryId;
 
   /**
    * Build a current state for a given resource
@@ -93,11 +94,11 @@ public class CurrentStateBuilder {
 
   /**
    * Set the name of the state model factory
-   * @param stateModelFactoryName state model factory identifier
+   * @param stateModelFactoryIde state model factory identifier
    * @return CurrentStateBuilder
    */
-  public CurrentStateBuilder stateModelFactory(String stateModelFactoryName) {
-    _stateModelFactoryName = stateModelFactoryName;
+  public CurrentStateBuilder stateModelFactory(StateModelFactoryId stateModelFactoryId) {
+    _stateModelFactoryId = stateModelFactoryId;
     return this;
   }
 
@@ -117,7 +118,7 @@ public class CurrentStateBuilder {
     record.setSimpleField(CurrentStateProperty.STATE_MODEL_DEF.toString(),
         _stateModelDefId.toString());
     record.setSimpleField(CurrentStateProperty.STATE_MODEL_FACTORY_NAME.toString(),
-        _stateModelFactoryName);
+        _stateModelFactoryId.toString());
     return new CurrentState(record);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java
index 65c40a0..566452a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java
@@ -22,31 +22,63 @@ package org.apache.helix.model.builder;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.model.IdealState.RebalanceMode;
 
+/**
+ * IdealState builder for CUSTOMIZED mode
+ */
 public class CustomModeISBuilder extends IdealStateBuilder {
-
+  /**
+   * Start building a CUSTOMIZED IdealState
+   * @param resourceName the resource
+   */
   public CustomModeISBuilder(String resourceName) {
     super(resourceName);
     setRebalancerMode(RebalanceMode.CUSTOMIZED);
   }
 
   /**
+   * Start building a SEMI_AUTO IdealState
+   * @param resourceId the resource
+   */
+  public CustomModeISBuilder(ResourceId resourceId) {
+    this(resourceId.stringify());
+  }
+
+  /**
    * Add a sub-resource
-   * @param partitionName
+   * @param partitionName partition to add
+   * @return CustomModeISBuilder
    */
-  public void add(String partitionName) {
+  public CustomModeISBuilder add(String partitionName) {
     if (_record.getMapField(partitionName) == null) {
       _record.setMapField(partitionName, new TreeMap<String, String>());
     }
+    return this;
+  }
+
+  /**
+   * Add a sub-resource
+   * @param partitionId partition to add
+   * @return CustomModeISBuilder
+   */
+  public CustomModeISBuilder add(PartitionId partitionId) {
+    if (partitionId != null) {
+      add(partitionId.stringify());
+    }
+    return this;
   }
 
   /**
    * add an instance->state assignment
-   * @param partitionName
-   * @param instanceName
-   * @param state
-   * @return
+   * @param partitionName partition to update
+   * @param instanceName participant name
+   * @param state state the replica should be in
+   * @return CustomModeISBuilder
    */
   public CustomModeISBuilder assignInstanceAndState(String partitionName, String instanceName,
       String state) {
@@ -56,4 +88,19 @@ public class CustomModeISBuilder extends IdealStateBuilder {
     return this;
   }
 
+  /**
+   * add an instance->state assignment
+   * @param partitionId partition to update
+   * @param participantId participant to assign to
+   * @param state state the replica should be in
+   * @return CustomModeISBuilder
+   */
+  public CustomModeISBuilder assignParticipantAndState(PartitionId partitionId,
+      ParticipantId participantId, State state) {
+    if (partitionId != null && participantId != null && state != null) {
+      assignInstanceAndState(partitionId.stringify(), participantId.stringify(), state.toString());
+    }
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/ExternalViewBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/ExternalViewBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/ExternalViewBuilder.java
deleted file mode 100644
index 5fe099f..0000000
--- a/helix-core/src/main/java/org/apache/helix/model/builder/ExternalViewBuilder.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.model.builder;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class ExternalViewBuilder {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index f591a24..1563769 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -55,10 +55,10 @@ import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
-import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
-import org.apache.helix.controller.stages.NewResourceComputationStage;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
@@ -250,7 +250,7 @@ public class ClusterStateVerifier {
       ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
       Cluster cluster = clusterAccessor.readCluster();
       // calculate best possible state
-      NewBestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
+      BestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
 
       // set error states
       if (errStates != null) {
@@ -416,19 +416,19 @@ public class ClusterStateVerifier {
    * @throws Exception
    */
 
-  static NewBestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
+  static BestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
     ClusterEvent event = new ClusterEvent("sampleEvent");
     event.addAttribute("ClusterDataCache", cluster);
 
-    NewResourceComputationStage rcState = new NewResourceComputationStage();
-    NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();
-    NewBestPossibleStateCalcStage bpStage = new NewBestPossibleStateCalcStage();
+    ResourceComputationStage rcState = new ResourceComputationStage();
+    CurrentStateComputationStage csStage = new CurrentStateComputationStage();
+    BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
 
     runStage(event, rcState);
     runStage(event, csStage);
     runStage(event, bpStage);
 
-    NewBestPossibleStateOutput output =
+    BestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
 
     return output;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index c478bbb..369ad68 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -36,8 +36,8 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -130,13 +130,13 @@ public class TestNewStages extends ZkUnitTestBase {
 
     // Run the stage
     try {
-      new NewBestPossibleStateCalcStage().process(event);
+      new BestPossibleStateCalcStage().process(event);
     } catch (Exception e) {
       Assert.fail(e.toString());
     }
 
     // Verify the result
-    NewBestPossibleStateOutput bestPossibleStateOutput =
+    BestPossibleStateOutput bestPossibleStateOutput =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     Assert.assertNotNull(bestPossibleStateOutput);
     ResourceId resourceId = ResourceId.from("TestDB0");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 18e8f4d..cb60691 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -62,12 +62,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
 
-    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+    ReadClusterDataStage stage1 = new ReadClusterDataStage();
     runStage(event, stage1);
-    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    NewBestPossibleStateOutput output =
+    BestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
       Map<ParticipantId, State> replicaMap =
@@ -100,12 +100,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
 
-    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+    ReadClusterDataStage stage1 = new ReadClusterDataStage();
     runStage(event, stage1);
-    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    NewBestPossibleStateOutput output =
+    BestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
       Map<ParticipantId, State> replicaMap =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index d3f348e..d116182 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -52,12 +52,12 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
 
-    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+    ReadClusterDataStage stage1 = new ReadClusterDataStage();
     runStage(event, stage1);
-    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    NewBestPossibleStateOutput output =
+    BestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
       Map<ParticipantId, State> replicaMap =