Posted to commits@helix.apache.org by zz...@apache.org on 2013/09/04 22:15:01 UTC

[2/2] git commit: [HELIX-109] Review Helix model package, first half of rebalance-pipeline refactor

[HELIX-109] Review Helix model package, first half of rebalance-pipeline refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/9c7de4c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/9c7de4c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/9c7de4c3

Branch: refs/heads/helix-logical-model
Commit: 9c7de4c33ec57fc677a8b05153655a2306b14c68
Parents: 75b534d
Author: zzhang <zz...@apache.org>
Authored: Wed Sep 4 13:14:33 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Wed Sep 4 13:14:33 2013 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/helix/api/Id.java  |  15 +-
 .../apache/helix/api/ParticipantAccessor.java   |   2 +-
 .../org/apache/helix/api/RebalancerConfig.java  |  74 ++++-
 .../java/org/apache/helix/api/Resource.java     |  69 +++--
 .../org/apache/helix/api/ResourceAccessor.java  |   2 +
 .../main/java/org/apache/helix/api/State.java   |   6 +-
 .../helix/api/StateModelDefinitionAccessor.java |  65 +++++
 .../apache/helix/api/StateModelFactoryId.java   |  34 +++
 .../stages/NewBestPossibleStateCalcStage.java   | 140 +++++++++
 .../stages/NewBestPossibleStateOutput.java      |  19 ++
 .../stages/NewCurrentStateComputationStage.java | 143 +++++++++
 .../stages/NewCurrentStateOutput.java           | 242 ++++++++++++++++
 .../stages/NewMessageGenerationPhase.java       | 233 +++++++++++++++
 .../stages/NewMessageSelectionStage.java        | 290 +++++++++++++++++++
 .../stages/NewMessageThrottleStage.java         | 196 +++++++++++++
 .../stages/NewRebalanceIdealStateStage.java     |  84 ++++++
 .../stages/NewResourceComputationStage.java     | 108 +++++++
 .../stages/NewTaskAssignmentStage.java          | 154 ++++++++++
 .../org/apache/helix/api/TestNewStages.java     | 142 +++++++++
 19 files changed, 1990 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/Id.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Id.java b/helix-core/src/main/java/org/apache/helix/api/Id.java
index 7024d22..6c9556a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Id.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Id.java
@@ -20,7 +20,7 @@ package org.apache.helix.api;
  */
 
 /**
- * 
+ *
  */
 public abstract class Id implements Comparable<Id> {
   public abstract String stringify();
@@ -35,7 +35,7 @@ public abstract class Id implements Comparable<Id> {
     if (that instanceof Id) {
       return this.stringify().equals(((Id) that).stringify());
     } else if (that instanceof String) {
-      return this.stringify().equals((String) that);
+      return this.stringify().equals(that);
     }
     return false;
   }
@@ -149,6 +149,17 @@ public abstract class Id implements Comparable<Id> {
   }
 
   /**
+   * @param stateModelFactoryId string state model factory identifier
+   * @return StateModelFactoryId, or null if the input is null
+   */
+  public static StateModelFactoryId stateModelFactory(String stateModelFactoryId) {
+    if (stateModelFactoryId == null) {
+      return null;
+    }
+    return new StateModelFactoryId(stateModelFactoryId);
+  }
+
+  /**
    * Get a concrete message id
    * @param messageId string message identifier
    * @return MsgId
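
A minimal usage sketch for the new factory method, mirroring the other Id helpers; the
factory name below is purely illustrative:

    import org.apache.helix.api.Id;
    import org.apache.helix.api.StateModelFactoryId;

    // "DEFAULT" is a hypothetical factory name used only for illustration
    StateModelFactoryId factoryId = Id.stateModelFactory("DEFAULT");
    // a null input yields null rather than an id wrapping null
    StateModelFactoryId none = Id.stateModelFactory(null);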

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index 4bc502e..d2ae927 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -321,7 +321,7 @@ public class ParticipantAccessor {
     }
 
     return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
-        runningInstance, null, msgMap);
+        runningInstance, curStateMap, msgMap);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index e2e33eb..2baf63b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -35,9 +35,13 @@ public class RebalancerConfig {
   private final int _replicaCount;
   private final String _participantGroupTag;
   private final int _maxPartitionsPerParticipant;
+  private final int _bucketSize;
+  private final boolean _batchMessageMode;
+  private final StateModelFactoryId _stateModelFactoryId;
 
   public RebalancerConfig(RebalanceMode mode, RebalancerRef rebalancerRef,
-      StateModelDefId stateModelDefId, ResourceAssignment resourceAssignment) {
+      StateModelDefId stateModelDefId, ResourceAssignment resourceAssignment, int bucketSize,
+      boolean batchMessageMode, StateModelFactoryId stateModelFactoryId) {
     _rebalancerMode = mode;
     _rebalancerRef = rebalancerRef;
     _stateModelDefId = stateModelDefId;
@@ -46,6 +50,9 @@ public class RebalancerConfig {
     _replicaCount = 0; // TODO: stub
     _participantGroupTag = null; // TODO: stub
     _maxPartitionsPerParticipant = Integer.MAX_VALUE; // TODO: stub
+    _bucketSize = bucketSize;
+    _batchMessageMode = batchMessageMode;
+    _stateModelFactoryId = stateModelFactoryId;
   }
 
   /**
@@ -114,6 +121,35 @@ public class RebalancerConfig {
   }
 
   /**
+   * Get bucket size
+   * @return bucket size
+   */
+  public int getBucketSize() {
+    return _bucketSize;
+  }
+
+  /**
+   * Get batch message mode
+   * @return true if in batch message mode, false otherwise
+   */
+  public boolean getBatchMessageMode() {
+    return _batchMessageMode;
+  }
+
+  /**
+   * Get state model factory id
+   * @return state model factory id
+   */
+  public StateModelFactoryId getStateModelFactoryId() {
+    return _stateModelFactoryId;
+  }
+
+  // TODO impl this
+  public String getRebalancerClassName() {
+    throw new UnsupportedOperationException("impl this");
+  }
+
+  /**
    * Assembles a RebalancerConfig
    */
   public static class Builder {
@@ -121,6 +157,9 @@ public class RebalancerConfig {
     private RebalancerRef _rebalancerRef;
     private StateModelDefId _stateModelDefId;
     private ResourceAssignment _resourceAssignment;
+    private int _bucketSize;
+    private boolean _batchMessageMode;
+    private StateModelFactoryId _stateModelFactoryId;
 
     /**
      * Set the rebalancer mode
@@ -162,11 +201,42 @@ public class RebalancerConfig {
     }
 
     /**
+     * Set bucket size
+     * @param bucketSize
+     * @return Builder
+     */
+    public Builder bucketSize(int bucketSize) {
+      _bucketSize = bucketSize;
+      return this;
+    }
+
+    /**
+     * Set batch message mode
+     * @param batchMessageMode
+     * @return Builder
+     */
+    public Builder batchMessageMode(boolean batchMessageMode) {
+      _batchMessageMode = batchMessageMode;
+      return this;
+    }
+
+    /**
+     * Set state model factory
+     * @param stateModelFactoryId
+     * @return Builder
+     */
+    public Builder stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+      _stateModelFactoryId = stateModelFactoryId;
+      return this;
+    }
+
+    /**
      * Assemble a RebalancerConfig
      * @return a fully defined rebalancer configuration
      */
     public RebalancerConfig build() {
-      return new RebalancerConfig(_mode, _rebalancerRef, _stateModelDefId, _resourceAssignment);
+      return new RebalancerConfig(_mode, _rebalancerRef, _stateModelDefId, _resourceAssignment,
+          _bucketSize, _batchMessageMode, _stateModelFactoryId);
     }
   }
 }
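
A sketch of assembling a RebalancerConfig with the new fields through the expanded
constructor; rebalancerRef, stateModelDefId, and resourceAssignment are assumed to be
obtained elsewhere (e.g. from an IdealState), and the literal values are illustrative:

    import org.apache.helix.api.Id;
    import org.apache.helix.api.RebalancerConfig;
    import org.apache.helix.model.IdealState.RebalanceMode;

    RebalancerConfig config = new RebalancerConfig(
        RebalanceMode.SEMI_AUTO,            // rebalance mode
        rebalancerRef,                      // RebalancerRef, assumed available
        stateModelDefId,                    // StateModelDefId, assumed available
        resourceAssignment,                 // ResourceAssignment, assumed available
        10,                                 // bucket size
        false,                              // batch message mode
        Id.stateModelFactory("DEFAULT"));   // hypothetical factory name
    int bucketSize = config.getBucketSize();
    boolean batchMode = config.getBatchMessageMode();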

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index a916025..f976fad 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -19,13 +19,16 @@ package org.apache.helix.api;
  * under the License.
  */
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 /**
@@ -35,32 +38,28 @@ public class Resource {
   private final ResourceId _id;
   private final RebalancerConfig _rebalancerConfig;
 
-  private final Set<Partition> _partitionSet;
+  private final Map<PartitionId, Partition> _partitionMap;
 
   private final ExternalView _externalView;
   private final ExternalView _pendingExternalView;
 
-  // TODO move construct logic to ResourceAccessor
   /**
    * Construct a resource
    * @param idealState
    * @param currentStateMap map of participant-id to current state
    */
-  public Resource(ResourceId id, IdealState idealState, ResourceAssignment rscAssignment) {
+  public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment) {
     _id = id;
-    // _rebalancerMode = idealState.getRebalanceMode();
-    // _rebalancerRef = new RebalancerRef(idealState.getRebalancerClassName());
-    // _stateModelDefId = new StateModelDefId(idealState.getStateModelDefRef());
-    _rebalancerConfig = null;
+    _rebalancerConfig = new RebalancerConfig(idealState.getRebalanceMode(), idealState.getRebalancerRef(),
+            idealState.getStateModelDefId(), resourceAssignment, idealState.getBucketSize(),
+            idealState.getBatchMessageMode(), Id.stateModelFactory(
+                idealState.getStateModelFactoryName()));
 
-    Set<Partition> partitionSet = new HashSet<Partition>();
+    Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
     for (PartitionId partitionId : idealState.getPartitionSet()) {
-      partitionSet.add(new Partition(partitionId));
+      partitionMap.put(partitionId, new Partition(partitionId));
     }
-    _partitionSet = ImmutableSet.copyOf(partitionSet);
-
-    // TODO
-    // _resourceAssignment = null;
+    _partitionMap = ImmutableMap.copyOf(partitionMap);
 
     _externalView = null;
     _pendingExternalView = null; // TODO: stub
@@ -74,10 +73,11 @@ public class Resource {
    * @param pendingExternalView pending external view based on unprocessed messages
    * @param rebalancerConfig configuration properties for rebalancing this resource
    */
-  public Resource(ResourceId id, Set<Partition> partitionSet, ExternalView externalView,
+  public Resource(ResourceId id, Map<PartitionId, Partition> partitionMap,
+      ExternalView externalView,
       ExternalView pendingExternalView, RebalancerConfig rebalancerConfig) {
     _id = id;
-    _partitionSet = ImmutableSet.copyOf(partitionSet);
+    _partitionMap = ImmutableMap.copyOf(partitionMap);
     _externalView = externalView;
     _pendingExternalView = pendingExternalView;
     _rebalancerConfig = rebalancerConfig;
@@ -87,8 +87,25 @@ public class Resource {
    * Get the set of partitions of the resource
    * @return set of partitions or empty set if none
    */
+  public Map<PartitionId, Partition> getPartitionMap() {
+    return _partitionMap;
+  }
+
+  /**
+   * @param partitionId the id of the partition to look up
+   * @return the Partition, or null if the resource has no partition with that id
+   */
+  public Partition getPartition(PartitionId partitionId) {
+    return _partitionMap.get(partitionId);
+  }
+
+  /**
+   * @return an immutable snapshot Set of the partitions, or an empty set if none
+   */
   public Set<Partition> getPartitionSet() {
-    return _partitionSet;
+    Set<Partition> partitionSet = new HashSet<Partition>();
+    partitionSet.addAll(_partitionMap.values());
+    return ImmutableSet.copyOf(partitionSet);
   }
 
   /**
@@ -120,7 +137,7 @@ public class Resource {
    */
   public static class Builder {
     private final ResourceId _id;
-    private final Set<Partition> _partitionSet;
+    private final Map<PartitionId, Partition> _partitionMap;
     private ExternalView _externalView;
     private ExternalView _pendingExternalView;
     private RebalancerConfig _rebalancerConfig;
@@ -131,7 +148,7 @@ public class Resource {
      */
     public Builder(ResourceId id) {
       _id = id;
-      _partitionSet = new HashSet<Partition>();
+      _partitionMap = new HashMap<PartitionId, Partition>();
     }
 
     /**
@@ -140,7 +157,19 @@ public class Resource {
      * @return Builder
      */
     public Builder addPartition(Partition partition) {
-      _partitionSet.add(partition);
+      _partitionMap.put(partition.getId(), partition);
+      return this;
+    }
+
+    /**
+     * Add a set of partitions
+     * @param partitions
+     * @return Builder
+     */
+    public Builder addPartitions(Set<Partition> partitions) {
+      for (Partition partition : partitions) {
+        addPartition(partition);
+      }
       return this;
     }
 
@@ -179,7 +208,7 @@ public class Resource {
      * @return instantiated Resource
      */
     public Resource build() {
-      return new Resource(_id, _partitionSet, _externalView, _pendingExternalView,
+      return new Resource(_id, _partitionMap, _externalView, _pendingExternalView,
           _rebalancerConfig);
     }
   }
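
With partitions now keyed by id, a brief lookup sketch; resource is assumed to be an
existing Resource, and the partition id is built with the Id helpers referenced elsewhere
in this patch:

    import org.apache.helix.api.Id;
    import org.apache.helix.api.Partition;
    import org.apache.helix.api.PartitionId;

    PartitionId partitionId = Id.partition("MyDB_0");         // hypothetical partition name
    Partition partition = resource.getPartition(partitionId); // direct map lookup
    for (Partition p : resource.getPartitionSet()) {          // kept for iteration-only callers
      // ...
    }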

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
index ba7cd3d..c3bcb37 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
@@ -23,6 +23,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
 
 public class ResourceAccessor {
@@ -69,4 +70,5 @@ public class ResourceAccessor {
   public void dropExternalView(ResourceId resourceId) {
     _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/State.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/State.java b/helix-core/src/main/java/org/apache/helix/api/State.java
index b8c38ea..5d0af41 100644
--- a/helix-core/src/main/java/org/apache/helix/api/State.java
+++ b/helix-core/src/main/java/org/apache/helix/api/State.java
@@ -22,13 +22,13 @@ import org.apache.helix.HelixDefinedState;
  */
 
 /**
- * 
+ *
  */
 public class State {
   private final String _state;
 
   public State(String state) {
-    _state = state;
+    _state = state.toUpperCase();
   }
 
   @Override
@@ -41,7 +41,7 @@ public class State {
     if (that instanceof State) {
       return this.toString().equals(((State) that).toString());
     } else if (that instanceof String) {
-      return _state.equals((String) that);
+      return _state.equals(that);
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
new file mode 100644
index 0000000..89b1979
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
@@ -0,0 +1,65 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableMap;
+
+public class StateModelDefinitionAccessor {
+  private static Logger LOG = Logger.getLogger(StateModelDefinitionAccessor.class);
+
+  private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
+  private final ClusterId _clusterId;
+
+  /**
+   * @param clusterId the cluster whose state model definitions will be read
+   * @param accessor a physical accessor for cluster data
+   */
+  public StateModelDefinitionAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+    _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
+    _clusterId = clusterId;
+  }
+
+  /**
+   * @return an immutable map of state model definition id to StateModelDefinition
+   */
+  public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+    Map<String, StateModelDefinition> stateModelDefs =
+        _accessor.getChildValuesMap(_keyBuilder.stateModelDefs());
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        new HashMap<StateModelDefId, StateModelDefinition>();
+
+    for (String stateModelDefName : stateModelDefs.keySet()) {
+      stateModelDefMap.put(new StateModelDefId(stateModelDefName),
+          stateModelDefs.get(stateModelDefName));
+    }
+
+    return ImmutableMap.copyOf(stateModelDefMap);
+  }
+}
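
A short sketch of reading all state model definitions through the new accessor; clusterId
and dataAccessor are assumed to come from an existing Helix connection:

    import java.util.Map;

    import org.apache.helix.HelixDataAccessor;
    import org.apache.helix.api.ClusterId;
    import org.apache.helix.api.StateModelDefId;
    import org.apache.helix.api.StateModelDefinitionAccessor;
    import org.apache.helix.model.StateModelDefinition;

    StateModelDefinitionAccessor smdAccessor =
        new StateModelDefinitionAccessor(clusterId, dataAccessor);
    Map<StateModelDefId, StateModelDefinition> defs = smdAccessor.readStateModelDefinitions();
    // "MasterSlave" assumes that definition is registered in the cluster
    StateModelDefinition masterSlave = defs.get(new StateModelDefId("MasterSlave"));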

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/StateModelFactoryId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelFactoryId.java b/helix-core/src/main/java/org/apache/helix/api/StateModelFactoryId.java
new file mode 100644
index 0000000..37be836
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelFactoryId.java
@@ -0,0 +1,34 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class StateModelFactoryId extends Id {
+  private final String _id;
+
+  public StateModelFactoryId(String id) {
+    _id = id;
+  }
+
+  @Override
+  public String stringify() {
+    return _id;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
new file mode 100644
index 0000000..995bb74
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -0,0 +1,140 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.AutoRebalancer;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
+import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
+import org.apache.helix.controller.rebalancer.NewRebalancer;
+import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * For each partition, compute the best possible (instance, state) pairs based on the
+ * IdealState, StateModelDefinition, and LiveInstances
+ */
+public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(NewBestPossibleStateCalcStage.class
+      .getName());
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    long startTime = System.currentTimeMillis();
+    LOG.info("START BestPossibleStateCalcStage.process()");
+
+    NewCurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+
+    if (currentStateOutput == null || resourceMap == null || cluster == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+    }
+
+    NewBestPossibleStateOutput bestPossibleStateOutput =
+        compute(cluster, event, resourceMap, currentStateOutput);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
+
+    long endTime = System.currentTimeMillis();
+    LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+  }
+
+  // TODO check this
+  private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
+      Map<ResourceId, Resource> resourceMap, NewCurrentStateOutput currentStateOutput) {
+    // for each ideal state
+    // read the state model def
+    // for each resource
+    // get the preference list
+    // for each instanceName check if its alive then assign a state
+    // ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+
+    NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
+
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      LOG.debug("Processing resource:" + resourceId);
+
+      Resource resource = resourceMap.get(resourceId);
+      // Ideal state may be gone. In that case we need to get the state model name
+      // from the current state
+      // IdealState idealState = cache.getIdealState(resourceName);
+
+      Resource existResource = cluster.getResource(resourceId);
+      if (existResource == null) {
+        // if ideal state is deleted, use an empty one
+        LOG.info("resource:" + resourceId + " does not exist anymore");
+        // TODO
+        // existResource = new Resource();
+      }
+
+      RebalancerConfig rebalancerConfig = existResource.getRebalancerConfig();
+      NewRebalancer rebalancer = null;
+      if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
+          && rebalancerConfig.getRebalancerClassName() != null) {
+        String rebalancerClassName = rebalancerConfig.getRebalancerClassName();
+        LOG.info("resource " + resourceId + " use idealStateRebalancer " + rebalancerClassName);
+        try {
+          rebalancer =
+              (NewRebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+        } catch (Exception e) {
+          LOG.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+        }
+      }
+      if (rebalancer == null) {
+        if (rebalancerConfig.getRebalancerMode() == RebalanceMode.FULL_AUTO) {
+          rebalancer = new NewAutoRebalancer();
+        } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.SEMI_AUTO) {
+          rebalancer = new NewSemiAutoRebalancer();
+        } else {
+          rebalancer = new NewCustomRebalancer();
+        }
+      }
+
+      // TODO pass state model definition
+      ResourceAssignment resourceAssignment =
+          rebalancer.computeResourceMapping(resource, cluster, null);
+
+      output.setResourceAssignment(resourceId, resourceAssignment);
+    }
+
+    return output;
+  }
+}
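
A rough sketch of driving this stage by hand, based on the attribute names it reads; it
assumes a ClusterEvent constructed with a name and that the RESOURCES and CURRENT_STATE
attributes were populated by the preceding stages:

    import org.apache.helix.controller.stages.AttributeName;
    import org.apache.helix.controller.stages.ClusterEvent;
    import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
    import org.apache.helix.controller.stages.NewBestPossibleStateOutput;

    ClusterEvent event = new ClusterEvent("sample-event");   // name is illustrative
    event.addAttribute("ClusterDataCache", cluster);         // Cluster snapshot, assumed available
    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
    event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
    new NewBestPossibleStateCalcStage().process(event);
    NewBestPossibleStateOutput bestPossible =
        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());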

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
new file mode 100644
index 0000000..474f463
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -0,0 +1,19 @@
+package org.apache.helix.controller.stages;
+
+import java.util.Map;
+
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.model.ResourceAssignment;
+
+public class NewBestPossibleStateOutput {
+
+  Map<ResourceId, ResourceAssignment> _resourceAssignmentMap = new java.util.HashMap<ResourceId, ResourceAssignment>();
+
+  /**
+   * @param resourceId the resource whose assignment is being recorded
+   * @param resourceAssignment the best possible assignment computed for the resource
+   */
+  public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+    _resourceAssignmentMap.put(resourceId, resourceAssignment);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
new file mode 100644
index 0000000..0f8c33e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -0,0 +1,143 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SessionId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+
+/**
+ * For each LiveInstance, select the current states and pending messages whose sessionId
+ * matches the LiveInstance's sessionId, and collect (Partition, State) for all resources
+ * computed in the previous stage [ResourceComputationStage]
+ */
+public class NewCurrentStateComputationStage extends AbstractBaseStage {
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+
+    if (cluster == null || resourceMap == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires DataCache|RESOURCE");
+    }
+
+    NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+
+    for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+      ParticipantId participantId = liveParticipant.getId();
+
+      // add pending messages
+      Map<MessageId, Message> instanceMsgs = liveParticipant.getMessageMap();
+      for (Message message : instanceMsgs.values()) {
+        if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) {
+          continue;
+        }
+
+        if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTgtSessionId())) {
+          continue;
+        }
+
+        ResourceId resourceId = message.getResourceId();
+        Resource resource = resourceMap.get(resourceId);
+        if (resource == null) {
+          continue;
+        }
+
+        if (!message.getBatchMessageMode()) {
+          PartitionId partitionId = message.getPartitionId();
+          Partition partition = resource.getPartition(partitionId);
+          if (partition != null) {
+            currentStateOutput.setPendingState(resourceId, partitionId, participantId,
+                message.getToState());
+          } else {
+            // log
+          }
+        } else {
+          List<PartitionId> partitionNames = message.getPartitionIds();
+          if (!partitionNames.isEmpty()) {
+            for (PartitionId partitionId : partitionNames) {
+              Partition partition = resource.getPartition(partitionId);
+              if (partition != null) {
+                currentStateOutput.setPendingState(resourceId, partitionId,
+                    participantId,
+                    message.getToState());
+              } else {
+                // log
+              }
+            }
+          }
+        }
+      }
+
+      // add current state
+      SessionId sessionId = liveParticipant.getRunningInstance().getSessionId();
+      Map<ResourceId, CurrentState> curStateMap = liveParticipant.getCurrentStateMap();
+      for (CurrentState curState : curStateMap.values()) {
+        if (!sessionId.equals(curState.getSessionId())) {
+          continue;
+        }
+
+        ResourceId resourceId = curState.getResourceId();
+        StateModelDefId stateModelDefId = curState.getStateModelDefId();
+        Resource resource = resourceMap.get(resourceId);
+        if (resource == null) {
+          continue;
+        }
+
+        if (stateModelDefId != null) {
+          currentStateOutput.setResourceStateModelDef(resourceId, stateModelDefId);
+        }
+
+        currentStateOutput.setBucketSize(resourceId, curState.getBucketSize());
+
+        Map<PartitionId, State> partitionStateMap = curState.getPartitionStateMap();
+        for (PartitionId partitionId : partitionStateMap.keySet()) {
+          Partition partition = resource.getPartition(partitionId);
+          if (partition != null) {
+            currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
+                curState.getState(partitionId));
+
+          } else {
+            // log
+          }
+        }
+      }
+    }
+
+    event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
new file mode 100644
index 0000000..417512e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
@@ -0,0 +1,242 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Partition;
+
+public class NewCurrentStateOutput {
+  /**
+   * map of resource-id to map of partition-id to map of participant-id to state
+   * represent current-state for the participant
+   */
+  private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _currentStateMap;
+
+  /**
+   * map of resource-id to map of partition-id to map of participant-id to state
+   * represent pending messages for the participant
+   */
+  private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _pendingStateMap;
+
+  /**
+   * map of resource-id to state model definition id
+   */
+  private final Map<ResourceId, StateModelDefId> _resourceStateModelMap;
+
+  /**
+   * map of resource-id to current-state config's
+   */
+  private final Map<ResourceId, CurrentState> _curStateMetaMap;
+
+  /**
+   * construct
+   */
+  public NewCurrentStateOutput() {
+    _currentStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+    _pendingStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+    _resourceStateModelMap = new HashMap<ResourceId, StateModelDefId>();
+    _curStateMetaMap = new HashMap<ResourceId, CurrentState>();
+
+  }
+
+  /**
+   * @param resourceId
+   * @param stateModelDefId
+   */
+  public void setResourceStateModelDef(ResourceId resourceId, StateModelDefId stateModelDefId) {
+    _resourceStateModelMap.put(resourceId, stateModelDefId);
+  }
+
+  /**
+   * @param resourceId
+   * @return
+   */
+  public StateModelDefId getResourceStateModelDef(ResourceId resourceId) {
+    return _resourceStateModelMap.get(resourceId);
+  }
+
+  /**
+   * @param resourceId
+   * @param bucketSize
+   */
+  public void setBucketSize(ResourceId resourceId, int bucketSize) {
+    CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
+    if (curStateMeta == null) {
+      curStateMeta = new CurrentState(resourceId);
+      _curStateMetaMap.put(resourceId, curStateMeta);
+    }
+    curStateMeta.setBucketSize(bucketSize);
+  }
+
+  /**
+   * @param resourceId
+   * @return
+   */
+  public int getBucketSize(ResourceId resourceId) {
+    int bucketSize = 0;
+    CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
+    if (curStateMeta != null) {
+      bucketSize = curStateMeta.getBucketSize();
+    }
+
+    return bucketSize;
+  }
+
+  /**
+   * @param stateMap
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @param state
+   */
+  static void setStateMap(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
+      ResourceId resourceId, PartitionId partitionId, ParticipantId participantId, State state) {
+    if (!stateMap.containsKey(resourceId)) {
+      stateMap.put(resourceId, new HashMap<PartitionId, Map<ParticipantId, State>>());
+    }
+
+    if (!stateMap.get(resourceId).containsKey(partitionId)) {
+      stateMap.get(resourceId).put(partitionId, new HashMap<ParticipantId, State>());
+    }
+    stateMap.get(resourceId).get(partitionId).put(participantId, state);
+  }
+
+  /**
+   * @param stateMap
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @return state
+   */
+  static State getState(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
+    ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
+    Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
+    if (map != null) {
+      Map<ParticipantId, State> instanceStateMap = map.get(partitionId);
+      if (instanceStateMap != null) {
+        return instanceStateMap.get(participantId);
+      }
+    }
+    return null;
+
+  }
+
+  /**
+   * @param stateMap
+   * @param resourceId
+   * @param partitionId
+   * @return
+   */
+  static Map<ParticipantId, State> getStateMap(
+      Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap, ResourceId resourceId,
+      PartitionId partitionId) {
+    if (stateMap.containsKey(resourceId)) {
+      Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
+      if (map.containsKey(partitionId)) {
+        return map.get(partitionId);
+      }
+    }
+    return Collections.emptyMap();
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @param state
+   */
+  public void setCurrentState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId, State state) {
+    setStateMap(_currentStateMap, resourceId, partitionId, participantId, state);
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @param state
+   */
+  public void setPendingState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId, State state) {
+    setStateMap(_pendingStateMap, resourceId, partitionId, participantId, state);
+  }
+
+  /**
+   * given (resource, partition, participant), returns the current state
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @return current State, or null if none is recorded
+   */
+  public State getCurrentState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId) {
+    return getState(_currentStateMap, resourceId, partitionId, participantId);
+  }
+
+  /**
+   * given (resource, partition, participant), returns the pending to-state
+   * @param resourceId
+   * @param partitionId
+   * @param participantId
+   * @return pending State, or null if no transition is pending
+   */
+  public State getPendingState(ResourceId resourceId, PartitionId partitionId,
+      ParticipantId participantId) {
+    return getState(_pendingStateMap, resourceId, partitionId, participantId);
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @return
+   */
+  public Map<ParticipantId, State> getCurrentStateMap(ResourceId resourceId, PartitionId partitionId) {
+    return getStateMap(_currentStateMap, resourceId, partitionId);
+  }
+
+  /**
+   * @param resourceId
+   * @param partitionId
+   * @return
+   */
+  public Map<ParticipantId, State> getPendingStateMap(ResourceId resourceId, PartitionId partitionId) {
+    return getStateMap(_pendingStateMap, resourceId, partitionId);
+
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("current state= ").append(_currentStateMap);
+    sb.append(", pending state= ").append(_pendingStateMap);
+    return sb.toString();
+
+  }
+
+}
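
A small sketch of how the nested maps in this class are populated and queried; the
resource, partition, and participant ids are assumed to come from the cluster snapshot,
and the state names are illustrative:

    import java.util.Map;

    import org.apache.helix.api.ParticipantId;
    import org.apache.helix.api.State;

    NewCurrentStateOutput csOutput = new NewCurrentStateOutput();
    csOutput.setCurrentState(resourceId, partitionId, participantId, new State("MASTER"));
    csOutput.setPendingState(resourceId, partitionId, participantId, new State("SLAVE"));

    State current = csOutput.getCurrentState(resourceId, partitionId, participantId);
    Map<ParticipantId, State> perReplica = csOutput.getCurrentStateMap(resourceId, partitionId);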

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
new file mode 100644
index 0000000..0fdfe56
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
@@ -0,0 +1,233 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SessionId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Compares the currentState and pendingState with the IdealState and generates messages
+ */
+public class NewMessageGenerationPhase extends AbstractBaseStage {
+  private static Logger LOG = Logger.getLogger(NewMessageGenerationPhase.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager manager = event.getAttribute("helixmanager");
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    NewCurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
+        || bestPossibleStateOutput == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+    }
+
+    // Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+    // Map<String, String> sessionIdMap = new HashMap<String, String>();
+
+    // for (LiveInstance liveInstance : liveInstances.values()) {
+    // sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId().stringify());
+    // }
+    MessageGenerationOutput output = new MessageGenerationOutput();
+
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      Resource resource = resourceMap.get(resourceId);
+      int bucketSize = resource.getRebalancerConfig().getBucketSize();
+
+      // TODO fix it
+      StateModelDefinition stateModelDef = null;
+      // cache.getStateModelDef(resource.getStateModelDefRef());
+
+      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+        // TODO fix it
+        Map<ParticipantId, State> instanceStateMap = null;
+        // bestPossibleStateOutput.getInstanceStateMap(resourceId, partition);
+
+        // we should generate message based on the desired-state priority
+        // so keep generated messages in a temp map keyed by state
+        // desired-state->list of generated-messages
+        Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
+
+        for (ParticipantId participantId : instanceStateMap.keySet()) {
+          State desiredState = instanceStateMap.get(participantId);
+
+          State currentState =
+              currentStateOutput.getCurrentState(resourceId, partitionId, participantId);
+          if (currentState == null) {
+            // TODO fix it
+            // currentState = stateModelDef.getInitialStateString();
+          }
+
+          if (desiredState.equals(currentState)) {
+            continue;
+          }
+
+          State pendingState =
+              currentStateOutput.getPendingState(resourceId, partitionId, participantId);
+
+          // TODO fix it
+          State nextState = new State("");
+          // stateModelDef.getNextStateForTransition(currentState, desiredState);
+          if (nextState == null) {
+            LOG.error("Unable to find a next state for partition: " + partitionId
+                + " from stateModelDefinition"
+                + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
+            continue;
+          }
+
+          if (pendingState != null) {
+            if (nextState.equals(pendingState)) {
+              LOG.debug("Message already exists for " + participantId + " to transit "
+                  + partitionId + " from " + currentState + " to " + nextState);
+            } else if (currentState.equals(pendingState)) {
+              LOG.info("Message hasn't been removed for " + participantId + " to transit"
+                  + partitionId + " to " + pendingState + ", desiredState: "
+                  + desiredState);
+            } else {
+              LOG.info("IdealState changed before state transition completes for " + partitionId
+                  + " on " + participantId + ", pendingState: "
+                  + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
+            }
+          } else {
+            // TODO check if instance is alive
+            SessionId sessionId =
+                cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
+                    .getSessionId();
+            Message message =
+                createMessage(manager, resourceId, partitionId, participantId, currentState,
+                    nextState, sessionId, new StateModelDefId(stateModelDef.getId()), resource
+                        .getRebalancerConfig()
+                        .getStateModelFactoryId(), bucketSize);
+
+            // TODO fix this
+            // IdealState idealState = cache.getIdealState(resourceName);
+            // if (idealState != null
+            // && idealState.getStateModelDefRef().equalsIgnoreCase(
+            // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+            // if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+            // message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+            // idealState.getRecord().getMapField(partition.getPartitionName()));
+            // }
+            // }
+            // Set timeout if needed
+            // String stateTransition =
+            // currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
+            // if (idealState != null) {
+            // String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
+            // if (timeOutStr == null
+            // && idealState.getStateModelDefRef().equalsIgnoreCase(
+            // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+            // // scheduled task queue
+            // if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+            // timeOutStr =
+            // idealState.getRecord().getMapField(partition.getPartitionName())
+            // .get(Message.Attributes.TIMEOUT.toString());
+            // }
+            // }
+            // if (timeOutStr != null) {
+            // try {
+            // int timeout = Integer.parseInt(timeOutStr);
+            // if (timeout > 0) {
+            // message.setExecutionTimeout(timeout);
+            // }
+            // } catch (Exception e) {
+            // logger.error("", e);
+            // }
+            // }
+            // }
+            // message.getRecord().setSimpleField("ClusterEventName", event.getName());
+
+            if (!messageMap.containsKey(desiredState)) {
+              messageMap.put(desiredState, new ArrayList<Message>());
+            }
+            messageMap.get(desiredState).add(message);
+          }
+        }
+
+        // add generated messages to output according to state priority
+        List<String> statesPriorityList = stateModelDef.getStatesPriorityStringList();
+        for (String state : statesPriorityList) {
+          if (messageMap.containsKey(state)) {
+            for (Message message : messageMap.get(state)) {
+              // TODO fix it
+              // output.addMessage(resourceId, partitionId, message);
+            }
+          }
+        }
+
+      } // end of for-each-partition
+    }
+    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+  }
+
+  private Message createMessage(HelixManager manager, ResourceId resourceId,
+      PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
+      SessionId sessionId, StateModelDefId stateModelDefId,
+      StateModelFactoryId stateModelFactoryId, int bucketSize) {
+  // MessageId uuid = Id.message(UUID.randomUUID().toString());
+  // Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+  // message.setSrcName(manager.getInstanceName());
+  // message.setTgtName(instanceName);
+  // message.setMsgState(MessageState.NEW);
+  // message.setPartitionId(Id.partition(partitionName));
+  // message.setResourceId(Id.resource(resourceName));
+  // message.setFromState(State.from(currentState));
+  // message.setToState(State.from(nextState));
+  // message.setTgtSessionId(Id.session(sessionId));
+  // message.setSrcSessionId(Id.session(manager.getSessionId()));
+  // message.setStateModelDef(Id.stateModelDef(stateModelDefName));
+  // message.setStateModelFactoryName(stateModelFactoryName);
+  // message.setBucketSize(bucketSize);
+  //
+  // return message;
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
new file mode 100644
index 0000000..4038c69
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -0,0 +1,290 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+public class NewMessageSelectionStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(NewMessageSelectionStage.class);
+
+  public static class Bounds {
+    private int upper;
+    private int lower;
+
+    public Bounds(int lower, int upper) {
+      this.lower = lower;
+      this.upper = upper;
+    }
+
+    public void increaseUpperBound() {
+      upper++;
+    }
+
+    public void increaseLowerBound() {
+      lower++;
+    }
+
+    public void decreaseUpperBound() {
+      upper--;
+    }
+
+    public void decreaseLowerBound() {
+      lower--;
+    }
+
+    public int getLowerBound() {
+      return lower;
+    }
+
+    public int getUpperBound() {
+      return upper;
+    }
+  }
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    NewCurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    MessageGenerationOutput messageGenOutput =
+        event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+    if (cluster == null || resourceMap == null || currentStateOutput == null
+        || messageGenOutput == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
+    }
+
+    MessageSelectionStageOutput output = new MessageSelectionStageOutput();
+
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      Resource resource = resourceMap.get(resourceId);
+      // TODO fix it
+      StateModelDefinition stateModelDef = null;
+      // cache.getStateModelDef(resource.getStateModelDefRef());
+
+      Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
+      // IdealState idealState = cache.getIdealState(resourceName);
+      Map<String, Bounds> stateConstraints =
+          computeStateConstraints(stateModelDef, resource.getRebalancerConfig(), cluster);
+
+      // TODO fix it
+      // for (Partition partition : resource.getPartitions()) {
+      // List<Message> messages = messageGenOutput.getMessages(resourceId.stringify(), partition);
+      // List<Message> selectedMessages =
+      // selectMessages(cache.getLiveInstances(),
+      // currentStateOutput.getCurrentStateMap(resourceName, partition),
+      // currentStateOutput.getPendingStateMap(resourceName, partition), messages,
+      // stateConstraints, stateTransitionPriorities, stateModelDef.getInitialStateString());
+      // output.addMessages(resourceId.stringify(), partition, selectedMessages);
+      // }
+    }
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
+  }
+
+  // TODO: This method deserves its own class. The class should not understand Helix; it
+  // should just solve the problem using the algorithm. The method mostly follows that
+  // contract today, but until it is moved to another class it is quite easy to break it.
+  /**
+   * Greedy message selection algorithm:
+   * 1) calculate per-state lower/upper bounds from current states and pending states
+   * 2) group messages by state transition and sort the groups by priority
+   * 3) from highest priority to lowest, for each group of messages with the same transition,
+   * add messages one by one, making sure no state constraint is violated, and update the
+   * state lower/upper bounds whenever a new message is selected
+   * @param liveInstances
+   * @param currentStates
+   * @param pendingStates
+   * @param messages
+   * @param stateConstraints
+   *          : STATE -> bound (lower:upper)
+   * @param stateTransitionPriorities
+   *          : FROM_STATE-TO_STATE -> priority
+   * @return selected messages
+   */
+  List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
+      Map<String, String> currentStates, Map<String, String> pendingStates, List<Message> messages,
+      Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
+      String initialState) {
+    if (messages == null || messages.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<Message> selectedMessages = new ArrayList<Message>();
+    Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+
+    // count currentState, if no currentState, count as in initialState
+    for (String instance : liveInstances.keySet()) {
+      String state = initialState;
+      if (currentStates.containsKey(instance)) {
+        state = currentStates.get(instance);
+      }
+
+      if (!bounds.containsKey(state)) {
+        bounds.put(state, new Bounds(0, 0));
+      }
+      bounds.get(state).increaseLowerBound();
+      bounds.get(state).increaseUpperBound();
+    }
+
+    // count pendingStates
+    for (String instance : pendingStates.keySet()) {
+      String state = pendingStates.get(instance);
+      if (!bounds.containsKey(state)) {
+        bounds.put(state, new Bounds(0, 0));
+      }
+      // TODO: add lower bound, need to refactor pendingState to include fromState also
+      bounds.get(state).increaseUpperBound();
+    }
+
+    // group messages based on state transition priority
+    Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
+        new TreeMap<Integer, List<Message>>();
+    for (Message message : messages) {
+      State fromState = message.getFromState();
+      State toState = message.getToState();
+      String transition = fromState + "-" + toState;
+      int priority = Integer.MAX_VALUE;
+
+      if (stateTransitionPriorities.containsKey(transition)) {
+        priority = stateTransitionPriorities.get(transition);
+      }
+
+      if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
+        messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
+      }
+      messagesGroupByStateTransitPriority.get(priority).add(message);
+    }
+
+    // select messages
+    for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
+      for (Message message : messageList) {
+        // bounds and stateConstraints are keyed by state name, so use the string form
+        String fromState = message.getFromState().toString();
+        String toState = message.getToState().toString();
+
+        if (!bounds.containsKey(fromState)) {
+          LOG.error("Message's fromState is not in currentState. message: " + message);
+          continue;
+        }
+
+        if (!bounds.containsKey(toState)) {
+          bounds.put(toState, new Bounds(0, 0));
+        }
+
+        // check lower bound of fromState
+        if (stateConstraints.containsKey(fromState)) {
+          int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
+          if (newLowerBound < 0) {
+            LOG.error("Number of replicas in currentState " + fromState
+                + " is less than the number of messages transitioning from " + fromState);
+            continue;
+          }
+
+          if (newLowerBound < stateConstraints.get(fromState).getLowerBound()) {
+            continue;
+          }
+        }
+
+        // check upper bound of toState
+        if (stateConstraints.containsKey(toState)) {
+          int newUpperBound = bounds.get(toState).getUpperBound() + 1;
+          if (newUpperBound > stateConstraints.get(toState).getUpperBound()) {
+            continue;
+          }
+        }
+
+        // commit the selection: one replica leaves fromState and may enter toState
+        selectedMessages.add(message);
+        bounds.get(fromState).decreaseLowerBound();
+        bounds.get(toState).increaseUpperBound();
+      }
+    }
+
+    return selectedMessages;
+  }
+
+  // TODO change to return Map<State, Bounds>
+  /**
+   * TODO: This code is duplicated in multiple places. We could compute the stateConstraint
+   * instance once, up front, and reuse it everywhere else. Each IdealState must have a
+   * constraint object associated with it.
+   */
+  private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
+      RebalancerConfig rebalancerConfig, Cluster cluster) {
+    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
+
+    List<String> statePriorityList = stateModelDefinition.getStatesPriorityStringList();
+    for (String state : statePriorityList) {
+      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
+      int max = -1;
+      if ("N".equals(numInstancesPerState)) {
+        max = cluster.getLiveParticipantMap().size();
+      } else if ("R".equals(numInstancesPerState)) {
+        // the rebalancer config is null when the resource has been dropped;
+        // "R" can't be evaluated, so skip the state constraint
+        if (rebalancerConfig != null) {
+          max = rebalancerConfig.getReplicaCount();
+        }
+      } else {
+        try {
+          max = Integer.parseInt(numInstancesPerState);
+        } catch (Exception e) {
+          // use -1
+        }
+      }
+
+      if (max > -1) {
+        // states with no constraint are never added to the map
+        stateConstraints.put(state, new Bounds(0, max));
+      }
+    }
+
+    return stateConstraints;
+  }
+
+  // TODO: if state transition priority is not provided then use lexicographical sorting
+  // so that behavior is consistent
+  private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef) {
+    Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
+    List<String> stateTransitionPriorityList = stateModelDef.getStateTransitionPriorityStringList();
+    for (int i = 0; i < stateTransitionPriorityList.size(); i++) {
+      stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
+    }
+
+    return stateTransitionPriorities;
+  }
+}
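
To make the bounds bookkeeping in selectMessages concrete: if a partition has two SLAVE replicas and the MASTER state carries an upper bound of 1, only one of two competing SLAVE-to-MASTER messages can be selected. A small self-contained illustration of that upper-bound check, using plain strings and hypothetical node names instead of Helix types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SelectionBoundsDemo {
  public static void main(String[] args) {
    // current replica states of one partition (hypothetical nodes)
    Map<String, String> currentStates = new HashMap<String, String>();
    currentStates.put("node1", "SLAVE");
    currentStates.put("node2", "SLAVE");

    // state constraints: at most 1 MASTER, at most 2 SLAVEs (upper bounds only here)
    Map<String, Integer> upperBounds = new HashMap<String, Integer>();
    upperBounds.put("MASTER", 1);
    upperBounds.put("SLAVE", 2);

    // candidate transitions, already ordered by state-transition priority
    List<String[]> messages = Arrays.asList(
        new String[] {"node1", "SLAVE", "MASTER"},
        new String[] {"node2", "SLAVE", "MASTER"});

    // count current replicas per state (the initial value of the running tally)
    Map<String, Integer> tally = new HashMap<String, Integer>();
    for (String state : currentStates.values()) {
      Integer count = tally.get(state);
      tally.put(state, count == null ? 1 : count + 1);
    }

    List<String[]> selected = new ArrayList<String[]>();
    for (String[] msg : messages) {
      String toState = msg[2];
      int newCount = (tally.containsKey(toState) ? tally.get(toState) : 0) + 1;
      if (upperBounds.containsKey(toState) && newCount > upperBounds.get(toState)) {
        continue; // selecting this message would exceed the toState upper bound
      }
      selected.add(msg);
      tally.put(toState, newCount);
    }

    // prints: selected 1 of 2 transitions (only node1 becomes MASTER this pipeline run)
    System.out.println("selected " + selected.size() + " of " + messages.size() + " transitions");
  }
}

The stage itself also tracks a per-state lower bound, so a constrained fromState keeps its minimum replica count, and it counts pending transitions toward the upper bound; the check above is only the core of the greedy selection.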

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
new file mode 100644
index 0000000..e45cd38
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -0,0 +1,196 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.log4j.Logger;
+
+public class NewMessageThrottleStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(NewMessageThrottleStage.class.getName());
+
+  int valueOf(String valueStr) {
+    int value = Integer.MAX_VALUE;
+
+    try {
+      ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
+      switch (valueToken) {
+      case ANY:
+        value = Integer.MAX_VALUE;
+        break;
+      default:
+        LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
+            + Integer.MAX_VALUE);
+        break;
+      }
+    } catch (Exception e) {
+      try {
+        value = Integer.parseInt(valueStr);
+      } catch (NumberFormatException ne) {
+        LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
+            + Integer.MAX_VALUE);
+      }
+    }
+    return value;
+  }
+
+  /**
+   * Constraints are selected according to the following rules:
+   * 1) never select constraints with CONSTRAINT_VALUE=ANY;
+   * 2) if one constraint is more specific than the other, select the more specific one;
+   * 3) if a message matches multiple constraints of incomparable specificity, select the
+   * one with the minimum value;
+   * 4) if a message matches multiple constraints of incomparable specificity and they all
+   * have the same value, select the first in alphabetical order.
+   */
+  Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
+      Map<ConstraintAttribute, String> attributes) {
+    Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
+    for (ConstraintItem item : items) {
+      // don't select constraints with CONSTRAINT_VALUE=ANY
+      if (item.getConstraintValue().equals(ConstraintValue.ANY.toString())) {
+        continue;
+      }
+
+      String key = item.filter(attributes).toString();
+      if (!selectedItems.containsKey(key)) {
+        selectedItems.put(key, item);
+      } else {
+        ConstraintItem existingItem = selectedItems.get(key);
+        if (existingItem.match(item.getAttributes())) {
+          // item is more specific than existingItem
+          selectedItems.put(key, item);
+        } else if (!item.match(existingItem.getAttributes())) {
+          // existingItem and item are of incomparable specificity
+          int value = valueOf(item.getConstraintValue());
+          int existingValue = valueOf(existingItem.getConstraintValue());
+          if (value < existingValue) {
+            // item's constraint value is less than that of existingItem
+            selectedItems.put(key, item);
+          } else if (value == existingValue) {
+            if (item.toString().compareTo(existingItem.toString()) < 0) {
+              // item is ahead of existingItem in alphabetic order
+              selectedItems.put(key, item);
+            }
+          }
+        }
+      }
+    }
+    return new HashSet<ConstraintItem>(selectedItems.values());
+  }
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    MessageSelectionStageOutput msgSelectionOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+
+    if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
+      throw new StageException("Missing attributes in event: " + event
+          + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
+    }
+
+    MessageThrottleStageOutput output = new MessageThrottleStageOutput();
+
+    // TODO fix it
+    ClusterConstraints constraint = null;
+    // cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+    Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
+
+    // TODO fix it
+    // if (constraint != null) {
+    // // go through all pending messages, they should be counted but not throttled
+    // for (String instance : cache.getLiveInstances().keySet()) {
+    // throttle(throttleCounterMap, constraint, new ArrayList<Message>(cache.getMessages(instance)
+    // .values()), false);
+    // }
+    // }
+
+    // go through all new messages, throttle if necessary
+    // assume messages should be sorted by state transition priority in messageSelection stage
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      Resource resource = resourceMap.get(resourceId);
+      // TODO fix it
+      // for (Partition partition : resource.getPartitions()) {
+      // List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
+      // if (constraint != null && messages != null && messages.size() > 0) {
+      // messages = throttle(throttleCounterMap, constraint, messages, true);
+      // }
+      // output.addMessages(resourceName, partition, messages);
+      // }
+    }
+
+    event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
+  }
+
+  private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,
+      List<Message> messages, final boolean needThrottle) {
+
+    List<Message> throttleOutputMsgs = new ArrayList<Message>();
+    for (Message message : messages) {
+      Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
+
+      Set<ConstraintItem> matches = constraint.match(msgAttr);
+      matches = selectConstraints(matches, msgAttr);
+
+      boolean msgThrottled = false;
+      for (ConstraintItem item : matches) {
+        String key = item.filter(msgAttr).toString();
+        if (!throttleMap.containsKey(key)) {
+          throttleMap.put(key, valueOf(item.getConstraintValue()));
+        }
+        int value = throttleMap.get(key);
+        throttleMap.put(key, --value);
+
+        if (needThrottle && value < 0) {
+          msgThrottled = true;
+
+          if (LOG.isDebugEnabled()) {
+            // TODO: printout constraint item that throttles the message
+            LOG.debug("message: " + message + " is throttled by constraint: " + item);
+          }
+        }
+      }
+
+      if (!msgThrottled) {
+        throttleOutputMsgs.add(message);
+      }
+    }
+
+    return throttleOutputMsgs;
+  }
+}
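
The throttle method decrements a per-constraint counter for every message that matches the constraint; once the counter goes negative, further matching messages are dropped for this pipeline run. A small self-contained illustration of that counting scheme, with a hypothetical constraint key and quota standing in for a real ConstraintItem:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ThrottleCounterDemo {
  public static void main(String[] args) {
    // one matched constraint: at most 3 concurrent state-transition messages on node1
    String constraintKey = "MESSAGE_TYPE=STATE_TRANSITION,INSTANCE=node1";
    int quota = 3;

    Map<String, Integer> throttleMap = new HashMap<String, Integer>();
    List<String> output = new ArrayList<String>();

    for (int i = 1; i <= 5; i++) {
      String message = "msg-" + i;
      if (!throttleMap.containsKey(constraintKey)) {
        throttleMap.put(constraintKey, quota);
      }
      int remaining = throttleMap.get(constraintKey) - 1;
      throttleMap.put(constraintKey, remaining);
      if (remaining < 0) {
        continue; // throttled: quota for this constraint is exhausted
      }
      output.add(message);
    }

    // prints: 3 of 5 messages pass the throttle
    System.out.println(output.size() + " of 5 messages pass the throttle");
  }
}

In the stage the quota comes from valueOf(item.getConstraintValue()), and the commented-out block above indicates that pending messages already sent to participants should be counted first, so new messages are throttled against the remaining budget.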

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
new file mode 100644
index 0000000..3359b50
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
@@ -0,0 +1,84 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.rebalancer.NewRebalancer;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Check for and invoke custom ideal-state rebalancer implementations.<br/>
+ * If the resource's rebalancer config specifies the class name of a customized rebalancer,<br/>
+ * that rebalancer is invoked to rewrite the ideal state of the resource.<br/>
+ */
+public class NewRebalanceIdealStateStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(NewRebalanceIdealStateStage.class.getName());
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    NewCurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+
+    // Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
+    for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+      // IdealState currentIdealState = idealStateMap.get(resourceName);
+      Resource resource = cluster.getResource(resourceId);
+      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+      if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
+          && rebalancerConfig.getRebalancerClassName() != null) {
+        String rebalancerClassName = rebalancerConfig.getRebalancerClassName();
+        LOG.info("resource " + resourceId + " use idealStateRebalancer " + rebalancerClassName);
+        try {
+          NewRebalancer balancer =
+              (NewRebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+
+          // TODO add state model def
+          ResourceAssignment resourceAssignment =
+              balancer.computeResourceMapping(resource, cluster, null);
+
+          // TODO impl this
+          // currentIdealState.updateFromAssignment(resourceAssignment);
+          // updatedIdealStates.put(resourceName, currentIdealState);
+        } catch (Exception e) {
+          LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+        }
+      }
+    }
+
+    // TODO
+    // if (updatedIdealStates.size() > 0) {
+    //   cache.getIdealStates().putAll(updatedIdealStates);
+    // }
+  }
+}
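
For resources in USER_DEFINED rebalance mode, the class named in the rebalancer config is loaded reflectively and asked to compute a full resource-to-participant mapping. A minimal sketch of such a custom rebalancer follows; only the computeResourceMapping(Resource, Cluster, StateModelDefinition) call shape is taken from the stage above, while NewRebalancer being an implementable interface, the placement policy, and the ResourceAssignment construction are assumptions left as comments:

import org.apache.helix.api.Cluster;
import org.apache.helix.api.Resource;
import org.apache.helix.controller.rebalancer.NewRebalancer;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;

// Sketch of a user-defined rebalancer; assumes NewRebalancer exposes exactly the
// computeResourceMapping method that NewRebalanceIdealStateStage invokes.
public class RoundRobinRebalancer implements NewRebalancer {
  @Override
  public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
      StateModelDefinition stateModelDef) {
    // Hypothetical policy: walk resource.getPartitionSet() and spread replicas round-robin
    // across cluster.getLiveParticipantMap().keySet(), recording each decision in a
    // ResourceAssignment. The assignment construction is omitted here because the
    // ResourceAssignment API is still settling in this refactor.
    ResourceAssignment assignment = null; // TODO build the mapping for this resource
    return assignment;
  }
}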

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
new file mode 100644
index 0000000..b8c1ecf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -0,0 +1,108 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.log4j.Logger;
+
+/**
+ * This stage computes all the resources in a cluster. Resources are computed from two
+ * sources: the ideal states, which give all the resources currently active, and the
+ * current states of live instances, which help find resources that are inactive and
+ * need to be dropped.
+ */
+public class NewResourceComputationStage extends AbstractBaseStage {
+  private static Logger LOG = Logger.getLogger(NewResourceComputationStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    if (cluster == null) {
+      throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
+    }
+
+    Map<ResourceId, Resource.Builder> resourceBuilderMap =
+        new LinkedHashMap<ResourceId, Resource.Builder>();
+    // include all resources in ideal-state
+    for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+      Resource resource = cluster.getResource(resourceId);
+      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+
+      Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+      resourceBuilder.rebalancerConfig(rebalancerConfig);
+      resourceBuilder.addPartitions(resource.getPartitionSet());
+      resourceBuilderMap.put(resourceId, resourceBuilder);
+    }
+
+    // include all partitions from CurrentState as well since idealState might be removed
+    for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+      for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
+        CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
+
+        if (currentState.getStateModelDefRef() == null) {
+          LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
+              + ", partitions: " + currentState.getPartitionStateStringMap().keySet()
+              + ", states: " + currentState.getPartitionStateStringMap().values());
+          throw new StageException("State model def is null for resource:"
+              + currentState.getResourceId());
+        }
+
+        // don't overwrite ideal state configs
+        if (!resourceBuilderMap.containsKey(resourceId)) {
+          RebalancerConfig.Builder rebalancerConfigBuilder = new RebalancerConfig.Builder();
+          rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
+          rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState
+              .getStateModelFactoryName()));
+          rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
+          rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
+
+          Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+          resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
+          resourceBuilderMap.put(resourceId, resourceBuilder);
+        }
+
+        for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
+          resourceBuilderMap.get(resourceId).addPartition(new Partition(partitionId));
+        }
+      }
+    }
+
+    // convert builder-map to resource-map
+    Map<ResourceId, Resource> resourceMap = new LinkedHashMap<ResourceId, Resource>();
+    for (ResourceId resourceId : resourceBuilderMap.keySet()) {
+      resourceMap.put(resourceId, resourceBuilderMap.get(resourceId).build());
+    }
+
+    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+  }
+}
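
Stages communicate through ClusterEvent attributes, so the computed resource map is read back by later stages under AttributeName.RESOURCES. A minimal sketch of driving this stage through the existing controller Pipeline and reading its output; the event name and wrapper class are illustrative, and it assumes the "ClusterDataCache" attribute carries the new Cluster snapshot the stage expects:

import java.util.Map;

import org.apache.helix.api.Cluster;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceId;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.NewResourceComputationStage;

public class ResourceComputationExample {
  public static void printResources(Cluster cluster) throws Exception {
    ClusterEvent event = new ClusterEvent("sample-run");
    event.addAttribute("ClusterDataCache", cluster); // the attribute key the stage reads

    Pipeline pipeline = new Pipeline();
    pipeline.addStage(new NewResourceComputationStage());
    pipeline.handle(event);

    // the stage publishes the computed resources under AttributeName.RESOURCES
    Map<ResourceId, Resource> resourceMap =
        event.getAttribute(AttributeName.RESOURCES.toString());
    System.out.println("computed " + resourceMap.size() + " resources");
  }
}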