You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/09/05 22:11:52 UTC

[1/2] helix rebalancer refactor using logical models

Updated Branches:
  refs/heads/helix-logical-model 9c7de4c33 -> 5d0e048e1


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 4038c69..9745c64 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -27,10 +27,14 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
@@ -75,15 +79,22 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     public int getUpperBound() {
       return upper;
     }
+
+    @Override
+    public String toString() {
+      return String.format("%d-%d", lower, upper);
+    }
   }
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
     Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
     NewCurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    MessageGenerationOutput messageGenOutput =
+    NewMessageOutput messageGenOutput =
         event.getAttribute(AttributeName.MESSAGES_ALL.toString());
     if (cluster == null || resourceMap == null || currentStateOutput == null
         || messageGenOutput == null) {
@@ -91,29 +102,28 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
           + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
     }
 
-    MessageSelectionStageOutput output = new MessageSelectionStageOutput();
+    NewMessageOutput output = new NewMessageOutput();
 
     for (ResourceId resourceId : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceId);
-      // TODO fix it
-      StateModelDefinition stateModelDef = null;
-      // cache.getStateModelDef(resource.getStateModelDefRef());
+      StateModelDefinition stateModelDef =
+          stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
 
+      // TODO have a logical model for transition
       Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
-      // IdealState idealState = cache.getIdealState(resourceName);
-      Map<String, Bounds> stateConstraints =
+      Map<State, Bounds> stateConstraints =
           computeStateConstraints(stateModelDef, resource.getRebalancerConfig(), cluster);
 
       // TODO fix it
-      // for (Partition partition : resource.getPartitions()) {
-      // List<Message> messages = messageGenOutput.getMessages(resourceId.stringify(), partition);
-      // List<Message> selectedMessages =
-      // selectMessages(cache.getLiveInstances(),
-      // currentStateOutput.getCurrentStateMap(resourceName, partition),
-      // currentStateOutput.getPendingStateMap(resourceName, partition), messages,
-      // stateConstraints, stateTransitionPriorities, stateModelDef.getInitialStateString());
-      // output.addMessages(resourceId.stringify(), partition, selectedMessages);
-      // }
+      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+        List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
+        List<Message> selectedMessages =
+            selectMessages(cluster.getLiveParticipantMap(),
+            currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+            currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
+                stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
+        output.setMessages(resourceId, partitionId, selectedMessages);
+      }
     }
     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
   }
@@ -137,22 +147,22 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
    *          : FROME_STATE-TO_STATE -> priority
    * @return: selected messages
    */
-  List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
-      Map<String, String> currentStates, Map<String, String> pendingStates, List<Message> messages,
-      Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
-      String initialState) {
+  List<Message> selectMessages(Map<ParticipantId, Participant> liveParticipants,
+      Map<ParticipantId, State> currentStates, Map<ParticipantId, State> pendingStates,
+      List<Message> messages, Map<State, Bounds> stateConstraints,
+      final Map<String, Integer> stateTransitionPriorities, State initialState) {
     if (messages == null || messages.isEmpty()) {
       return Collections.emptyList();
     }
 
     List<Message> selectedMessages = new ArrayList<Message>();
-    Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+    Map<State, Bounds> bounds = new HashMap<State, Bounds>();
 
     // count currentState, if no currentState, count as in initialState
-    for (String instance : liveInstances.keySet()) {
-      String state = initialState;
-      if (currentStates.containsKey(instance)) {
-        state = currentStates.get(instance);
+    for (ParticipantId liveParticipantId : liveParticipants.keySet()) {
+      State state = initialState;
+      if (currentStates.containsKey(liveParticipantId)) {
+        state = currentStates.get(liveParticipantId);
       }
 
       if (!bounds.containsKey(state)) {
@@ -163,8 +173,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     }
 
     // count pendingStates
-    for (String instance : pendingStates.keySet()) {
-      String state = pendingStates.get(instance);
+    for (ParticipantId participantId : pendingStates.keySet()) {
+      State state = pendingStates.get(participantId);
       if (!bounds.containsKey(state)) {
         bounds.put(state, new Bounds(0, 0));
       }
@@ -178,7 +188,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     for (Message message : messages) {
       State fromState = message.getFromState();
       State toState = message.getToState();
-      String transition = fromState + "-" + toState;
+      String transition = fromState.toString() + "-" + toState.toString();
       int priority = Integer.MAX_VALUE;
 
       if (stateTransitionPriorities.containsKey(transition)) {
@@ -203,7 +213,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
         }
 
         if (!bounds.containsKey(toState)) {
-          bounds.put(toState.toString(), new Bounds(0, 0));
+          bounds.put(toState, new Bounds(0, 0));
         }
 
         // check lower bound of fromState
@@ -243,13 +253,13 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
    * beginning and compute the stateConstraint instance once and re use at other places.
    * Each IdealState must have a constraint object associated with it
    */
-  private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
+  private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
       RebalancerConfig rebalancerConfig, Cluster cluster) {
-    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
+    Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
 
-    List<String> statePriorityList = stateModelDefinition.getStatesPriorityStringList();
-    for (String state : statePriorityList) {
-      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
+    List<State> statePriorityList = stateModelDefinition.getStatesPriorityList();
+    for (State state : statePriorityList) {
+      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state.toString());
       int max = -1;
       if ("N".equals(numInstancesPerState)) {
         max = cluster.getLiveParticipantMap().size();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
index e45cd38..5bea5b4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -27,6 +27,9 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -115,7 +118,7 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     Cluster cluster = event.getAttribute("ClusterDataCache");
-    MessageSelectionStageOutput msgSelectionOutput =
+    NewMessageOutput msgSelectionOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
 
@@ -124,34 +127,33 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
           + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
     }
 
-    MessageThrottleStageOutput output = new MessageThrottleStageOutput();
+    NewMessageOutput output = new NewMessageOutput();
 
     // TODO fix it
-    ClusterConstraints constraint = null;
-    // cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+    ClusterConstraints constraint = cluster.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
     Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
 
-    // TODO fix it
-    // if (constraint != null) {
-    // // go through all pending messages, they should be counted but not throttled
-    // for (String instance : cache.getLiveInstances().keySet()) {
-    // throttle(throttleCounterMap, constraint, new ArrayList<Message>(cache.getMessages(instance)
-    // .values()), false);
-    // }
-    // }
+    if (constraint != null) {
+      // go through all pending messages, they should be counted but not throttled
+      for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
+        Participant liveParticipant = cluster.getLiveParticipantMap().get(participantId);
+        throttle(throttleCounterMap, constraint, new ArrayList<Message>(liveParticipant
+            .getMessageMap().values()), false);
+      }
+    }
 
     // go through all new messages, throttle if necessary
     // assume messages should be sorted by state transition priority in messageSelection stage
     for (ResourceId resourceId : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceId);
       // TODO fix it
-      // for (Partition partition : resource.getPartitions()) {
-      // List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
-      // if (constraint != null && messages != null && messages.size() > 0) {
-      // messages = throttle(throttleCounterMap, constraint, messages, true);
-      // }
-      // output.addMessages(resourceName, partition, messages);
-      // }
+      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+        List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
+        if (constraint != null && messages != null && messages.size() > 0) {
+          messages = throttle(throttleCounterMap, constraint, messages, true);
+        }
+        output.setMessages(resourceId, partitionId, messages);
+      }
     }
 
     event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
new file mode 100644
index 0000000..ed487a1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
@@ -0,0 +1,85 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ClusterAccessor;
+import org.apache.helix.api.ClusterId;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelDefinitionAccessor;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.log4j.Logger;
+
+public class NewReadClusterDataStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(NewReadClusterDataStage.class.getName());
+  /** Loads the cluster and its state model definitions and stores both on the event. */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    long startTime = System.currentTimeMillis();
+    LOG.info("START ReadClusterDataStage.process()");
+    // The manager is mandatory: it supplies the data accessor and the cluster name.
+    HelixManager manager = event.getAttribute("helixmanager");
+    if (manager == null) {
+      throw new StageException("HelixManager attribute value is null");
+    }
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    ClusterId clusterId = Id.cluster(manager.getClusterName());
+    ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
+    StateModelDefinitionAccessor stateModelDefAccessor =
+        new StateModelDefinitionAccessor(clusterId, accessor);
+    // Read the cluster and the state model definitions through the two accessors.
+    Cluster cluster = clusterAccessor.readCluster();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        stateModelDefAccessor.readStateModelDefinitions();
+    // The status monitor attribute may be absent; when present, nothing is reported yet.
+    ClusterStatusMonitor clusterStatusMonitor =
+        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    if (clusterStatusMonitor != null) {
+      // TODO fix it: the disabled code below reads _cache, which this class does not define
+      // int disabledInstances = 0;
+      // int disabledPartitions = 0;
+      // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
+      // if (config.getInstanceEnabled() == false) {
+      // disabledInstances++;
+      // }
+      // if (config.getDisabledPartitions() != null) {
+      // disabledPartitions += config.getDisabledPartitions().size();
+      // }
+      // }
+      // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+      // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
+    }
+    // Publish the cluster and the state model definitions as event attributes.
+    event.addAttribute("ClusterDataCache", cluster);
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefMap);
+    // NOTE(review): log text says ReadClusterDataStage, not NewReadClusterDataStage - confirm.
+    long endTime = System.currentTimeMillis();
+    LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
deleted file mode 100644
index 3359b50..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.rebalancer.NewRebalancer;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Check and invoke custom implementation idealstate rebalancers.<br/>
- * If the resourceConfig has specified className of the customized rebalancer, <br/>
- * the rebalancer will be invoked to re-write the idealstate of the resource<br/>
- */
-public class NewRebalanceIdealStateStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewRebalanceIdealStateStage.class.getName());
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    NewCurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
-    // Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
-    for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
-      // IdealState currentIdealState = idealStateMap.get(resourceName);
-      Resource resource = cluster.getResource(resourceId);
-      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
-      if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
-          && rebalancerConfig.getRebalancerClassName() != null) {
-        String rebalancerClassName = rebalancerConfig.getRebalancerClassName();
-        LOG.info("resource " + resourceId + " use idealStateRebalancer " + rebalancerClassName);
-        try {
-          NewRebalancer balancer =
-              (NewRebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
-
-          // TODO add state model def
-          ResourceAssignment resourceAssignment =
-              balancer.computeResourceMapping(resource, cluster, null);
-
-          // TODO impl this
-          // currentIdealState.updateFromAssignment(resourceAssignment);
-          // updatedIdealStates.put(resourceName, currentIdealState);
-        } catch (Exception e) {
-          LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
-        }
-      }
-    }
-
-    // TODO
-    // if (updatedIdealStates.size() > 0) {
-      // cache.getIdealStates().putAll(updatedIdealStates);
-    // }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index b8c1ecf..af23eb2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -21,7 +21,6 @@ package org.apache.helix.controller.stages;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
@@ -67,7 +66,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
 
     // include all partitions from CurrentState as well since idealState might be removed
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
-      for ( ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
+      for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
         CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
 
         if (currentState.getStateModelDefRef() == null) {
@@ -80,13 +79,15 @@ public class NewResourceComputationStage extends AbstractBaseStage {
 
         // don't overwrite ideal state configs
         if (!resourceBuilderMap.containsKey(resourceId)) {
-          RebalancerConfig.Builder rebalancerConfigBuilder = new RebalancerConfig.Builder();
+          RebalancerConfig.Builder rebalancerConfigBuilder =
+              new RebalancerConfig.Builder(resourceId);
           rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
-          rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState.getStateModelFactoryName()));
+          rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState
+              .getStateModelFactoryName()));
           rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
           rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
 
-          org.apache.helix.api.Resource.Builder resourceBuilder = new org.apache.helix.api.Resource.Builder(resourceId);
+          Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
           resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
           resourceBuilderMap.put(resourceId, resourceBuilder);
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 95862ae..2b8a0c8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -31,8 +31,10 @@ import org.apache.helix.HelixManagerProperties;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -52,7 +54,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
 
     HelixManager manager = event.getAttribute("helixmanager");
     Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    MessageThrottleStageOutput messageOutput =
+    NewMessageOutput messageOutput =
         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
@@ -67,11 +69,10 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     List<Message> messagesToSend = new ArrayList<Message>();
     for (ResourceId resourceId : resourceMap.keySet()) {
       Resource resource = resourceMap.get(resourceId);
-      // TODO fix it
-      // for (Partition partition : resource.getPartitions()) {
-      // List<Message> messages = messageOutput.getMessages(resourceName, partition);
-      // messagesToSend.addAll(messages);
-      // }
+      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+        List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
+        messagesToSend.addAll(messages);
+      }
     }
 
     List<Message> outputMessages =
@@ -95,9 +96,9 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     while (iter.hasNext()) {
       Message message = iter.next();
       ResourceId resourceId = message.getResourceId();
-      Resource resource = resourceMap.get(resourceId.stringify());
+      Resource resource = resourceMap.get(resourceId);
 
-      String participantId = message.getTgtName();
+      ParticipantId participantId = Id.participant(message.getTgtName());
       Participant liveParticipant = liveParticipantMap.get(participantId);
       String participantVersion = null;
       if (liveParticipant != null) {
@@ -141,10 +142,10 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
           + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
           + message.getFromState() + " to:" + message.getToState());
 
-      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
-      // message.getTgtName()
-      // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
-      // + " from: " + message.getFromState() + " to: " + message.getToState());
+      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
+      // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
+      // + message.getPartitionId() + " from: " + message.getFromState() + " to: "
+      // + message.getToState());
 
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
index f16bb39..b6facea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -29,6 +29,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
@@ -66,7 +67,9 @@ public class RebalanceIdealStateStage extends AbstractBaseStage {
           ResourceAssignment resourceAssignment =
               balancer.computeResourceMapping(resource, currentIdealState, currentStateOutput,
                   cache);
-          currentIdealState.updateFromAssignment(resourceAssignment);
+          StateModelDefinition stateModelDef =
+              cache.getStateModelDef(currentIdealState.getStateModelDefRef());
+          currentIdealState.updateFromAssignment(resourceAssignment, stateModelDef);
           updatedIdealStates.put(resourceName, currentIdealState);
         } catch (Exception e) {
           LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 087d2fb..835af6e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -410,6 +410,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
       switch (type) {
       case EXTERNALVIEW:
         if (value.getBucketSize() == 0) {
+          System.out.println("set: " + value.getRecord());
           records.add(value.getRecord());
         } else {
           _baseDataAccessor.remove(path, options);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 0f690db..7fb641f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -712,6 +712,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     for (int i = 0; i < paths.size(); i++) {
       success[i] = (results.get(i)._retCode == RetCode.OK);
     }
+
+    for (int i = 0; i < paths.size(); i++) {
+      String path = paths.get(i);
+      T record = records.get(i);
+      if (path.indexOf("EXTERNALVIEW") != -1) {
+        System.out.println("path: " + path + ", record: " + record + ", success: " + success[i]);
+      }
+    }
     return success;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ffff483..24ec7c9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Id;
@@ -38,14 +39,18 @@ import org.apache.helix.api.RebalancerRef;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
 
 /**
  * The ideal states of all partitions in a resource
@@ -459,6 +464,14 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Set the state model associated with this resource
+   * @param stateModelDefId state model identifier
+   */
+  public void setStateModelDefId(StateModelDefId stateModelDefId) {
+    setStateModelDefRef(stateModelDefId.stringify());
+  }
+
+  /**
    * Set the number of partitions of this resource
    * @param numPartitions the number of partitions
    */
@@ -540,6 +553,14 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Set the state model factory associated with this resource
+   * @param stateModelFactoryId state model factory id
+   */
+  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+    setStateModelFactoryName(stateModelFactoryId.stringify());
+  }
+
+  /**
    * Get the state model factory associated with this resource
    * @return state model factory name
    */
@@ -549,6 +570,14 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Get the state model factory associated with this resource
+   * @return state model factory id
+   */
+  public StateModelFactoryId getStateModelFactoryId() {
+    return Id.stateModelFactory(getStateModelFactoryName());
+  }
+
+  /**
    * Set the frequency with which to rebalance
    * @return the rebalancing timer period
    */
@@ -613,13 +642,39 @@ public class IdealState extends HelixProperty {
     return _record.getSimpleField(IdealStateProperty.INSTANCE_GROUP_TAG.toString());
   }
 
-  public void updateFromAssignment(ResourceAssignment assignment) {
+  /**
+   * Update the ideal state from a ResourceAssignment computed during a rebalance
+   * @param assignment the new resource assignment
+   * @param stateModelDef state model of the resource
+   */
+  public void updateFromAssignment(ResourceAssignment assignment, StateModelDefinition stateModelDef) {
+    // clear all preference lists and maps
     _record.getMapFields().clear();
     _record.getListFields().clear();
+
+    // assign a partition at a time
     for (PartitionId partition : assignment.getMappedPartitions()) {
+      List<ParticipantId> preferenceList = new ArrayList<ParticipantId>();
+      Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
+
+      // invert the map to get in state order
       Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partition);
-      setParticipantStateMap(partition, replicaMap);
-      setPreferenceList(partition, new ArrayList<ParticipantId>(replicaMap.keySet()));
+      ListMultimap<State, ParticipantId> inverseMap = ArrayListMultimap.create();
+      Multimaps.invertFrom(Multimaps.forMap(replicaMap), inverseMap);
+
+      // update the ideal state in order of state priorities
+      for (State state : stateModelDef.getStatesPriorityList()) {
+        if (!state.equals(State.from(HelixDefinedState.DROPPED))
+            && !state.equals(State.from(HelixDefinedState.ERROR))) {
+          List<ParticipantId> stateParticipants = inverseMap.get(state);
+          for (ParticipantId participant : stateParticipants) {
+            preferenceList.add(participant);
+            participantStateMap.put(participant, state);
+          }
+        }
+      }
+      setPreferenceList(partition, preferenceList);
+      setParticipantStateMap(partition, participantStateMap);
     }
   }
 
@@ -674,12 +729,13 @@ public class IdealState extends HelixProperty {
     if (rawPreferenceList == null) {
       return Collections.emptyList();
     }
-    return Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
-      @Override
-      public ParticipantId apply(String participantName) {
-        return Id.participant(participantName);
-      }
-    });
+    return Lists.transform(new ArrayList<String>(rawPreferenceList),
+        new Function<String, ParticipantId>() {
+          @Override
+          public ParticipantId apply(String participantName) {
+            return Id.participant(participantName);
+          }
+        });
   }
 
   /**
@@ -710,12 +766,13 @@ public class IdealState extends HelixProperty {
     if (preferenceList == null) {
       return Collections.emptyList();
     }
-    return Lists.transform(preferenceList, new Function<ParticipantId, String>() {
-      @Override
-      public String apply(ParticipantId participantId) {
-        return participantId.stringify();
-      }
-    });
+    return Lists.transform(new ArrayList<ParticipantId>(preferenceList),
+        new Function<ParticipantId, String>() {
+          @Override
+          public String apply(ParticipantId participantId) {
+            return participantId.stringify();
+          }
+        });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/test/java/org/apache/helix/api/TestId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestId.java b/helix-core/src/test/java/org/apache/helix/api/TestId.java
index 05da8a3..57c01e7 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestId.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestId.java
@@ -41,6 +41,7 @@ public class TestId {
     final String sessionName = "Session";
     final String processName = "Process";
     final String stateModelName = "StateModel";
+    final String stateModelFactoryName = "StateModelFactory";
     final String messageName = "Message";
     Assert.assertEquals(Id.resource(resourceName).stringify(), resourceName);
     Assert.assertEquals(Id.cluster(clusterName).stringify(), clusterName);
@@ -48,6 +49,8 @@ public class TestId {
     Assert.assertEquals(Id.session(sessionName).stringify(), sessionName);
     Assert.assertEquals(Id.process(processName).stringify(), processName);
     Assert.assertEquals(Id.stateModelDef(stateModelName).stringify(), stateModelName);
+    Assert.assertEquals(Id.stateModelFactory(stateModelFactoryName).stringify(),
+        stateModelFactoryName);
     Assert.assertEquals(Id.message(messageName).stringify(), messageName);
   }
 
@@ -72,6 +75,7 @@ public class TestId {
     Assert.assertNull(Id.session(null));
     Assert.assertNull(Id.process(null));
     Assert.assertNull(Id.stateModelDef(null));
+    Assert.assertNull(Id.stateModelFactory(null));
     Assert.assertNull(Id.message(null));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 98ae60b..cc26596 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -20,20 +20,33 @@ package org.apache.helix.api;
  */
 
 import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
+import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
+import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.mock.controller.ClusterController;
 import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -42,6 +55,7 @@ import org.testng.annotations.Test;
 public class TestNewStages extends ZkUnitTestBase {
   final int n = 2;
   final int p = 8;
+  final int r = 2;
   MockParticipant[] _participants = new MockParticipant[n];
   ClusterController _controller;
 
@@ -88,6 +102,146 @@ public class TestNewStages extends ZkUnitTestBase {
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  @Test
+  public void testBasicBestPossibleStateCalcStage() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up the event
+    ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
+    Cluster cluster = clusterAccessor.readCluster();
+    ClusterEvent event = new ClusterEvent(testName);
+    event.addAttribute(AttributeName.CURRENT_STATE.toString(), new NewCurrentStateOutput());
+    event.addAttribute(AttributeName.RESOURCES.toString(), cluster.getResourceMap());
+    event.addAttribute("ClusterDataCache", cluster);
+    Map<StateModelDefId, StateModelDefinition> stateModelMap =
+        new HashMap<StateModelDefId, StateModelDefinition>();
+    stateModelMap.put(Id.stateModelDef("MasterSlave"), new StateModelDefinition(
+        StateModelConfigGenerator.generateConfigForMasterSlave()));
+    event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelMap);
+
+    // Run the stage
+    try {
+      new NewBestPossibleStateCalcStage().process(event);
+    } catch (Exception e) {
+      Assert.fail(e.toString());
+    }
+
+    // Verify the result
+    NewBestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    Assert.assertNotNull(bestPossibleStateOutput);
+    ResourceId resourceId = new ResourceId("TestDB0");
+    ResourceAssignment assignment = bestPossibleStateOutput.getResourceAssignment(resourceId);
+    Assert.assertNotNull(assignment);
+    Resource resource = cluster.getResource(resourceId);
+    verifySemiAutoRebalance(resource, assignment);
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testClusterRebalancers() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
+    Cluster cluster = clusterAccessor.readCluster();
+
+    ResourceId resourceId = new ResourceId("TestDB0");
+    Resource resource = cluster.getResource(resourceId);
+    StateModelDefinition masterSlave =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+    ResourceAssignment fullAutoResult =
+        new NewAutoRebalancer().computeResourceMapping(resource, cluster, masterSlave,
+            currentStateOutput);
+    verifyFullAutoRebalance(resource, fullAutoResult);
+    ResourceAssignment semiAutoResult =
+        new NewSemiAutoRebalancer().computeResourceMapping(resource, cluster, masterSlave,
+            currentStateOutput);
+    verifySemiAutoRebalance(resource, semiAutoResult);
+    ResourceAssignment customResult =
+        new NewCustomRebalancer().computeResourceMapping(resource, cluster, masterSlave,
+            currentStateOutput);
+    verifyCustomRebalance(resource, customResult);
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Check that a full auto rebalance is run, and at least one replica per partition is mapped
+   * @param resource the resource to verify
+   * @param assignment the assignment to verify
+   */
+  private void verifyFullAutoRebalance(Resource resource, ResourceAssignment assignment) {
+    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
+    for (PartitionId partitionId : assignment.getMappedPartitions()) {
+      Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+      Assert.assertTrue(replicaMap.size() <= r);
+      Assert.assertTrue(replicaMap.size() > 0);
+      boolean hasMaster = false;
+      for (State state : replicaMap.values()) {
+        if (state.equals(State.from("MASTER"))) {
+          Assert.assertFalse(hasMaster);
+          hasMaster = true;
+        }
+      }
+      Assert.assertTrue(hasMaster);
+    }
+  }
+
+  /**
+   * Check that a semi auto rebalance is run, and all partitions are mapped by preference list
+   * @param resource the resource to verify
+   * @param assignment the assignment to verify
+   */
+  private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
+    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
+    RebalancerConfig config = resource.getRebalancerConfig();
+    for (PartitionId partitionId : assignment.getMappedPartitions()) {
+      List<ParticipantId> preferenceList = config.getPreferenceList(partitionId);
+      Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+      Assert.assertEquals(replicaMap.size(), preferenceList.size());
+      Assert.assertEquals(replicaMap.size(), r);
+      boolean hasMaster = false;
+      for (ParticipantId participant : preferenceList) {
+        Assert.assertTrue(replicaMap.containsKey(participant));
+        State state = replicaMap.get(participant);
+        if (state.equals(State.from("MASTER"))) {
+          Assert.assertFalse(hasMaster);
+          hasMaster = true;
+        }
+      }
+      Assert.assertEquals(replicaMap.get(preferenceList.get(0)), State.from("MASTER"));
+    }
+  }
+
+  /**
+   * For vanilla customized rebalancing, the resource assignment should match the preference map
+   * @param resource the resource to verify
+   * @param assignment the assignment to verify
+   */
+  private void verifyCustomRebalance(Resource resource, ResourceAssignment assignment) {
+    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
+    RebalancerConfig config = resource.getRebalancerConfig();
+    for (PartitionId partitionId : assignment.getMappedPartitions()) {
+      Map<ParticipantId, State> preferenceMap = config.getPreferenceMap(partitionId);
+      Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+      Assert.assertEquals(replicaMap.size(), preferenceMap.size());
+      Assert.assertEquals(replicaMap.size(), r);
+      for (ParticipantId participant : preferenceMap.keySet()) {
+        Assert.assertTrue(replicaMap.containsKey(participant));
+        Assert.assertEquals(replicaMap.get(participant), preferenceMap.get(participant));
+      }
+    }
+  }
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -106,7 +260,7 @@ public class TestNewStages extends ZkUnitTestBase {
         1, // resources
         p, // partitions per resource
         n, // number of nodes
-        2, // replicas
+        r, // replicas
         "MasterSlave", true); // do rebalance
 
     _controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);


[2/2] git commit: helix rebalancer refactor using logical models

Posted by zz...@apache.org.
helix rebalancer refactor using logical models


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5d0e048e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5d0e048e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5d0e048e

Branch: refs/heads/helix-logical-model
Commit: 5d0e048e134a6f21200a67618fdbe852ca1d7592
Parents: 9c7de4c
Author: zzhang <zz...@apache.org>
Authored: Thu Sep 5 13:11:36 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Thu Sep 5 13:11:36 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/Cluster.java |  23 +-
 .../org/apache/helix/api/ClusterAccessor.java   |  20 +-
 .../apache/helix/api/ParticipantAccessor.java   |   8 +-
 .../org/apache/helix/api/RebalancerConfig.java  | 117 +++++++---
 .../org/apache/helix/api/RebalancerRef.java     |  20 +-
 .../java/org/apache/helix/api/Resource.java     |  70 +++---
 .../apache/helix/api/SchedulerTaskConfig.java   |  47 ++++
 .../controller/GenericHelixController.java      |  31 ++-
 .../rebalancer/NewAutoRebalancer.java           |  35 ++-
 .../rebalancer/NewCustomRebalancer.java         |  13 +-
 .../controller/rebalancer/NewRebalancer.java    |   4 +-
 .../rebalancer/NewSemiAutoRebalancer.java       |   5 +-
 .../util/NewConstraintBasedAssignment.java      |  13 +-
 .../helix/controller/stages/AttributeName.java  |   3 +-
 .../stages/NewBestPossibleStateCalcStage.java   | 104 +++++----
 .../stages/NewBestPossibleStateOutput.java      |  19 +-
 .../stages/NewCompatibilityCheckStage.java      |  68 ++++++
 .../stages/NewCurrentStateOutput.java           |  19 +-
 .../stages/NewMessageGenerationPhase.java       | 233 -------------------
 .../stages/NewMessageGenerationStage.java       | 211 +++++++++++++++++
 .../controller/stages/NewMessageOutput.java     |  75 ++++++
 .../stages/NewMessageSelectionStage.java        |  78 ++++---
 .../stages/NewMessageThrottleStage.java         |  40 ++--
 .../stages/NewReadClusterDataStage.java         |  85 +++++++
 .../stages/NewRebalanceIdealStateStage.java     |  84 -------
 .../stages/NewResourceComputationStage.java     |  11 +-
 .../stages/NewTaskAssignmentStage.java          |  25 +-
 .../stages/RebalanceIdealStateStage.java        |   5 +-
 .../helix/manager/zk/ZKHelixDataAccessor.java   |   1 +
 .../helix/manager/zk/ZkBaseDataAccessor.java    |   8 +
 .../java/org/apache/helix/model/IdealState.java |  87 +++++--
 .../test/java/org/apache/helix/api/TestId.java  |   4 +
 .../org/apache/helix/api/TestNewStages.java     | 156 ++++++++++++-
 33 files changed, 1158 insertions(+), 564 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 193b238..e890fb4 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -22,6 +22,9 @@ package org.apache.helix.api;
 import java.util.Collections;
 import java.util.Map;
 
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+
 import com.google.common.collect.ImmutableMap;
 
 /**
@@ -59,6 +62,8 @@ public class Cluster {
 
   private final ClusterConfig _config = null;
 
+  private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+
   /**
    * construct a cluster
    * @param id
@@ -69,7 +74,7 @@ public class Cluster {
    */
   public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
-      ControllerId leaderId) {
+      ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap) {
 
     _id = id;
 
@@ -89,6 +94,8 @@ public class Cluster {
 
     _leaderId = leaderId;
 
+    _constraintMap = ImmutableMap.copyOf(constraintMap);
+
     // TODO impl this when we persist controllers and spectators on zookeeper
     _controllerMap = ImmutableMap.copyOf(controllerMap);
     _spectatorMap = Collections.emptyMap();
@@ -159,4 +166,18 @@ public class Cluster {
     return _spectatorMap;
   }
 
+  /**
+   * @return map of constraint type to the cluster constraints of that type
+   */
+  public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
+    return _constraintMap;
+  }
+
+  /**
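+   * @param type the constraint type to look up
+   * @return the constraints stored for that type, or null if the map has no entry for it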
+   */
+  public ClusterConstraints getConstraint(ConstraintType type) {
+    return _constraintMap.get(type);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 5902a24..04d5831 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -139,12 +141,18 @@ public class ClusterAccessor {
 
     LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
 
+    /**
+     * map of constraint-type to constraints
+     */
+    Map<String, ClusterConstraints> constraintMap =
+        _accessor.getChildValuesMap(_keyBuilder.constraints());
+
     Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
     for (String resourceName : idealStateMap.keySet()) {
       IdealState idealState = idealStateMap.get(resourceName);
 
       // TODO pass resource assignment
-      ResourceId resourceId = new ResourceId(resourceName);
+      ResourceId resourceId = Id.resource(resourceName);
       resourceMap.put(resourceId, new Resource(resourceId, idealState, null));
     }
 
@@ -167,7 +175,15 @@ public class ClusterAccessor {
       controllerMap.put(leaderId, new Controller(leaderId, leader, true));
     }
 
-    return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId);
+    Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
+        new HashMap<ConstraintType, ClusterConstraints>();
+    for (String constraintType : constraintMap.keySet()) {
+      clusterConstraintMap.put(ConstraintType.valueOf(constraintType),
+          constraintMap.get(constraintType));
+    }
+
+    return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
+        clusterConstraintMap);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index d2ae927..da2c433 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -307,9 +307,11 @@ public class ParticipantAccessor {
     }
 
     Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
-    for (String msgId : instanceMsgMap.keySet()) {
-      Message message = instanceMsgMap.get(msgId);
-      msgMap.put(new MessageId(msgId), message);
+    if (instanceMsgMap != null) {
+      for (String msgId : instanceMsgMap.keySet()) {
+        Message message = instanceMsgMap.get(msgId);
+        msgMap.put(new MessageId(msgId), message);
+      }
     }
 
     Map<ResourceId, CurrentState> curStateMap = new HashMap<ResourceId, CurrentState>();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 2baf63b..4ac254d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,18 +19,25 @@ package org.apache.helix.api;
  * under the License.
  */
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Captures the configuration properties necessary for rebalancing
+ */
 public class RebalancerConfig {
   private final RebalanceMode _rebalancerMode;
   private final RebalancerRef _rebalancerRef;
   private final StateModelDefId _stateModelDefId;
   private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+  private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
   private final ResourceAssignment _resourceAssignment;
   private final int _replicaCount;
   private final String _participantGroupTag;
@@ -39,20 +46,38 @@ public class RebalancerConfig {
   private final boolean _batchMessageMode;
   private final StateModelFactoryId _stateModelFactoryId;
 
-  public RebalancerConfig(RebalanceMode mode, RebalancerRef rebalancerRef,
-      StateModelDefId stateModelDefId, ResourceAssignment resourceAssignment, int bucketSize,
-      boolean batchMessageMode, StateModelFactoryId stateModelFactoryId) {
-    _rebalancerMode = mode;
-    _rebalancerRef = rebalancerRef;
-    _stateModelDefId = stateModelDefId;
+  /**
+   * Instantiate the configuration of a rebalance task
+   * @param idealState the physical ideal state
+   * @param resourceAssignment last mapping of a resource
+   */
+  public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment) {
+    _rebalancerMode = idealState.getRebalanceMode();
+    _rebalancerRef = idealState.getRebalancerRef();
+    _stateModelDefId = idealState.getStateModelDefId();
+    _replicaCount = Integer.parseInt(idealState.getReplicas());
+    _participantGroupTag = idealState.getInstanceGroupTag();
+    _maxPartitionsPerParticipant = idealState.getMaxPartitionsPerInstance();
+    _bucketSize = idealState.getBucketSize();
+    _batchMessageMode = idealState.getBatchMessageMode();
+    _stateModelFactoryId = idealState.getStateModelFactoryId();
+
+    // Build preference lists and maps
+    ImmutableMap.Builder<PartitionId, List<ParticipantId>> preferenceLists =
+        new ImmutableMap.Builder<PartitionId, List<ParticipantId>>();
+    ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>> preferenceMaps =
+        new ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>>();
+    for (PartitionId partitionId : idealState.getPartitionSet()) {
+      preferenceLists.put(partitionId,
+          ImmutableList.copyOf(idealState.getPreferenceList(partitionId)));
+      preferenceMaps.put(partitionId,
+          ImmutableMap.copyOf(idealState.getParticipantStateMap(partitionId)));
+    }
+    _preferenceLists = preferenceLists.build();
+    _preferenceMaps = preferenceMaps.build();
+
+    // Leave the resource assignment as is
     _resourceAssignment = resourceAssignment;
-    _preferenceLists = Collections.emptyMap(); // TODO: stub
-    _replicaCount = 0; // TODO: stub
-    _participantGroupTag = null; // TODO: stub
-    _maxPartitionsPerParticipant = Integer.MAX_VALUE; // TODO: stub
-    _bucketSize = bucketSize;
-    _batchMessageMode = batchMessageMode;
-    _stateModelFactoryId = stateModelFactoryId;
   }
 
   /**
@@ -97,6 +122,15 @@ public class RebalancerConfig {
   }
 
   /**
+   * Get the preference map of participants and states for a given partition
+   * @param partitionId the partition to look up
+   * @return a mapping of participant to state for each replica
+   */
+  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+    return _preferenceMaps.get(partitionId);
+  }
+
+  /**
    * Get the number of replicas each partition should have
    * @return replica count
    */
@@ -144,29 +178,27 @@ public class RebalancerConfig {
     return _stateModelFactoryId;
   }
 
-  // TODO impl this
-  public String getRebalancerClassName() {
-    throw new UnsupportedOperationException("impl this");
-  }
-
   /**
    * Assembles a RebalancerConfig
    */
   public static class Builder {
-    private RebalanceMode _mode = RebalanceMode.NONE;
-    private RebalancerRef _rebalancerRef;
-    private StateModelDefId _stateModelDefId;
+    private final IdealState _idealState;
     private ResourceAssignment _resourceAssignment;
-    private int _bucketSize;
-    private boolean _batchMessageMode;
-    private StateModelFactoryId _stateModelFactoryId;
+
+    /**
+     * Configure the rebalancer for a resource
+     * @param resourceId the resource to rebalance
+     */
+    public Builder(ResourceId resourceId) {
+      _idealState = new IdealState(resourceId);
+    }
 
     /**
      * Set the rebalancer mode
      * @param mode {@link RebalanceMode}
      */
     public Builder rebalancerMode(RebalanceMode mode) {
-      _mode = mode;
+      _idealState.setRebalanceMode(mode);
       return this;
     }
 
@@ -176,7 +208,7 @@ public class RebalancerConfig {
      * @return Builder
      */
     public Builder rebalancer(RebalancerRef rebalancerRef) {
-      _rebalancerRef = rebalancerRef;
+      _idealState.setRebalancerRef(rebalancerRef);
       return this;
     }
 
@@ -186,7 +218,7 @@ public class RebalancerConfig {
      * @return Builder
      */
     public Builder stateModelDef(StateModelDefId stateModelDefId) {
-      _stateModelDefId = stateModelDefId;
+      _idealState.setStateModelDefId(stateModelDefId);
       return this;
     }
 
@@ -206,7 +238,7 @@ public class RebalancerConfig {
      * @return Builder
      */
     public Builder bucketSize(int bucketSize) {
-      _bucketSize = bucketSize;
+      _idealState.setBucketSize(bucketSize);
       return this;
     }
 
@@ -216,7 +248,27 @@ public class RebalancerConfig {
      * @return Builder
      */
     public Builder batchMessageMode(boolean batchMessageMode) {
-      _batchMessageMode = batchMessageMode;
+      _idealState.setBatchMessageMode(batchMessageMode);
+      return this;
+    }
+
+    /**
+     * Set the number of replicas
+     * @param replicaCount number of replicas
+     * @return Builder
+     */
+    public Builder replicaCount(int replicaCount) {
+      _idealState.setReplicas(Integer.toString(replicaCount));
+      return this;
+    }
+
+    /**
+     * Set the maximum number of partitions to assign to any participant
+     * @param maxPartitions the largest number of partitions any one participant may be assigned
+     * @return Builder
+     */
+    public Builder maxPartitionsPerParticipant(int maxPartitions) {
+      _idealState.setMaxPartitionsPerInstance(maxPartitions);
       return this;
     }
 
@@ -226,7 +278,7 @@ public class RebalancerConfig {
      * @return Builder
      */
     public Builder stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-      _stateModelFactoryId = stateModelFactoryId;
+      _idealState.setStateModelFactoryId(stateModelFactoryId);
       return this;
     }
 
@@ -235,8 +287,7 @@ public class RebalancerConfig {
      * @return a fully defined rebalancer configuration
      */
     public RebalancerConfig build() {
-      return new RebalancerConfig(_mode, _rebalancerRef, _stateModelDefId, _resourceAssignment,
-          _bucketSize, _batchMessageMode, _stateModelFactoryId);
+      return new RebalancerConfig(_idealState, _resourceAssignment);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
index 7f33be7..5f22898 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
@@ -19,10 +19,13 @@ package org.apache.helix.api;
  * under the License.
  */
 
-import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.NewRebalancer;
 import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
 
 public class RebalancerRef {
+  private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
+
   private final String _rebalancerClassName;
 
   public RebalancerRef(String rebalancerClassName) {
@@ -32,18 +35,14 @@ public class RebalancerRef {
   /**
-   * @return
+   * Load the referenced rebalancer class and create a new instance of it
+   * @return a new rebalancer instance, or null if the class could not be loaded, instantiated,
+   *         or cast to {@link NewRebalancer}
    */
-  public Rebalancer getRebalancer() {
+  public NewRebalancer getRebalancer() {
     try {
-      return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
-    } catch (InstantiationException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (IllegalAccessException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (ClassNotFoundException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      return (NewRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+    } catch (Exception e) {
+      // broad on purpose: load, instantiation, access and cast failures all yield null
+      LOG.warn("Exception while instantiating rebalancer class: " + _rebalancerClassName, e);
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index f976fad..0c8b730 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceAssignment;
 
 import com.google.common.collect.ImmutableMap;
@@ -37,11 +38,11 @@ import com.google.common.collect.ImmutableSet;
 public class Resource {
   private final ResourceId _id;
   private final RebalancerConfig _rebalancerConfig;
+  private final SchedulerTaskConfig _schedulerTaskConfig;
 
   private final Map<PartitionId, Partition> _partitionMap;
 
   private final ExternalView _externalView;
-  private final ExternalView _pendingExternalView;
 
   /**
    * Construct a resource
@@ -50,19 +51,37 @@ public class Resource {
    */
   public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment) {
     _id = id;
-    _rebalancerConfig = new RebalancerConfig(idealState.getRebalanceMode(), idealState.getRebalancerRef(),
-            idealState.getStateModelDefId(), resourceAssignment, idealState.getBucketSize(),
-            idealState.getBatchMessageMode(), Id.stateModelFactory(
-                idealState.getStateModelFactoryName()));
+    _rebalancerConfig = new RebalancerConfig(idealState, resourceAssignment);
 
     Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
+    Map<PartitionId, Map<String, String>> schedulerTaskConfig =
+        new HashMap<PartitionId, Map<String, String>>();
+    Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
     for (PartitionId partitionId : idealState.getPartitionSet()) {
       partitionMap.put(partitionId, new Partition(partitionId));
+
+      // TODO refactor it
+      Map<String, String> taskConfigMap = idealState.getRecord().getMapField(partitionId.stringify());
+      if (taskConfigMap != null) {
+        schedulerTaskConfig.put(partitionId, taskConfigMap);
+      }
     }
+    // The timeout entries are simple fields of the record and do not depend on a partition, so
+    // they are collected once here instead of once per partition. TODO refactor it
+    for (String simpleKey : idealState.getRecord().getSimpleFields().keySet()) {
+      if (simpleKey.contains("_" + Message.Attributes.TIMEOUT)) {
+        try {
+          int timeout = Integer.parseInt(idealState.getRecord().getSimpleField(simpleKey));
+          transitionTimeoutMap.put(simpleKey, timeout);
+        } catch (NumberFormatException e) {
+          // skip simple fields whose value is not an integer
+        }
+      }
+    }
     _partitionMap = ImmutableMap.copyOf(partitionMap);
+    _schedulerTaskConfig = new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfig);
 
     _externalView = null;
-    _pendingExternalView = null; // TODO: stub
   }
 
   /**
@@ -75,12 +94,12 @@ public class Resource {
    */
   public Resource(ResourceId id, Map<PartitionId, Partition> partitionMap,
       ExternalView externalView,
-      ExternalView pendingExternalView, RebalancerConfig rebalancerConfig) {
+      RebalancerConfig rebalancerConfig, SchedulerTaskConfig schedulerTaskConfig) {
     _id = id;
     _partitionMap = ImmutableMap.copyOf(partitionMap);
     _externalView = externalView;
-    _pendingExternalView = pendingExternalView;
     _rebalancerConfig = rebalancerConfig;
+    _schedulerTaskConfig = schedulerTaskConfig;
   }
 
   /**
@@ -116,14 +135,6 @@ public class Resource {
     return _externalView;
   }
 
-  /**
-   * Get the pending external view of the resource based on unprocessed messages
-   * @return the external view of the resource
-   */
-  public ExternalView getPendingExternalView() {
-    return _pendingExternalView;
-  }
-
   public RebalancerConfig getRebalancerConfig() {
     return _rebalancerConfig;
   }
@@ -132,6 +143,10 @@ public class Resource {
     return _id;
   }
 
+  public SchedulerTaskConfig getSchedulerTaskConfig() {
+    return _schedulerTaskConfig;
+  }
+
   /**
    * Assembles a Resource
    */
@@ -139,8 +154,8 @@ public class Resource {
     private final ResourceId _id;
     private final Map<PartitionId, Partition> _partitionMap;
     private ExternalView _externalView;
-    private ExternalView _pendingExternalView;
     private RebalancerConfig _rebalancerConfig;
+    private SchedulerTaskConfig _schedulerTaskConfig;
 
     /**
      * Build a Resource with an id
@@ -184,22 +199,21 @@ public class Resource {
     }
 
     /**
-     * Set the pending external view of this resource
-     * @param extView replica placements as a result of pending messages
+     * Set the rebalancer configuration
+     * @param rebalancerConfig properties of interest for rebalancing
      * @return Builder
      */
-    public Builder pendingExternalView(ExternalView pendingExtView) {
-      _pendingExternalView = pendingExtView;
+    public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+      _rebalancerConfig = rebalancerConfig;
       return this;
     }
 
     /**
-     * Set the rebalancer configuration
-     * @param rebalancerConfig properties of interest for rebalancing
-     * @return Builder
+     * @param schedulerTaskConfig the scheduler task configuration of this resource
+     * @return Builder
      */
-    public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
-      _rebalancerConfig = rebalancerConfig;
+    public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
+      _schedulerTaskConfig = schedulerTaskConfig;
       return this;
     }
 
@@ -208,8 +222,8 @@ public class Resource {
      * @return instantiated Resource
      */
     public Resource build() {
-      return new Resource(_id, _partitionMap, _externalView, _pendingExternalView,
-          _rebalancerConfig);
+      return new Resource(_id, _partitionMap, _externalView, _rebalancerConfig,
+          _schedulerTaskConfig);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java b/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
new file mode 100644
index 0000000..ac7cb3a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
@@ -0,0 +1,92 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.model.Message;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Scheduler task configuration of a resource: timeouts keyed by transition and task
+ * properties keyed by partition
+ */
+public class SchedulerTaskConfig {
+  // TODO refactor using Transition logical model
+  private final Map<String, Integer> _transitionTimeoutMap;
+
+  // TODO refactor this once the inner message format is understood
+  private final Map<PartitionId, Map<String, String>> _schedulerTaskConfig;
+
+  /**
+   * Instantiate the configuration from immutable copies of the given maps (the per-partition
+   * property maps themselves are not copied)
+   * @param transitionTimeoutMap timeouts keyed by transition
+   * @param schedulerTaskConfig task properties keyed by partition
+   */
+  public SchedulerTaskConfig(Map<String, Integer> transitionTimeoutMap,
+      Map<PartitionId, Map<String, String>> schedulerTaskConfig) {
+    _transitionTimeoutMap = ImmutableMap.copyOf(transitionTimeoutMap);
+    _schedulerTaskConfig = ImmutableMap.copyOf(schedulerTaskConfig);
+  }
+
+  /**
+   * Get the task properties of a partition
+   * @param partitionId the partition to look up
+   * @return the task properties, or null if the partition has none
+   */
+  public Map<String, String> getTaskConfig(PartitionId partitionId) {
+    return _schedulerTaskConfig.get(partitionId);
+  }
+
+  /**
+   * Get the timeout configured for a transition
+   * @param transition the key under which the timeout was configured
+   * @return the timeout, or null if none is configured for the transition
+   */
+  public Integer getTransitionTimeout(String transition) {
+    return _transitionTimeoutMap.get(transition);
+  }
+
+  /**
+   * Get the timeout of a transition, falling back to the TIMEOUT entry of the partition's task
+   * properties when the transition itself has no timeout
+   * @param transition the key under which the transition timeout was configured
+   * @param partitionId the partition whose task properties serve as the fallback
+   * @return the timeout, or null if neither lookup yields an integer
+   */
+  public Integer getTimeout(String transition, PartitionId partitionId) {
+    Integer timeout = getTransitionTimeout(transition);
+    if (timeout == null) {
+      Map<String, String> taskConfig = getTaskConfig(partitionId);
+      if (taskConfig != null) {
+        String timeoutStr = taskConfig.get(Message.Attributes.TIMEOUT.toString());
+        if (timeoutStr != null) {
+          try {
+            timeout = Integer.parseInt(timeoutStr);
+          } catch (NumberFormatException e) {
+            // a value that is not an integer is treated like a missing timeout
+          }
+        }
+      }
+    }
+    return timeout;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 314733f..6570dc4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -52,6 +52,16 @@ import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.NewCompatibilityCheckStage;
+import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewExternalViewComputeStage;
+import org.apache.helix.controller.stages.NewMessageGenerationStage;
+import org.apache.helix.controller.stages.NewMessageSelectionStage;
+import org.apache.helix.controller.stages.NewMessageThrottleStage;
+import org.apache.helix.controller.stages.NewReadClusterDataStage;
+import org.apache.helix.controller.stages.NewResourceComputationStage;
+import org.apache.helix.controller.stages.NewTaskAssignmentStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.RebalanceIdealStateStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
@@ -175,23 +185,22 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
 
       // cluster data cache refresh
       Pipeline dataRefresh = new Pipeline();
-      dataRefresh.addStage(new ReadClusterDataStage());
+      dataRefresh.addStage(new NewReadClusterDataStage());
 
       // rebalance pipeline
       Pipeline rebalancePipeline = new Pipeline();
-      rebalancePipeline.addStage(new CompatibilityCheckStage());
-      rebalancePipeline.addStage(new ResourceComputationStage());
-      rebalancePipeline.addStage(new CurrentStateComputationStage());
-      rebalancePipeline.addStage(new RebalanceIdealStateStage());
-      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-      rebalancePipeline.addStage(new MessageGenerationPhase());
-      rebalancePipeline.addStage(new MessageSelectionStage());
-      rebalancePipeline.addStage(new MessageThrottleStage());
-      rebalancePipeline.addStage(new TaskAssignmentStage());
+      rebalancePipeline.addStage(new NewCompatibilityCheckStage());
+      rebalancePipeline.addStage(new NewResourceComputationStage());
+      rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+      rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new NewMessageGenerationStage());
+      rebalancePipeline.addStage(new NewMessageSelectionStage());
+      rebalancePipeline.addStage(new NewMessageThrottleStage());
+      rebalancePipeline.addStage(new NewTaskAssignmentStage());
 
       // external view generation
       Pipeline externalViewPipeline = new Pipeline();
-      externalViewPipeline.addStage(new ExternalViewComputeStage());
+      externalViewPipeline.addStage(new NewExternalViewComputeStage());
 
       registry.register("idealStateChange", dataRefresh, rebalancePipeline);
       registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 563b7e2..8821082 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -38,6 +38,7 @@ import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
@@ -66,7 +67,7 @@ public class NewAutoRebalancer implements NewRebalancer {
 
   @Override
   public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
-      StateModelDefinition stateModelDef) {
+      StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
     // Compute a preference list based on the current ideal state
     List<Partition> partitions = new ArrayList<Partition>(resource.getPartitionSet());
     List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
@@ -75,15 +76,15 @@ public class NewAutoRebalancer implements NewRebalancer {
     Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
     int replicas = config.getReplicaCount();
 
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
-    stateCountMap =
+    LinkedHashMap<String, Integer> stateCountMap =
         ConstraintBasedAssignment.stateCount(stateModelDef, liveParticipants.size(), replicas);
     List<ParticipantId> liveParticipantList =
         new ArrayList<ParticipantId>(liveParticipants.keySet());
     List<ParticipantId> allParticipantList =
         new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
     List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = currentMapping(resource);
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping =
+        currentMapping(resource, currentStateOutput, stateCountMap);
 
     // If there are nodes tagged with resource, use only those nodes
     Set<String> taggedNodes = new HashSet<String>();
@@ -136,21 +137,48 @@ public class NewAutoRebalancer implements NewRebalancer {
       Map<ParticipantId, State> bestStateForPartition =
           NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
               stateModelDef, preferenceList,
-              resource.getExternalView().getStateMap(partition.getId()),
+              currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId()),
               disabledParticipantsForPartition);
       partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
     }
     return partitionMapping;
   }
 
-  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(Resource resource) {
+  /**
+   * Build the per-partition map of participant to state from the current and pending states,
+   * keeping only the states that appear as keys of the state count map
+   * @param resource the resource whose partitions are mapped
+   * @param currentStateOutput provides the current and pending state maps of each partition
+   * @param stateCountMap state counts; only its String keys are consulted here
+   * @return partition to participant-state map; a pending state overrides a current state
+   */
+  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(Resource resource,
+      NewCurrentStateOutput currentStateOutput, Map<String, Integer> stateCountMap) {
     Map<PartitionId, Map<ParticipantId, State>> map =
         new HashMap<PartitionId, Map<ParticipantId, State>>();
 
     for (Partition partition : resource.getPartitionSet()) {
-      Map<ParticipantId, State> stateMap = new HashMap<ParticipantId, State>();
-      stateMap.putAll(resource.getExternalView().getStateMap(partition.getId()));
-      stateMap.putAll(resource.getPendingExternalView().getStateMap(partition.getId()));
+      Map<ParticipantId, State> curStateMap =
+          currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+      Map<ParticipantId, State> replicaStates = new HashMap<ParticipantId, State>();
+      map.put(partition.getId(), replicaStates);
+      for (ParticipantId node : curStateMap.keySet()) {
+        State state = curStateMap.get(node);
+        // the count map has String keys, so a State object itself can never match a key
+        if (state != null && stateCountMap.containsKey(state.toString())) {
+          replicaStates.put(node, state);
+        }
+      }
+
+      // pending states are applied second so that they replace the current state of a node
+      Map<ParticipantId, State> pendingStateMap =
+          currentStateOutput.getPendingStateMap(resource.getId(), partition.getId());
+      for (ParticipantId node : pendingStateMap.keySet()) {
+        State state = pendingStateMap.get(node);
+        if (state != null && stateCountMap.containsKey(state.toString())) {
+          replicaStates.put(node, state);
+        }
+      }
     }
     return map;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
index 600d848..8d000f5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
@@ -32,6 +32,7 @@ import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -51,7 +52,7 @@ public class NewCustomRebalancer implements NewRebalancer {
 
   @Override
   public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
-      StateModelDefinition stateModelDef) {
+      StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing resource:" + resource.getId());
     }
@@ -59,13 +60,13 @@ public class NewCustomRebalancer implements NewRebalancer {
     RebalancerConfig config = resource.getRebalancerConfig();
     for (Partition partition : resource.getPartitionSet()) {
       Map<ParticipantId, State> currentStateMap =
-          resource.getExternalView().getStateMap(partition.getId());
+          currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
       Set<ParticipantId> disabledInstancesForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
               partition.getId());
       Map<ParticipantId, State> bestStateForPartition =
           computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
-              config.getResourceAssignment().getReplicaMap(partition.getId()), currentStateMap,
+              config.getPreferenceMap(partition.getId()), currentStateMap,
               disabledInstancesForPartition);
       partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
     }
@@ -74,11 +75,11 @@ public class NewCustomRebalancer implements NewRebalancer {
 
   /**
    * compute best state for resource in CUSTOMIZED rebalancer mode
-   * @param cache
+   * @param liveParticipantMap
    * @param stateModelDef
    * @param idealStateMap
    * @param currentStateMap
-   * @param disabledInstancesForPartition
+   * @param disabledParticipantsForPartition
    * @return
    */
   private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
@@ -87,7 +88,7 @@ public class NewCustomRebalancer implements NewRebalancer {
       Set<ParticipantId> disabledParticipantsForPartition) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
-    // if the ideal state is deleted, idealStateMap will be null/empty and
+    // if the resource is deleted, idealStateMap will be null/empty and
     // we should drop all resources.
     if (currentStateMap != null) {
       for (ParticipantId participantId : currentStateMap.keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
index 253723f..70c9ca7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.rebalancer;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Resource;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 
@@ -37,7 +38,8 @@ public interface NewRebalancer {
    * @param resource the resource for which a mapping will be computed
    * @param cluster a snapshot of the entire cluster state
    * @param stateModelDef the state model for which to rebalance the resource
+   * @param currentStateOutput a combination of the current states and pending current states
    */
   ResourceAssignment computeResourceMapping(final Resource resource, final Cluster cluster,
-      final StateModelDefinition stateModelDef);
+      final StateModelDefinition stateModelDef, final NewCurrentStateOutput currentStateOutput);
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index 472e7d3..27bb513 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -30,6 +30,7 @@ import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -49,7 +50,7 @@ public class NewSemiAutoRebalancer implements NewRebalancer {
 
   @Override
   public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
-      StateModelDefinition stateModelDef) {
+      StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing resource:" + resource.getId());
     }
@@ -57,7 +58,7 @@ public class NewSemiAutoRebalancer implements NewRebalancer {
     RebalancerConfig config = resource.getRebalancerConfig();
     for (Partition partition : resource.getPartitionSet()) {
       Map<ParticipantId, State> currentStateMap =
-          resource.getExternalView().getStateMap(partition.getId());
+          currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
       Set<ParticipantId> disabledInstancesForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
               partition.getId());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index feb3214..224853b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -43,8 +43,8 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Sets;
 
 /**
- * Collection of functions that will compute the best possible states given the live instances and
- * an ideal state.
+ * Collection of functions that will compute the best possible state based on the participants and
+ * the rebalancer configuration of a resource.
  */
 public class NewConstraintBasedAssignment {
   private static Logger logger = Logger.getLogger(NewConstraintBasedAssignment.class);
@@ -57,8 +57,9 @@ public class NewConstraintBasedAssignment {
    */
   public static Set<ParticipantId> getDisabledParticipants(
       final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
+    Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
     Set<ParticipantId> disabledParticipantsForPartition =
-        Sets.filter(participantMap.keySet(), new Predicate<ParticipantId>() {
+        Sets.filter(participantSet, new Predicate<ParticipantId>() {
           @Override
           public boolean apply(ParticipantId participantId) {
             return participantMap.get(participantId).getDisablePartitionIds().contains(partitionId);
@@ -87,7 +88,7 @@ public class NewConstraintBasedAssignment {
   }
 
   /**
-   * compute best state for resource in AUTO ideal state mode
+   * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
    * @param liveParticipantMap map of id to live participants
    * @param stateModelDef
    * @param participantPreferenceList
@@ -102,7 +103,7 @@ public class NewConstraintBasedAssignment {
       Set<ParticipantId> disabledParticipantsForPartition) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
-    // if the ideal state is deleted, instancePreferenceList will be empty and
+    // if the resource is deleted, instancePreferenceList will be empty and
     // we should drop all resources.
     if (currentStateMap != null) {
       for (ParticipantId participantId : currentStateMap.keySet()) {
@@ -119,7 +120,7 @@ public class NewConstraintBasedAssignment {
       }
     }
 
-    // ideal state is deleted
+    // resource is deleted
     if (participantPreferenceList == null) {
       return participantStateMap;
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index ae0278b..9abf67c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -26,5 +26,6 @@ public enum AttributeName {
   MESSAGES_ALL,
   MESSAGES_SELECTED,
   MESSAGES_THROTTLE,
-  LOCAL_STATE
+  LOCAL_STATE,
+  STATE_MODEL_DEFINITIONS
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index 995bb74..bc14297 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -20,30 +20,26 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.Map;
+import java.util.Set;
 
-import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Id;
 import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.AutoRebalancer;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
 import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
 import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
 import org.apache.helix.controller.rebalancer.NewRebalancer;
 import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.model.IdealState;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
 import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.util.HelixUtil;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
 /**
@@ -51,13 +47,14 @@ import org.apache.log4j.Logger;
  * IdealState,StateModel,LiveInstance
  */
 public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
-  private static final Logger LOG = Logger.getLogger(NewBestPossibleStateCalcStage.class
-      .getName());
+  private static final Logger LOG = Logger.getLogger(NewBestPossibleStateCalcStage.class.getName());
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
-    LOG.info("START BestPossibleStateCalcStage.process()");
+    if (LOG.isInfoEnabled()) {
+      LOG.info("START BestPossibleStateCalcStage.process()");
+    }
 
     NewCurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
@@ -74,49 +71,70 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
 
     long endTime = System.currentTimeMillis();
-    LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+    if (LOG.isInfoEnabled()) {
+      LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+    }
+  }
+
+  /**
+   * Fallback for a dropped resource with current state; computes with a null preference list
+   * @param cluster cluster snapshot supplying the participant and live participant maps
+   * @param resourceId the resource for which to generate an assignment
+   * @param currentStateOutput current state snapshot; supplies the mapped partitions and states
+   * @param stateModelDef state model passed through to the per-partition computation
+   * @return assignment covering each partition mapped in the current state (possibly none)
+   */
+  private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
+      NewCurrentStateOutput currentStateOutput, StateModelDefinition stateModelDef) {
+    ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
+    Set<PartitionId> mappedPartitions =
+        currentStateOutput.getCurrentStateMappedPartitions(resourceId);
+    if (mappedPartitions == null) {
+      return partitionMapping;
+    }
+    for (PartitionId partitionId : mappedPartitions) {
+      Set<ParticipantId> disabledParticipantsForPartition =
+          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partitionId);
+      partitionMapping.addReplicaMap(partitionId, NewConstraintBasedAssignment
+          .computeAutoBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef, null,
+              currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+              disabledParticipantsForPartition));
+    }
+    return partitionMapping;
   }
 
   // TODO check this
   private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
       Map<ResourceId, Resource> resourceMap, NewCurrentStateOutput currentStateOutput) {
-    // for each ideal state
-    // read the state model def
-    // for each resource
-    // get the preference list
-    // for each instanceName check if its alive then assign a state
-    // ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-
     NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs =
+        event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
 
     for (ResourceId resourceId : resourceMap.keySet()) {
       LOG.debug("Processing resource:" + resourceId);
-
-      Resource resource = resourceMap.get(resourceId);
-      // Ideal state may be gone. In that case we need to get the state model name
+      // Resource may be gone. In that case we need to get the state model name
       // from the current state
-      // IdealState idealState = cache.getIdealState(resourceName);
-
       Resource existResource = cluster.getResource(resourceId);
       if (existResource == null) {
-        // if ideal state is deleted, use an empty one
-        LOG.info("resource:" + resourceId + " does not exist anymore");
-        // TODO
-        // existResource = new Resource();
+        // if resource is deleted, then we do not know which rebalancer to use
+        // instead, just mark all partitions of the resource as dropped
+        if (LOG.isInfoEnabled()) {
+          LOG.info("resource:" + resourceId + " does not exist anymore");
+        }
+        StateModelDefinition stateModelDef =
+            stateModelDefs.get(currentStateOutput.getResourceStateModelDef(resourceId));
+        ResourceAssignment droppedAssignment =
+            mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
+        output.setResourceAssignment(resourceId, droppedAssignment);
+        continue;
       }
 
       RebalancerConfig rebalancerConfig = existResource.getRebalancerConfig();
       NewRebalancer rebalancer = null;
       if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
-          && rebalancerConfig.getRebalancerClassName() != null) {
-        String rebalancerClassName = rebalancerConfig.getRebalancerClassName();
-        LOG.info("resource " + resourceId + " use idealStateRebalancer " + rebalancerClassName);
-        try {
-          rebalancer =
-              (NewRebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
-        } catch (Exception e) {
-          LOG.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
-        }
+          && rebalancerConfig.getRebalancerRef() != null) {
+        rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
       }
       if (rebalancer == null) {
         if (rebalancerConfig.getRebalancerMode() == RebalanceMode.FULL_AUTO) {
@@ -128,9 +146,15 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
         }
       }
 
-      // TODO pass state model definition
+      StateModelDefinition stateModelDef =
+          stateModelDefs.get(rebalancerConfig.getStateModelDefId());
       ResourceAssignment resourceAssignment =
-          rebalancer.computeResourceMapping(resource, cluster, null);
+          rebalancer.computeResourceMapping(existResource, cluster, stateModelDef,
+              currentStateOutput);
+      if (resourceAssignment == null) {
+        resourceAssignment =
+            mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
+      }
 
       output.setResourceAssignment(resourceId, resourceAssignment);
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
index 474f463..d5ee850 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -1,5 +1,6 @@
 package org.apache.helix.controller.stages;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.helix.api.ResourceId;
@@ -9,11 +10,25 @@ public class NewBestPossibleStateOutput {
 
   Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
 
+  public NewBestPossibleStateOutput() {
+    _resourceAssignmentMap = new HashMap<ResourceId, ResourceAssignment>();
+  }
+
   /**
-   * @param resourceId
-   * @param resourceAssignment
+   * Set the computed resource assignment for a resource
+   * @param resourceId the resource to set
+   * @param resourceAssignment the computed assignment
    */
   public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
     _resourceAssignmentMap.put(resourceId, resourceAssignment);
   }
+
+  /**
+   * Get the resource assignment computed for a resource
+   * @param resourceId resource to look up
+   * @return the assignment stored for the resource, or null if none has been set
+   */
+  public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+    return _resourceAssignmentMap.get(resourceId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
new file mode 100644
index 0000000..7478609
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
@@ -0,0 +1,68 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.HelixVersion;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.log4j.Logger;
+
+/**
+ * Stage that fails the pipeline when any live participant runs an incompatible Helix version
+ */
+public class NewCompatibilityCheckStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(NewCompatibilityCheckStage.class.getName());
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager manager = event.getAttribute("helixmanager");
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    if (manager == null || cluster == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager | DataCache");
+    }
+
+    HelixManagerProperties properties = manager.getProperties();
+    // fail fast: the first incompatible live participant aborts the stage with an exception
+    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+    for (Participant liveParticipant : liveParticipants.values()) {
+      HelixVersion version = liveParticipant.getRunningInstance().getVersion();
+      String participantVersion = (version != null) ? version.toString() : null;
+      if (!properties.isParticipantCompatible(participantVersion)) {
+        String errorMsg =
+            "incompatible participant. pipeline will not continue. " + "controller: "
+                + manager.getInstanceName() + ", controllerVersion: " + properties.getVersion()
+                + ", minimumSupportedParticipantVersion: "
+                + properties.getProperty("minimum_supported_version.participant")
+                + ", participant: " + liveParticipant.getId() + ", participantVersion: "
+                + participantVersion;
+        LOG.error(errorMsg);
+        throw new StageException(errorMsg);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
index 417512e..d8bbfe3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.stages;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
@@ -29,7 +30,6 @@ import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.Partition;
 
 public class NewCurrentStateOutput {
   /**
@@ -135,7 +135,7 @@ public class NewCurrentStateOutput {
    * @return state
    */
   static State getState(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
-    ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
+      ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
     Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
     if (map != null) {
       Map<ParticipantId, State> instanceStateMap = map.get(partitionId);
@@ -221,12 +221,25 @@ public class NewCurrentStateOutput {
   }
 
   /**
+   * Get the partitions that have a current state entry for a resource
+   * @param resourceId resource to look up
+   * @return key set of the resource's internal map (not a copy), or an empty set if unmapped
+   */
+  public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
+    Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
+    if (currentStateMap != null) {
+      return currentStateMap.keySet();
+    }
+    return Collections.emptySet();
+  }
+
+  /**
    * @param resourceId
    * @param partitionId
    * @return
    */
   public Map<ParticipantId, State> getPendingStateMap(ResourceId resourceId, PartitionId partitionId) {
-    return getStateMap(_currentStateMap, resourceId, partitionId);
+    return getStateMap(_pendingStateMap, resourceId, partitionId);
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
deleted file mode 100644
index 0fdfe56..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
+++ /dev/null
@@ -1,233 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Id;
-import org.apache.helix.api.MessageId;
-import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.SessionId;
-import org.apache.helix.api.State;
-import org.apache.helix.api.StateModelDefId;
-import org.apache.helix.api.StateModelFactoryId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages
- */
-public class NewMessageGenerationPhase extends AbstractBaseStage {
-  private static Logger LOG = Logger.getLogger(NewMessageGenerationPhase.class);
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    HelixManager manager = event.getAttribute("helixmanager");
-    Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    NewCurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-    if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
-        || bestPossibleStateOutput == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
-    }
-
-    // Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    // Map<String, String> sessionIdMap = new HashMap<String, String>();
-
-    // for (LiveInstance liveInstance : liveInstances.values()) {
-    // sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId().stringify());
-    // }
-    MessageGenerationOutput output = new MessageGenerationOutput();
-
-    for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
-      int bucketSize = resource.getRebalancerConfig().getBucketSize();
-
-      // TODO fix it
-      StateModelDefinition stateModelDef = null;
-      // cache.getStateModelDef(resource.getStateModelDefRef());
-
-      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
-        // TODO fix it
-        Map<ParticipantId, State> instanceStateMap = null;
-        // bestPossibleStateOutput.getInstanceStateMap(resourceId, partition);
-
-        // we should generate message based on the desired-state priority
-        // so keep generated messages in a temp map keyed by state
-        // desired-state->list of generated-messages
-        Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
-
-        for (ParticipantId participantId : instanceStateMap.keySet()) {
-          State desiredState = instanceStateMap.get(participantId);
-
-          State currentState =
-              currentStateOutput.getCurrentState(resourceId, partitionId, participantId);
-          if (currentState == null) {
-            // TODO fix it
-            // currentState = stateModelDef.getInitialStateString();
-          }
-
-          if (desiredState.equals(currentState)) {
-            continue;
-          }
-
-          State pendingState =
-              currentStateOutput.getPendingState(resourceId, partitionId, participantId);
-
-          // TODO fix it
-          State nextState = new State("");
-          // stateModelDef.getNextStateForTransition(currentState, desiredState);
-          if (nextState == null) {
-            LOG.error("Unable to find a next state for partition: " + partitionId
-                + " from stateModelDefinition"
-                + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
-            continue;
-          }
-
-          if (pendingState != null) {
-            if (nextState.equals(pendingState)) {
-              LOG.debug("Message already exists for " + participantId + " to transit "
-                  + partitionId + " from " + currentState + " to " + nextState);
-            } else if (currentState.equals(pendingState)) {
-              LOG.info("Message hasn't been removed for " + participantId + " to transit"
-                  + partitionId + " to " + pendingState + ", desiredState: "
-                  + desiredState);
-            } else {
-              LOG.info("IdealState changed before state transition completes for " + partitionId
-                  + " on " + participantId + ", pendingState: "
-                  + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
-            }
-          } else {
-            // TODO check if instance is alive
-            SessionId sessionId =
-                cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
-                    .getSessionId();
-            Message message =
-                createMessage(manager, resourceId, partitionId, participantId, currentState,
-                    nextState, sessionId, new StateModelDefId(stateModelDef.getId()), resource
-                        .getRebalancerConfig()
-                        .getStateModelFactoryId(), bucketSize);
-
-            // TODO fix this
-            // IdealState idealState = cache.getIdealState(resourceName);
-            // if (idealState != null
-            // && idealState.getStateModelDefRef().equalsIgnoreCase(
-            // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-            // if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-            // message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
-            // idealState.getRecord().getMapField(partition.getPartitionName()));
-            // }
-            // }
-            // Set timeout of needed
-            // String stateTransition =
-            // currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
-            // if (idealState != null) {
-            // String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
-            // if (timeOutStr == null
-            // && idealState.getStateModelDefRef().equalsIgnoreCase(
-            // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-            // // scheduled task queue
-            // if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-            // timeOutStr =
-            // idealState.getRecord().getMapField(partition.getPartitionName())
-            // .get(Message.Attributes.TIMEOUT.toString());
-            // }
-            // }
-            // if (timeOutStr != null) {
-            // try {
-            // int timeout = Integer.parseInt(timeOutStr);
-            // if (timeout > 0) {
-            // message.setExecutionTimeout(timeout);
-            // }
-            // } catch (Exception e) {
-            // logger.error("", e);
-            // }
-            // }
-            // }
-            // message.getRecord().setSimpleField("ClusterEventName", event.getName());
-
-            if (!messageMap.containsKey(desiredState)) {
-              messageMap.put(desiredState, new ArrayList<Message>());
-            }
-            messageMap.get(desiredState).add(message);
-          }
-        }
-
-        // add generated messages to output according to state priority
-        List<String> statesPriorityList = stateModelDef.getStatesPriorityStringList();
-        for (String state : statesPriorityList) {
-          if (messageMap.containsKey(state)) {
-            for (Message message : messageMap.get(state)) {
-              // TODO fix it
-              // output.addMessage(resourceId, partitionId, message);
-            }
-          }
-        }
-
-      } // end of for-each-partition
-    }
-    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
-  }
-
-  private Message createMessage(HelixManager manager, ResourceId resourceId,
-      PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
-      SessionId sessionId, StateModelDefId stateModelDefId,
-      StateModelFactoryId stateModelFactoryId, int bucketSize) {
-  // MessageId uuid = Id.message(UUID.randomUUID().toString());
-  // Message message = new Message(MessageType.STATE_TRANSITION, uuid);
-  // message.setSrcName(manager.getInstanceName());
-  // message.setTgtName(instanceName);
-  // message.setMsgState(MessageState.NEW);
-  // message.setPartitionId(Id.partition(partitionName));
-  // message.setResourceId(Id.resource(resourceName));
-  // message.setFromState(State.from(currentState));
-  // message.setToState(State.from(nextState));
-  // message.setTgtSessionId(Id.session(sessionId));
-  // message.setSrcSessionId(Id.session(manager.getSessionId()));
-  // message.setStateModelDef(Id.stateModelDef(stateModelDefName));
-  // message.setStateModelFactoryName(stateModelFactoryName);
-  // message.setBucketSize(bucketSize);
-  //
-  // return message;
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
new file mode 100644
index 0000000..0a72dc0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -0,0 +1,211 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SessionId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Compares the current and pending states with the best-possible state and generates messages
+ */
+public class NewMessageGenerationStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(NewMessageGenerationStage.class);
+
+  /**
+   * Generates a state-transition message for each replica whose current state differs from its
+   * best-possible state and has no pending transition; the result is stored as MESSAGES_ALL.
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager manager = event.getAttribute("helixmanager");
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
+    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    NewCurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    NewBestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    if (manager == null || cluster == null || stateModelDefMap == null || resourceMap == null
+        || currentStateOutput == null || bestPossibleStateOutput == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|DataCache|STATE_MODEL_DEFINITIONS|RESOURCES|CURRENT_STATE"
+          + "|BEST_POSSIBLE_STATE");
+    }
+
+    NewMessageOutput output = new NewMessageOutput();
+
+    for (Map.Entry<ResourceId, Resource> resourceEntry : resourceMap.entrySet()) {
+      ResourceId resourceId = resourceEntry.getKey();
+      Resource resource = resourceEntry.getValue();
+      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+      int bucketSize = rebalancerConfig.getBucketSize();
+      StateModelDefinition stateModelDef =
+          stateModelDefMap.get(rebalancerConfig.getStateModelDefId());
+      ResourceAssignment resourceAssignment =
+          bestPossibleStateOutput.getResourceAssignment(resourceId);
+      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+        Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(partitionId);
+
+        // messages must be emitted in desired-state priority order, so first collect them
+        // in a temp map: desired-state -> list of generated messages
+        Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
+
+        for (Map.Entry<ParticipantId, State> replicaEntry : instanceStateMap.entrySet()) {
+          ParticipantId participantId = replicaEntry.getKey();
+          State desiredState = replicaEntry.getValue();
+          State currentState =
+              currentStateOutput.getCurrentState(resourceId, partitionId, participantId);
+          if (currentState == null) {
+            currentState = stateModelDef.getInitialState();
+          }
+          if (desiredState.equals(currentState)) {
+            continue;
+          }
+
+          State pendingState =
+              currentStateOutput.getPendingState(resourceId, partitionId, participantId);
+          // TODO fix it
+          State nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+          if (nextState == null) {
+            LOG.error("Unable to find a next state for partition: " + partitionId
+                + " from stateModelDefinition: " + stateModelDef.getId() + " from:" + currentState
+                + " to:" + desiredState);
+            continue;
+          }
+
+          if (pendingState != null) {
+            // a transition is already pending for this replica: log why and send nothing new
+            if (nextState.equals(pendingState)) {
+              LOG.debug("Message already exists for " + participantId + " to transit "
+                  + partitionId + " from " + currentState + " to " + nextState);
+            } else if (currentState.equals(pendingState)) {
+              LOG.info("Message hasn't been removed for " + participantId + " to transit "
+                  + partitionId + " to " + pendingState + ", desiredState: " + desiredState);
+            } else {
+              LOG.info("IdealState changed before state transition completes for " + partitionId
+                  + " on " + participantId + ", pendingState: " + pendingState + ", currentState: "
+                  + currentState + ", nextState: " + nextState);
+            }
+            continue;
+          }
+          // the message needs the target's live session id, so skip non-live participants
+          if (cluster.getLiveParticipantMap().get(participantId) == null) {
+            LOG.warn("Participant " + participantId + " is not live; skip " + partitionId);
+            continue;
+          }
+          SessionId sessionId =
+              cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
+                  .getSessionId();
+          Message message =
+              createMessage(manager, resourceId, partitionId, participantId, currentState,
+                  nextState, sessionId, new StateModelDefId(stateModelDef.getId()),
+                  rebalancerConfig.getStateModelFactoryId(), bucketSize);
+
+          // TODO refactor set timeout logic, it's really messy
+          if (rebalancerConfig.getStateModelDefId().stringify()
+              .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)
+              && resource.getPartitionSet().size() > 0) {
+            // TODO refactor it
+            message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+                resource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+          }
+
+          // Set timeout if needed
+          String stateTransition =
+              currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
+          if (resource.getSchedulerTaskConfig() != null) {
+            Integer timeout =
+                resource.getSchedulerTaskConfig().getTimeout(stateTransition, partitionId);
+            if (timeout != null && timeout > 0) {
+              message.setExecutionTimeout(timeout);
+            }
+          }
+          message.getRecord().setSimpleField("ClusterEventName", event.getName());
+          if (!messageMap.containsKey(desiredState)) {
+            messageMap.put(desiredState, new ArrayList<Message>());
+          }
+          messageMap.get(desiredState).add(message);
+        }
+
+        // add generated messages to output according to state priority
+        for (State state : stateModelDef.getStatesPriorityList()) {
+          if (messageMap.containsKey(state)) {
+            for (Message message : messageMap.get(state)) {
+              output.addMessage(resourceId, partitionId, message);
+            }
+          }
+        }
+      } // end of for-each-partition
+    }
+    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+  }
+
+  /** Builds a NEW state-transition message targeting the given participant session. */
+  private Message createMessage(HelixManager manager, ResourceId resourceId,
+      PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
+      SessionId participantSessionId, StateModelDefId stateModelDefId,
+      StateModelFactoryId stateModelFactoryId, int bucketSize) {
+    MessageId uuid = Id.message(UUID.randomUUID().toString());
+    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+    message.setSrcName(manager.getInstanceName());
+    message.setTgtName(participantId.stringify());
+    message.setMsgState(MessageState.NEW);
+    message.setPartitionId(partitionId);
+    message.setResourceId(resourceId);
+    message.setFromState(currentState);
+    message.setToState(nextState);
+    message.setTgtSessionId(participantSessionId);
+    message.setSrcSessionId(Id.session(manager.getSessionId()));
+    message.setStateModelDef(stateModelDefId);
+    message.setStateModelFactoryName(stateModelFactoryId.stringify());
+    message.setBucketSize(bucketSize);
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
new file mode 100644
index 0000000..3dd5211
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
@@ -0,0 +1,75 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.model.Message;
+
+/** Holds the messages generated by a pipeline stage, grouped by resource and partition. */
+public class NewMessageOutput {
+  private final Map<ResourceId, Map<PartitionId, List<Message>>> _messagesMap;
+
+  public NewMessageOutput() {
+    _messagesMap = new HashMap<ResourceId, Map<PartitionId, List<Message>>>();
+  }
+
+  /** Appends a message to the list kept for the given resource and partition. */
+  public void addMessage(ResourceId resourceId, PartitionId partitionId, Message message) {
+    if (!_messagesMap.containsKey(resourceId)) {
+      _messagesMap.put(resourceId, new HashMap<PartitionId, List<Message>>());
+    }
+    if (!_messagesMap.get(resourceId).containsKey(partitionId)) {
+      _messagesMap.get(resourceId).put(partitionId, new ArrayList<Message>());
+    }
+    _messagesMap.get(resourceId).get(partitionId).add(message);
+  }
+
+  /** Replaces the list kept for the given resource and partition; the list is not copied. */
+  public void setMessages(ResourceId resourceId, PartitionId partitionId,
+      List<Message> selectedMessages) {
+    if (!_messagesMap.containsKey(resourceId)) {
+      _messagesMap.put(resourceId, new HashMap<PartitionId, List<Message>>());
+    }
+    _messagesMap.get(resourceId).put(partitionId, selectedMessages);
+  }
+
+  /** Returns the messages kept for the partition, or an empty list (never null) if none. */
+  public List<Message> getMessages(ResourceId resourceId, PartitionId partitionId) {
+    Map<PartitionId, List<Message>> partitionMap = _messagesMap.get(resourceId);
+    List<Message> messages = (partitionMap == null) ? null : partitionMap.get(partitionId);
+    // a known resource may still lack this partition; treat it like an unknown resource
+    if (messages == null) {
+      return Collections.emptyList();
+    }
+    return messages;
+  }
+
+  @Override
+  public String toString() {
+    return _messagesMap.toString();
+  }
+}