You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/16 01:51:42 UTC

[5/9] [HELIX-209] Shuffling around rebalancer code to allow for compatibility

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
deleted file mode 100644
index 4a46a4c..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ /dev/null
@@ -1,317 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-public class NewMessageSelectionStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewMessageSelectionStage.class);
-
-  /**
-   * Mutable pair of integer counters (a lower and an upper bound) that can each be
-   * moved up or down by one.
-   */
-  public static class Bounds {
-    private int upper;
-    private int lower;
-
-    /**
-     * @param lower initial value of the lower bound
-     * @param upper initial value of the upper bound
-     */
-    public Bounds(int lower, int upper) {
-      this.upper = upper;
-      this.lower = lower;
-    }
-
-    /** Adds one to the upper bound. */
-    public void increaseUpperBound() {
-      this.upper += 1;
-    }
-
-    /** Adds one to the lower bound. */
-    public void increaseLowerBound() {
-      this.lower += 1;
-    }
-
-    /** Subtracts one from the upper bound. */
-    public void decreaseUpperBound() {
-      this.upper -= 1;
-    }
-
-    /** Subtracts one from the lower bound. */
-    public void decreaseLowerBound() {
-      this.lower -= 1;
-    }
-
-    /** @return the current lower bound */
-    public int getLowerBound() {
-      return this.lower;
-    }
-
-    /** @return the current upper bound */
-    public int getUpperBound() {
-      return this.upper;
-    }
-
-    /** @return the bounds rendered as "lower-upper" */
-    @Override
-    public String toString() {
-      return String.format("%d-%d", this.lower, this.upper);
-    }
-  }
-
-  /**
-   * For every partition of every resource carried by the event, selects the subset of the
-   * generated messages that passes {@code selectMessages} and stores the result in the
-   * event under MESSAGES_SELECTED.
-   * @param event cluster event; must carry the ClusterDataCache, RESOURCES, CURRENT_STATE
-   *          and MESSAGES_ALL attributes
-   * @throws StageException if any of the required attributes is missing
-   */
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    ResourceCurrentState currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    NewMessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
-    if (cluster == null || resourceMap == null || currentStateOutput == null
-        || messageGenOutput == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
-    }
-
-    // Dereference the cluster only after the null check above, so that a missing
-    // attribute surfaces as a StageException rather than a NullPointerException.
-    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
-
-    NewMessageOutput output = new NewMessageOutput();
-
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      ResourceConfig resource = resourceMap.get(resourceId);
-      StateModelDefinition stateModelDef =
-          stateModelDefMap.get(resource.getRebalancerConfig()
-              .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
-
-      // TODO have a logical model for transition
-      Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
-      Resource configResource = cluster.getResource(resourceId);
-
-      // if configResource == null, the resource has been dropped
-      Map<State, Bounds> stateConstraints =
-          computeStateConstraints(stateModelDef,
-              configResource == null ? null : configResource.getRebalancerConfig(), cluster);
-
-      // TODO fix it
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
-        List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
-        List<Message> selectedMessages =
-            selectMessages(cluster.getLiveParticipantMap(),
-                currentStateOutput.getCurrentStateMap(resourceId, partitionId),
-                currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
-                stateConstraints, stateTransitionPriorities, stateModelDef.getTypedInitialState());
-        output.setMessages(resourceId, partitionId, selectedMessages);
-      }
-    }
-    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
-  }
-
-  // TODO: This method deserves its own class. The class should not understand Helix but
-  // should just be able to solve the problem using the algorithm. I think the method
-  // follows that, but if we don't move it to another class it's quite easy to break
-  // that contract.
-  /**
-   * Greedy message selection algorithm:
-   * 1) calculate CS+PS (current state + pending state) lower/upper-bounds per state;
-   * 2) group messages by state transition, sorted by priority;
-   * 3) from highest priority to lowest, for each message group with the same transition,
-   * add messages one by one while making sure no state constraint is violated, and update
-   * the state lower/upper-bounds whenever a new message is selected.
-   * @param liveParticipants live participants; each one is counted once, in its current
-   *          state if it has one, otherwise in initialState
-   * @param currentStates participant -> current state
-   * @param pendingStates participant -> pending state
-   * @param messages candidate messages; may be null or empty
-   * @param stateConstraints
-   *          : STATE -> bound (lower:upper)
-   * @param stateTransitionPriorities
-   *          : FROM_STATE-TO_STATE -> priority (a smaller number is processed first;
-   *          transitions missing from the map get Integer.MAX_VALUE)
-   * @param initialState state assumed for a live participant without a current state
-   * @return selected messages; an empty list if messages is null or empty
-   */
-  List<Message> selectMessages(Map<ParticipantId, Participant> liveParticipants,
-      Map<ParticipantId, State> currentStates, Map<ParticipantId, State> pendingStates,
-      List<Message> messages, Map<State, Bounds> stateConstraints,
-      final Map<String, Integer> stateTransitionPriorities, State initialState) {
-    if (messages == null || messages.isEmpty()) {
-      return Collections.emptyList();
-    }
-
-    List<Message> selectedMessages = new ArrayList<Message>();
-    // per-state running counts, seeded from current and pending states below
-    Map<State, Bounds> bounds = new HashMap<State, Bounds>();
-
-    // count currentState, if no currentState, count as in initialState
-    for (ParticipantId liveParticipantId : liveParticipants.keySet()) {
-      State state = initialState;
-      if (currentStates.containsKey(liveParticipantId)) {
-        state = currentStates.get(liveParticipantId);
-      }
-
-      if (!bounds.containsKey(state)) {
-        bounds.put(state, new Bounds(0, 0));
-      }
-      // a current state raises both the lower and the upper count of that state
-      bounds.get(state).increaseLowerBound();
-      bounds.get(state).increaseUpperBound();
-    }
-
-    // count pendingStates
-    for (ParticipantId participantId : pendingStates.keySet()) {
-      State state = pendingStates.get(participantId);
-      if (!bounds.containsKey(state)) {
-        bounds.put(state, new Bounds(0, 0));
-      }
-      // TODO: add lower bound, need to refactor pendingState to include fromState also
-      // a pending state only raises the upper count of its target state
-      bounds.get(state).increaseUpperBound();
-    }
-
-    // group messages based on state transition priority
-    // (TreeMap iterates its keys in ascending order, so smaller priority numbers come first)
-    Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
-        new TreeMap<Integer, List<Message>>();
-    for (Message message : messages) {
-      State fromState = message.getTypedFromState();
-      State toState = message.getTypedToState();
-      String transition = fromState.toString() + "-" + toState.toString();
-      // transitions without a configured priority are put in the last group
-      int priority = Integer.MAX_VALUE;
-
-      if (stateTransitionPriorities.containsKey(transition)) {
-        priority = stateTransitionPriorities.get(transition);
-      }
-
-      if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
-        messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
-      }
-      messagesGroupByStateTransitPriority.get(priority).add(message);
-    }
-
-    // select messages
-    for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
-      for (Message message : messageList) {
-        State fromState = message.getTypedFromState();
-        State toState = message.getTypedToState();
-
-        // skip messages whose fromState was not counted for any participant
-        if (!bounds.containsKey(fromState)) {
-          LOG.error("Message's fromState is not in currentState. message: " + message);
-          continue;
-        }
-
-        if (!bounds.containsKey(toState)) {
-          bounds.put(toState, new Bounds(0, 0));
-        }
-
-        // check lower bound of fromState
-        if (stateConstraints.containsKey(fromState)) {
-          int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
-          if (newLowerBound < 0) {
-            LOG.error("Number of currentState in " + fromState
-                + " is less than number of messages transiting from " + fromState);
-            continue;
-          }
-
-          // skip silently if selecting the message would drop below the constraint
-          if (newLowerBound < stateConstraints.get(fromState).getLowerBound()) {
-            continue;
-          }
-        }
-
-        // check upper bound of toState
-        if (stateConstraints.containsKey(toState)) {
-          int newUpperBound = bounds.get(toState).getUpperBound() + 1;
-          if (newUpperBound > stateConstraints.get(toState).getUpperBound()) {
-            continue;
-          }
-        }
-
-        selectedMessages.add(message);
-        // NOTE(review): the check above evaluates the fromState lower bound minus one, yet
-        // the lower bound is increased here; decreaseLowerBound() looks intended -- confirm
-        bounds.get(fromState).increaseLowerBound();
-        bounds.get(toState).increaseUpperBound();
-      }
-    }
-
-    return selectedMessages;
-  }
-
-  /**
-   * Builds the upper-bound constraint for every state of the given state model.
-   * TODO: This code is duplicated in multiple places. Can we do it in one place at the
-   * beginning, compute the stateConstraint instance once and reuse it elsewhere?
-   * Each IdealState must have a constraint object associated with it.
-   * @param stateModelDefinition state model whose states are examined, in priority order
-   * @param rebalancerConfig if rebalancerConfig == null, we can't evaluate R thus no constraints
-   * @param cluster source of the per-state upper-bound strings and of the live participants
-   * @return STATE -> Bounds(0, max); states whose bound cannot be resolved to a
-   *         non-negative number are left out of the map
-   */
-  private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
-      RebalancerConfig rebalancerConfig, Cluster cluster) {
-    ReplicatedRebalancerContext context = null;
-    if (rebalancerConfig != null) {
-      context = rebalancerConfig.getRebalancerContext(ReplicatedRebalancerContext.class);
-    }
-
-    Map<State, Bounds> constraintsByState = new HashMap<State, Bounds>();
-    List<State> orderedStates = stateModelDefinition.getTypedStatesPriorityList();
-    for (State state : orderedStates) {
-      String upperBoundSpec =
-          cluster.getStateUpperBoundConstraint(Scope.cluster(cluster.getId()),
-              stateModelDefinition.getStateModelDefId(), state);
-
-      // -1 means "no constraint resolved"
-      int max = -1;
-      if ("N".equals(upperBoundSpec)) {
-        max = cluster.getLiveParticipantMap().size();
-      } else if ("R".equals(upperBoundSpec)) {
-        // the context is null when the resource has been dropped:
-        // R can't be evaluated, so the state constraint is ignored
-        if (context != null) {
-          max =
-              context.anyLiveParticipant() ? cluster.getLiveParticipantMap().size() : context
-                  .getReplicaCount();
-        }
-      } else {
-        try {
-          max = Integer.parseInt(upperBoundSpec);
-        } catch (Exception e) {
-          // not a number: keep -1
-        }
-      }
-
-      // a state without a constraint is not put in the map
-      if (max >= 0) {
-        constraintsByState.put(state, new Bounds(0, max));
-      }
-    }
-
-    return constraintsByState;
-  }
-
-  // TODO: if state transition priority is not provided then use lexicographical sorting
-  // so that behavior is consistent
-  /**
-   * Maps every entry of the state model's transition priority list to its index in that
-   * list (0 for the first entry).
-   * @param stateModelDef state model definition providing the transition priority list
-   * @return transition string -> position in the priority list
-   */
-  private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef) {
-    Map<String, Integer> priorityByTransition = new HashMap<String, Integer>();
-    int rank = 0;
-    for (String transition : stateModelDef.getStateTransitionPriorityStringList()) {
-      priorityByTransition.put(transition, rank);
-      rank++;
-    }
-    return priorityByTransition;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
deleted file mode 100644
index dfea7fc..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ /dev/null
@@ -1,198 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
-import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.ClusterConstraints.ConstraintValue;
-import org.apache.helix.model.ConstraintItem;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public class NewMessageThrottleStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewMessageThrottleStage.class.getName());
-
-  /**
-   * Converts a constraint value string into an integer limit.
-   * A string naming the ConstraintValue token ANY yields Integer.MAX_VALUE; any other
-   * ConstraintValue token is logged as invalid and also yields Integer.MAX_VALUE. A
-   * string that is not a token is parsed as an integer; if that fails too, the error is
-   * logged and Integer.MAX_VALUE is returned.
-   * @param valueStr constraint value as a string
-   * @return the resolved limit
-   */
-  int valueOf(String valueStr) {
-    try {
-      if (ConstraintValue.valueOf(valueStr) != ConstraintValue.ANY) {
-        LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
-            + Integer.MAX_VALUE);
-      }
-      return Integer.MAX_VALUE;
-    } catch (Exception e) {
-      // not a ConstraintValue token: fall back to a plain number
-      try {
-        return Integer.parseInt(valueStr);
-      } catch (NumberFormatException ne) {
-        LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
-            + Integer.MAX_VALUE);
-        return Integer.MAX_VALUE;
-      }
-    }
-  }
-
-  /**
-   * Picks, out of the given constraint items, at most one item per filtered-attribute key.
-   * Constraints are selected in the order of the following rules:
-   * 1) don't select constraints with CONSTRAINT_VALUE=ANY;
-   * 2) if one constraint is more specific than the other, select the most specific one;
-   * 3) if a message matches multiple constraints of incomparable specificity, select the
-   * one with the minimum value;
-   * 4) if a message matches multiple constraints of incomparable specificity, and they all
-   * have the same value, select the first in alphabetic order.
-   * @param items candidate constraint items
-   * @param attributes attributes used to compute each item's key via item.filter(attributes)
-   * @return the retained items, one per distinct key
-   */
-  Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
-      Map<ConstraintAttribute, String> attributes) {
-    // key: string form of item.filter(attributes); value: the item currently retained for it
-    Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
-    for (ConstraintItem item : items) {
-      // don't select constraints with CONSTRAINT_VALUE=ANY
-      if (item.getConstraintValue().equals(ConstraintValue.ANY.toString())) {
-        continue;
-      }
-
-      String key = item.filter(attributes).toString();
-      if (!selectedItems.containsKey(key)) {
-        // first item seen for this key
-        selectedItems.put(key, item);
-      } else {
-        ConstraintItem existingItem = selectedItems.get(key);
-        if (existingItem.match(item.getAttributes())) {
-          // item is more specific than existingItem
-          selectedItems.put(key, item);
-        } else if (!item.match(existingItem.getAttributes())) {
-          // existingItem and item are of incomparable specificity
-          int value = valueOf(item.getConstraintValue());
-          int existingValue = valueOf(existingItem.getConstraintValue());
-          if (value < existingValue) {
-            // item's constraint value is less than that of existingItem
-            selectedItems.put(key, item);
-          } else if (value == existingValue) {
-            if (item.toString().compareTo(existingItem.toString()) < 0) {
-              // item is ahead of existingItem in alphabetic order
-              selectedItems.put(key, item);
-            }
-          }
-        }
-        // otherwise existingItem is kept unchanged
-      }
-    }
-    return new HashSet<ConstraintItem>(selectedItems.values());
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    NewMessageOutput msgSelectionOutput =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-
-    if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
-      throw new StageException("Missing attributes in event: " + event
-          + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
-    }
-
-    NewMessageOutput output = new NewMessageOutput();
-
-    // TODO fix it
-    ClusterConstraints constraint = cluster.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
-    Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
-
-    if (constraint != null) {
-      // go through all pending messages, they should be counted but not throttled
-      for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
-        Participant liveParticipant = cluster.getLiveParticipantMap().get(participantId);
-        throttle(throttleCounterMap, constraint, new ArrayList<Message>(liveParticipant
-            .getMessageMap().values()), false);
-      }
-    }
-
-    // go through all new messages, throttle if necessary
-    // assume messages should be sorted by state transition priority in messageSelection stage
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      ResourceConfig resource = resourceMap.get(resourceId);
-      // TODO fix it
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
-        List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
-        if (constraint != null && messages != null && messages.size() > 0) {
-          messages = throttle(throttleCounterMap, constraint, messages, true);
-        }
-        output.setMessages(resourceId, partitionId, messages);
-      }
-    }
-
-    event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
-  }
-
-  /**
-   * Charges every message against the counters of the constraints selected for it and
-   * returns the messages that were not throttled.
-   * @param throttleMap remaining quota per constraint key; a counter is initialised from
-   *          the constraint value the first time its key is seen and is updated in place
-   * @param constraint cluster constraints to match the messages against
-   * @param messages messages to charge
-   * @param needThrottle if false, messages are only counted and all of them are returned
-   * @return the messages for which no counter dropped below zero (or all of them when
-   *         needThrottle is false)
-   */
-  private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,
-      List<Message> messages, final boolean needThrottle) {
-
-    List<Message> passedMessages = new ArrayList<Message>();
-    for (Message message : messages) {
-      Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
-      Set<ConstraintItem> applicable = selectConstraints(constraint.match(msgAttr), msgAttr);
-
-      boolean throttled = false;
-      for (ConstraintItem item : applicable) {
-        String key = item.filter(msgAttr).toString();
-
-        // start from the configured limit the first time this key shows up
-        int remaining =
-            throttleMap.containsKey(key) ? throttleMap.get(key) : valueOf(item
-                .getConstraintValue());
-        remaining--;
-        throttleMap.put(key, remaining);
-
-        if (!needThrottle || remaining >= 0) {
-          continue;
-        }
-
-        throttled = true;
-        if (LOG.isDebugEnabled()) {
-          // TODO: printout constraint item that throttles the message
-          LOG.debug("message: " + message + " is throttled by constraint: " + item);
-        }
-      }
-
-      if (throttled) {
-        continue;
-      }
-      passedMessages.add(message);
-    }
-
-    return passedMessages;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
deleted file mode 100644
index 26050f8..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.accessor.ClusterAccessor;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.log4j.Logger;
-
-public class NewReadClusterDataStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewReadClusterDataStage.class.getName());
-
-  /**
-   * Reads the cluster through a ClusterAccessor and stores it in the event under the
-   * "ClusterDataCache" attribute, logging how long the stage took.
-   * @param event cluster event; must carry the "helixmanager" attribute
-   * @throws StageException if the "helixmanager" attribute is missing
-   */
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    final long begin = System.currentTimeMillis();
-    LOG.info("START ReadClusterDataStage.process()");
-
-    HelixManager helixManager = event.getAttribute("helixmanager");
-    if (helixManager == null) {
-      throw new StageException("HelixManager attribute value is null");
-    }
-    HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
-    ClusterAccessor clusterAccessor =
-        new ClusterAccessor(ClusterId.from(helixManager.getClusterName()), dataAccessor);
-
-    Cluster cluster = clusterAccessor.readCluster();
-
-    ClusterStatusMonitor statusMonitor =
-        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-    if (statusMonitor != null) {
-      // TODO fix it
-      // int disabledInstances = 0;
-      // int disabledPartitions = 0;
-      // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
-      // if (config.getInstanceEnabled() == false) {
-      // disabledInstances++;
-      // }
-      // if (config.getDisabledPartitions() != null) {
-      // disabledPartitions += config.getDisabledPartitions().size();
-      // }
-      // }
-      // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
-      // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
-    }
-
-    event.addAttribute("ClusterDataCache", cluster);
-
-    final long elapsed = System.currentTimeMillis() - begin;
-    LOG.info("END ReadClusterDataStage.process(). took: " + elapsed + " ms");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
deleted file mode 100644
index b531bd7..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.CurrentState;
-import org.apache.log4j.Logger;
-
-/**
- * This stage computes all the resources in a cluster. The resources are computed from:
- * 1) IdealStates -> this gives all the resources currently active;
- * 2) CurrentState for liveInstance -> this helps in finding resources that are inactive
- * and need to be dropped.
- */
-public class NewResourceComputationStage extends AbstractBaseStage {
-  private static Logger LOG = Logger.getLogger(NewResourceComputationStage.class);
-
-  /**
-   * Builds the resource-config map for the cluster and stores it in the event under
-   * RESOURCES. Resources known to the cluster get a config built from the cluster's
-   * resource; resources that only appear in current states are taken from
-   * {@code getCurStateResourceCfgMap}.
-   * @param event cluster event; must carry the "ClusterDataCache" attribute
-   * @throws StageException if the cluster attribute is missing
-   */
-  @Override
-  public void process(ClusterEvent event) throws StageException {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    if (cluster == null) {
-      throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
-    }
-
-    Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
-    Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
-
-    // ideal-state may be removed, add all resource config in current-state but not in ideal-state
-    for (Map.Entry<ResourceId, ResourceConfig> csEntry : csResCfgMap.entrySet()) {
-      if (cluster.getResourceMap().containsKey(csEntry.getKey())) {
-        continue;
-      }
-      resCfgMap.put(csEntry.getKey(), csEntry.getValue());
-    }
-
-    for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
-      Resource resource = cluster.getResource(resourceId);
-      RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
-
-      // copy the resource's settings into a fresh config
-      ResourceConfig.Builder builder = new ResourceConfig.Builder(resourceId);
-      builder.bucketSize(resource.getBucketSize());
-      builder.batchMessageMode(resource.getBatchMessageMode());
-      builder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
-      builder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
-      resCfgMap.put(resourceId, builder.build());
-    }
-
-    event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
-  }
-
-  /**
-   * Derives resource configs from the current states reported by the live participants.
-   * Each resource gets one config, created from the first current state seen for it; the
-   * partitions of every current state of that resource are added to its rebalancer context.
-   * @param cluster cluster whose live participants are inspected
-   * @return resource config map or empty map if not available
-   * @throws StageException if a current state has no state model definition reference
-   */
-  Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
-    Map<ResourceId, ResourceConfig.Builder> configBuilders =
-        new HashMap<ResourceId, ResourceConfig.Builder>();
-    Map<ResourceId, PartitionedRebalancerContext.Builder> contextBuilders =
-        new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
-
-    for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
-      for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
-        CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
-
-        if (currentState.getStateModelDefRef() == null) {
-          LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
-              + ", partitions: " + currentState.getPartitionStateMap().keySet()
-              + ", states: " + currentState.getPartitionStateMap().values());
-          throw new StageException("State model def is null for resource:"
-              + currentState.getResourceId());
-        }
-
-        // first current state seen for this resource: create both builders
-        if (!configBuilders.containsKey(resourceId)) {
-          PartitionedRebalancerContext.Builder newContextBuilder =
-              new PartitionedRebalancerContext.Builder(resourceId);
-          newContextBuilder.stateModelDefId(currentState.getStateModelDefId());
-          newContextBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
-              .getStateModelFactoryName()));
-          contextBuilders.put(resourceId, newContextBuilder);
-
-          ResourceConfig.Builder newConfigBuilder = new ResourceConfig.Builder(resourceId);
-          newConfigBuilder.bucketSize(currentState.getBucketSize());
-          newConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
-          configBuilders.put(resourceId, newConfigBuilder);
-        }
-
-        PartitionedRebalancerContext.Builder contextBuilder = contextBuilders.get(resourceId);
-        for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
-          contextBuilder.addPartition(new Partition(partitionId));
-        }
-      }
-    }
-
-    // finish every config with its accumulated rebalancer context
-    Map<ResourceId, ResourceConfig> result = new HashMap<ResourceId, ResourceConfig>();
-    for (Map.Entry<ResourceId, ResourceConfig.Builder> entry : configBuilders.entrySet()) {
-      ResourceId resourceId = entry.getKey();
-      ResourceConfig.Builder configBuilder = entry.getValue();
-      configBuilder.rebalancerContext(contextBuilders.get(resourceId).build());
-      result.put(resourceId, configBuilder.build());
-    }
-
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
deleted file mode 100644
index 51c9284..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-/** Pipeline stage that batches the throttled messages of all resources and sends them out. */
-public class NewTaskAssignmentStage extends AbstractBaseStage {
-  private static final Logger logger = Logger.getLogger(NewTaskAssignmentStage.class);
-
-  /** Gathers, batches, and sends the messages; throws StageException on a missing attribute. */
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    long startTime = System.currentTimeMillis();
-    logger.info("START TaskAssignmentStage.process()");
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    Map<ResourceId, ResourceConfig> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    NewMessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    // Guard the dereference: a missing cluster must yield the StageException below, not an NPE.
-    Map<ParticipantId, Participant> liveParticipantMap =
-        (cluster == null) ? null : cluster.getLiveParticipantMap();
-
-    if (manager == null || resourceMap == null || messageOutput == null || cluster == null
-        || liveParticipantMap == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
-    }
-
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    List<Message> messagesToSend = new ArrayList<Message>();
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      ResourceConfig resource = resourceMap.get(resourceId);
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
-        List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
-        messagesToSend.addAll(messages);
-      }
-    }
-
-    List<Message> outputMessages =
-        batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
-            manager.getProperties());
-    sendMessages(dataAccessor, outputMessages);
-
-    long endTime = System.currentTimeMillis();
-    logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
-  }
-
-  /**
-   * Merges messages that share target, session, resource, and transition into one batch message;
-   * messages that cannot be batched (see the guard in the loop) are passed through unchanged.
-   */
-  List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
-      Map<ResourceId, ResourceConfig> resourceMap,
-      Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
-    // group messages by its CurrentState path + "/" + fromState + "/" + toState
-    Map<String, Message> batchMessages = new HashMap<String, Message>();
-    List<Message> outputMessages = new ArrayList<Message>();
-
-    for (Message message : messages) {
-      ResourceId resourceId = message.getResourceId();
-      ResourceConfig resource = resourceMap.get(resourceId);
-
-      ParticipantId participantId = ParticipantId.from(message.getTgtName());
-      Participant liveParticipant = liveParticipantMap.get(participantId);
-      String participantVersion = null;
-      if (liveParticipant != null) {
-        participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
-      }
-
-      if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
-          || !properties.isFeatureSupported("batch_message", participantVersion)) {
-        outputMessages.add(message);
-        continue;
-      }
-
-      String key =
-          keyBuilder.currentState(message.getTgtName(), message.getTypedTgtSessionId().stringify(),
-              message.getResourceId().stringify()).getPath()
-              + "/" + message.getTypedFromState() + "/" + message.getTypedToState();
-
-      if (!batchMessages.containsKey(key)) {
-        Message batchMessage = new Message(message.getRecord());
-        batchMessage.setBatchMessageMode(true);
-        outputMessages.add(batchMessage);
-        batchMessages.put(key, batchMessage);
-      }
-      batchMessages.get(key).addPartitionName(message.getPartitionId().stringify());
-    }
-
-    return outputMessages;
-  }
-
-  /** Logs and creates each message under its (target name, id) key; no-op when null or empty. */
-  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
-    if (messages == null || messages.isEmpty()) {
-      return;
-    }
-
-    Builder keyBuilder = dataAccessor.keyBuilder();
-
-    List<PropertyKey> keys = new ArrayList<PropertyKey>();
-    for (Message message : messages) {
-      logger.info("Sending Message " + message.getMessageId() + " to " + message.getTgtName()
-          + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
-          + message.getTypedFromState() + " to:" + message.getTypedToState());
-      keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
-    }
-
-    dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index be0b7f0..31dbb08 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -35,7 +35,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
     HelixManager helixManager = event.getAttribute("helixmanager");
     HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
     ResourceAccessor resourceAccessor = new ResourceAccessor(accessor);
-    NewBestPossibleStateOutput assignments =
+    BestPossibleStateOutput assignments =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (ResourceId resourceId : assignments.getAssignedResources()) {
       ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index ce81f1f..44fddb6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -21,53 +21,53 @@ package org.apache.helix.controller.stages;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
-@Deprecated
 public class ReadClusterDataStage extends AbstractBaseStage {
-  private static final Logger logger = Logger.getLogger(ReadClusterDataStage.class.getName());
-  ClusterDataCache _cache;
-
-  public ReadClusterDataStage() {
-    _cache = new ClusterDataCache();
-  }
+  private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
-    logger.info("START ReadClusterDataStage.process()");
+    LOG.info("START ReadClusterDataStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
     if (manager == null) {
       throw new StageException("HelixManager attribute value is null");
     }
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    _cache.refresh(dataAccessor);
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    ClusterId clusterId = ClusterId.from(manager.getClusterName());
+    ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
+    // read the cluster once; it is published on the event under "ClusterDataCache" below
+    Cluster cluster = clusterAccessor.readCluster();
 
     ClusterStatusMonitor clusterStatusMonitor =
         (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
     if (clusterStatusMonitor != null) {
-      int disabledInstances = 0;
-      int disabledPartitions = 0;
-      for (InstanceConfig config : _cache._instanceConfigMap.values()) {
-        if (config.getInstanceEnabled() == false) {
-          disabledInstances++;
-        }
-        if (config.getDisabledPartitions() != null) {
-          disabledPartitions += config.getDisabledPartitions().size();
-        }
-      }
-      clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
-          _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
+      // TODO fix it: counters below still read the removed _cache, so nothing is reported now
+      // int disabledInstances = 0;
+      // int disabledPartitions = 0;
+      // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
+      // if (config.getInstanceEnabled() == false) {
+      // disabledInstances++;
+      // }
+      // if (config.getDisabledPartitions() != null) {
+      // disabledPartitions += config.getDisabledPartitions().size();
+      // }
+      // }
+      // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+      // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
     }
 
-    event.addAttribute("ClusterDataCache", _cache);
+    event.addAttribute("ClusterDataCache", cluster);
 
     long endTime = System.currentTimeMillis();
-    logger.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
+    LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
index ae873c7..859c1d0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
@@ -23,10 +23,8 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.log4j.Logger;
 
 public class ReadHealthDataStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(ReadHealthDataStage.class.getName());
   HealthDataCache _cache;
 
   public ReadHealthDataStage() {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index da38ee2..dc56b89 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -19,16 +19,23 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
+import org.apache.helix.api.rebalancer.RebalancerConfig;
+import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
 import org.apache.log4j.Logger;
 
 /**
@@ -37,102 +44,95 @@ import org.apache.log4j.Logger;
  * CurrentState for liveInstance-> Helps in finding resources that are inactive
  * and needs to be dropped
  */
-@Deprecated
 public class ResourceComputationStage extends AbstractBaseStage {
   private static Logger LOG = Logger.getLogger(ResourceComputationStage.class);
 
   @Override
-  public void process(ClusterEvent event) throws Exception {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    if (cache == null) {
-      throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+  public void process(ClusterEvent event) throws StageException {
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    if (cluster == null) {
+      throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
     }
 
-    Map<String, IdealState> idealStates = cache.getIdealStates();
+    Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+    Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
 
-    Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
+    // an ideal-state may have been removed: keep configs that exist only in current-states
+    for (ResourceId resourceId : csResCfgMap.keySet()) {
+      if (!cluster.getResourceMap().keySet().contains(resourceId)) {
+        resCfgMap.put(resourceId, csResCfgMap.get(resourceId));
+      }
+    }
 
-    if (idealStates != null && idealStates.size() > 0) {
-      for (IdealState idealState : idealStates.values()) {
-        Set<String> partitionSet = idealState.getPartitionSet();
-        String resourceName = idealState.getResourceName();
+    for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+      Resource resource = cluster.getResource(resourceId);
+      RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
 
-        for (String partition : partitionSet) {
-          addPartition(partition, resourceName, resourceMap);
-          Resource resource = resourceMap.get(resourceName);
-          resource.setStateModelDefRef(idealState.getStateModelDefRef());
-          resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
-          resource.setBucketSize(idealState.getBucketSize());
-          resource.setBatchMessageMode(idealState.getBatchMessageMode());
-        }
-      }
+      ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+      resCfgBuilder.bucketSize(resource.getBucketSize());
+      resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
+      resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
+      resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
+      resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 
-    // It's important to get partitions from CurrentState as well since the
-    // idealState might be removed.
-    Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
+    event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
+  }
 
-    if (availableInstances != null && availableInstances.size() > 0) {
-      for (LiveInstance instance : availableInstances.values()) {
-        String instanceName = instance.getInstanceName();
-        String clientSessionId = instance.getTypedSessionId().stringify();
+  /**
+   * Build one resource config per resource found in the current states of live participants
+   * @param cluster cluster whose live participants' current states are scanned
+   * @return resource config map or empty map if not available
+   * @throws StageException if a current state has no state model definition reference
+   */
+  Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
+    Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
+        new HashMap<ResourceId, ResourceConfig.Builder>();
+    // partition lists are collected separately and attached to each config before build()
+    Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
+        new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
+
+    for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+      for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
+        CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
+        // refuse to continue when a current state carries no state model definition
+        if (currentState.getStateModelDefRef() == null) {
+          LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
+              + ", partitions: " + currentState.getPartitionStateMap().keySet()
+              + ", states: " + currentState.getPartitionStateMap().values());
+          throw new StageException("State model def is null for resource:"
+              + currentState.getResourceId());
+        }
 
-        Map<String, CurrentState> currentStateMap =
-            cache.getCurrentState(instanceName, clientSessionId);
-        if (currentStateMap == null || currentStateMap.size() == 0) {
-          continue;
+        if (!resCfgBuilderMap.containsKey(resourceId)) {
+          PartitionedRebalancerContext.Builder rebCtxBuilder =
+              new PartitionedRebalancerContext.Builder(resourceId);
+          rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
+          rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
+              .getStateModelFactoryName()));
+          rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
+          // bucket size and batch mode are taken from the first current state seen
+          ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+          resCfgBuilder.bucketSize(currentState.getBucketSize());
+          resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
+          resCfgBuilderMap.put(resourceId, resCfgBuilder);
         }
-        for (CurrentState currentState : currentStateMap.values()) {
-
-          String resourceName = currentState.getResourceName();
-          Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
-
-          // don't overwrite ideal state settings
-          if (!resourceMap.containsKey(resourceName)) {
-            addResource(resourceName, resourceMap);
-            Resource resource = resourceMap.get(resourceName);
-            resource.setStateModelDefRef(currentState.getStateModelDefRef());
-            resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
-            resource.setBucketSize(currentState.getBucketSize());
-            resource.setBatchMessageMode(currentState.getBatchMessageMode());
-          }
-
-          if (currentState.getStateModelDefRef() == null) {
-            LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
-                + ", partitions: " + currentState.getPartitionStateMap().keySet()
-                + ", states: " + currentState.getPartitionStateMap().values());
-            throw new StageException("State model def is null for resource:"
-                + currentState.getResourceName());
-          }
-
-          for (String partition : resourceStateMap.keySet()) {
-            addPartition(partition, resourceName, resourceMap);
-          }
+        // partitions from every live participant accumulate in the resource's builder
+        PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+        for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
+          rebCtxBuilder.addPartition(new Partition(partitionId));
         }
       }
     }
 
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-  }
-
-  private void addResource(String resource, Map<String, Resource> resourceMap) {
-    if (resource == null || resourceMap == null) {
-      return;
-    }
-    if (!resourceMap.containsKey(resource)) {
-      resourceMap.put(resource, new Resource(resource));
-    }
-  }
-
-  private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap) {
-    if (resourceName == null || partition == null || resourceMap == null) {
-      return;
-    }
-    if (!resourceMap.containsKey(resourceName)) {
-      resourceMap.put(resourceName, new Resource(resourceName));
+    Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+    for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
+      ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
+      PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+      resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
+      resCfgMap.put(resourceId, resCfgBuilder.build());
     }
-    Resource resource = resourceMap.get(resourceName);
-    resource.addPartition(partition);
 
+    return resCfgMap;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c942db9..02188be 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -30,16 +30,17 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.log4j.Logger;
 
-@Deprecated
 public class TaskAssignmentStage extends AbstractBaseStage {
   private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
 
@@ -49,30 +50,30 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     logger.info("START TaskAssignmentStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    MessageThrottleStageOutput messageOutput =
-        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
-
-    if (manager == null || resourceMap == null || messageOutput == null || cache == null
-        || liveInstanceMap == null) {
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
+
+    if (manager == null || resourceMap == null || messageOutput == null || cluster == null
+        || liveParticipantMap == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     List<Message> messagesToSend = new ArrayList<Message>();
-    for (String resourceName : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceName);
-      for (Partition partition : resource.getPartitions()) {
-        List<Message> messages = messageOutput.getMessages(resourceName, partition);
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      ResourceConfig resource = resourceMap.get(resourceId);
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+        List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
         messagesToSend.addAll(messages);
       }
     }
 
     List<Message> outputMessages =
-        batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
+        batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
             manager.getProperties());
     sendMessages(dataAccessor, outputMessages);
 
@@ -82,8 +83,8 @@ public class TaskAssignmentStage extends AbstractBaseStage {
   }
 
   List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
-      Map<String, Resource> resourceMap, Map<String, LiveInstance> liveInstanceMap,
-      HelixManagerProperties properties) {
+      Map<ResourceId, ResourceConfig> resourceMap,
+      Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
     // group messages by its CurrentState path + "/" + fromState + "/" + toState
     Map<String, Message> batchMessages = new HashMap<String, Message>();
     List<Message> outputMessages = new ArrayList<Message>();
@@ -92,13 +93,13 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     while (iter.hasNext()) {
       Message message = iter.next();
       ResourceId resourceId = message.getResourceId();
-      Resource resource = resourceMap.get(resourceId.stringify());
+      ResourceConfig resource = resourceMap.get(resourceId);
 
-      String instanceName = message.getTgtName();
-      LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+      ParticipantId participantId = ParticipantId.from(message.getTgtName());
+      Participant liveParticipant = liveParticipantMap.get(participantId);
       String participantVersion = null;
-      if (liveInstance != null) {
-        participantVersion = liveInstance.getTypedHelixVersion().toString();
+      if (liveParticipant != null) {
+        participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
       }
 
       if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
@@ -137,10 +138,10 @@ public class TaskAssignmentStage extends AbstractBaseStage {
           + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
           + message.getTypedFromState() + " to:" + message.getTypedToState());
 
-      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
-      // message.getTgtName()
-      // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
-      // + " from: " + message.getFromState() + " to: " + message.getToState());
+      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
+      // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
+      // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
+      // + message.getToState());
 
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 7d84258..805f6bf 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -38,7 +38,7 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.context.RebalancerRef;
+import org.apache.helix.api.rebalancer.RebalancerRef;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Function;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index f591a24..1563769 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -55,10 +55,10 @@ import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
-import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
-import org.apache.helix.controller.stages.NewResourceComputationStage;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
@@ -250,7 +250,7 @@ public class ClusterStateVerifier {
       ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
       Cluster cluster = clusterAccessor.readCluster();
       // calculate best possible state
-      NewBestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
+      BestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
 
       // set error states
       if (errStates != null) {
@@ -416,19 +416,19 @@ public class ClusterStateVerifier {
    * @throws Exception
    */
 
-  static NewBestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
+  static BestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
     ClusterEvent event = new ClusterEvent("sampleEvent");
     event.addAttribute("ClusterDataCache", cluster);
 
-    NewResourceComputationStage rcState = new NewResourceComputationStage();
-    NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();
-    NewBestPossibleStateCalcStage bpStage = new NewBestPossibleStateCalcStage();
+    ResourceComputationStage rcState = new ResourceComputationStage();
+    CurrentStateComputationStage csStage = new CurrentStateComputationStage();
+    BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
 
     runStage(event, rcState);
     runStage(event, csStage);
     runStage(event, bpStage);
 
-    NewBestPossibleStateOutput output =
+    BestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
 
     return output;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
index 85330be..f48ebbc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -45,10 +45,10 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.api.rebalancer.CustomRebalancerContext;
+import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
+import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index c478bbb..7fe3314 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -33,11 +33,11 @@ import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -130,13 +130,13 @@ public class TestNewStages extends ZkUnitTestBase {
 
     // Run the stage
     try {
-      new NewBestPossibleStateCalcStage().process(event);
+      new BestPossibleStateCalcStage().process(event);
     } catch (Exception e) {
       Assert.fail(e.toString());
     }
 
     // Verify the result
-    NewBestPossibleStateOutput bestPossibleStateOutput =
+    BestPossibleStateOutput bestPossibleStateOutput =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     Assert.assertNotNull(bestPossibleStateOutput);
     ResourceId resourceId = ResourceId.from("TestDB0");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
index 74781cd..0a578c1 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
@@ -9,8 +9,8 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.api.rebalancer.FullAutoRebalancerContext;
+import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
index 5bbe54f..8650475 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
@@ -8,6 +8,9 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.rebalancer.CustomRebalancerContext;
+import org.apache.helix.api.rebalancer.RebalancerConfig;
+import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.model.ResourceConfiguration;
 import org.testng.Assert;
 import org.testng.annotations.Test;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index ecb8151..6279087 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -39,10 +39,10 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
+import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 18e8f4d..cb60691 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -62,12 +62,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
 
-    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+    ReadClusterDataStage stage1 = new ReadClusterDataStage();
     runStage(event, stage1);
-    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    NewBestPossibleStateOutput output =
+    BestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
       Map<ParticipantId, State> replicaMap =
@@ -100,12 +100,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
 
-    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+    ReadClusterDataStage stage1 = new ReadClusterDataStage();
     runStage(event, stage1);
-    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    NewBestPossibleStateOutput output =
+    BestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
       Map<ParticipantId, State> replicaMap =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index d3f348e..d116182 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -52,12 +52,12 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
 
-    NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+    ReadClusterDataStage stage1 = new ReadClusterDataStage();
     runStage(event, stage1);
-    NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+    BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
     runStage(event, stage2);
 
-    NewBestPossibleStateOutput output =
+    BestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     for (int p = 0; p < 5; p++) {
       Map<ParticipantId, State> replicaMap =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
index fb113b9..9d1dd04 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
@@ -78,13 +78,13 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
           .put("minimum_supported_version.participant", minSupportedParticipantVersion);
     }
     event.addAttribute("helixmanager", manager);
-    runStage(event, new NewReadClusterDataStage());
+    runStage(event, new ReadClusterDataStage());
   }
 
   @Test
   public void testCompatible() {
     prepare("0.4.0", "0.4.0");
-    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
+    CompatibilityCheckStage stage = new CompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
@@ -99,7 +99,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
   @Test
   public void testNullParticipantVersion() {
     prepare("0.4.0", null);
-    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
+    CompatibilityCheckStage stage = new CompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
@@ -115,7 +115,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
   @Test
   public void testNullControllerVersion() {
     prepare(null, "0.4.0");
-    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
+    CompatibilityCheckStage stage = new CompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
@@ -131,7 +131,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
   @Test
   public void testIncompatible() {
     prepare("0.6.1-incubating-SNAPSHOT", "0.3.4", "0.4");
-    NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
+    CompatibilityCheckStage stage = new CompatibilityCheckStage();
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 3412e0a..65d551d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -48,8 +48,8 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
     Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-    NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
-    runStage(event, new NewReadClusterDataStage());
+    CurrentStateComputationStage stage = new CurrentStateComputationStage();
+    runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
     ResourceCurrentState output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     AssertJUnit.assertEquals(
@@ -69,8 +69,8 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     setupLiveInstances(5);
 
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-    NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
-    runStage(event, new NewReadClusterDataStage());
+    CurrentStateComputationStage stage = new CurrentStateComputationStage();
+    runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
     ResourceCurrentState output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     AssertJUnit.assertEquals(
@@ -89,7 +89,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
 
-    runStage(event, new NewReadClusterDataStage());
+    runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
     ResourceCurrentState output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     State pendingState =
@@ -114,7 +114,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
     accessor.setProperty(
         keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
         stateWithDeadSession);
-    runStage(event, new NewReadClusterDataStage());
+    runStage(event, new ReadClusterDataStage());
     runStage(event, stage);
     ResourceCurrentState output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
     State currentState =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 0bd8795..ba61361 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -78,7 +78,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
 
-    NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
+    MessageThrottleStage throttleStage = new MessageThrottleStage();
     try {
       runStage(event, throttleStage);
       Assert.fail("Should throw exception since DATA_CACHE is null");
@@ -87,7 +87,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     }
 
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new NewReadClusterDataStage());
+    dataRefresh.addStage(new ReadClusterDataStage());
     runPipeline(event, dataRefresh);
 
     try {
@@ -96,7 +96,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     } catch (Exception e) {
       // OK
     }
-    runStage(event, new NewResourceComputationStage());
+    runStage(event, new ResourceComputationStage());
 
     try {
       runStage(event, throttleStage);
@@ -104,7 +104,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     } catch (Exception e) {
       // OK
     }
-    NewMessageOutput msgSelectOutput = new NewMessageOutput();
+    MessageOutput msgSelectOutput = new MessageOutput();
     List<Message> selectMessages = new ArrayList<Message>();
     Message msg =
         createMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-001"), "OFFLINE",
@@ -117,7 +117,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
 
     runStage(event, throttleStage);
 
-    NewMessageOutput msgThrottleOutput =
+    MessageOutput msgThrottleOutput =
         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     Assert.assertEquals(
         msgThrottleOutput.getMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_0"))
@@ -221,7 +221,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     ClusterConstraints constraint =
         accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
 
-    NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
+    MessageThrottleStage throttleStage = new MessageThrottleStage();
 
     // test constraintSelection
     // message1: hit contraintSelection rule1 and rule2
@@ -271,10 +271,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     event.addAttribute("helixmanager", manager);
 
     Pipeline dataRefresh = new Pipeline();
-    dataRefresh.addStage(new NewReadClusterDataStage());
+    dataRefresh.addStage(new ReadClusterDataStage());
     runPipeline(event, dataRefresh);
-    runStage(event, new NewResourceComputationStage());
-    NewMessageOutput msgSelectOutput = new NewMessageOutput();
+    runStage(event, new ResourceComputationStage());
+    MessageOutput msgSelectOutput = new MessageOutput();
 
     Message msg3 =
         createMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-003"), "OFFLINE",
@@ -306,7 +306,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
 
     runStage(event, throttleStage);
 
-    NewMessageOutput msgThrottleOutput =
+    MessageOutput msgThrottleOutput =
         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     List<Message> throttleMessages =
         msgThrottleOutput.getMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_0"));