You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:17 UTC

[09/53] [abbrv] git commit: [HELIX-98] clean up setting constraint api, [HELIX-246] Refactor scheduler task config to comply with new rebalancer config and fix related scheduler task tests

[HELIX-98] clean up setting constraint api, [HELIX-246] Refactor scheduler task config to comply with new rebalancer config and fix related scheduler task tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/243f2adf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/243f2adf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/243f2adf

Branch: refs/heads/master
Commit: 243f2adfa716bc704c3ad31cb4c66256f0b8fd5e
Parents: fb96a13
Author: zzhang <zz...@apache.org>
Authored: Wed Sep 18 13:10:51 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:34 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/Cluster.java |  40 ++-
 .../org/apache/helix/api/ClusterAccessor.java   |   2 +-
 .../org/apache/helix/api/ClusterConfig.java     | 245 ++++++++++++++++++-
 .../java/org/apache/helix/api/ConstraintId.java |  74 ++++++
 .../helix/api/CustomRebalancerConfig.java       |  10 +
 .../helix/api/FullAutoRebalancerConfig.java     |   8 +
 .../apache/helix/api/ParticipantAccessor.java   |   5 +-
 .../java/org/apache/helix/api/PartitionId.java  |   8 +
 .../org/apache/helix/api/RebalancerConfig.java  |  24 ++
 .../main/java/org/apache/helix/api/Scope.java   |  31 +++
 .../helix/api/SemiAutoRebalancerConfig.java     |  10 +
 .../helix/api/UserDefinedRebalancerConfig.java  |   9 +
 .../controller/rebalancer/AutoRebalancer.java   |   1 +
 .../controller/rebalancer/CustomRebalancer.java |   4 +-
 .../rebalancer/NewAutoRebalancer.java           |  25 +-
 .../rebalancer/NewSemiAutoRebalancer.java       |   6 +-
 .../helix/controller/rebalancer/Rebalancer.java |   1 +
 .../rebalancer/SemiAutoRebalancer.java          |   1 +
 .../util/ConstraintBasedAssignment.java         |   1 +
 .../util/NewConstraintBasedAssignment.java      |  29 ++-
 .../stages/BestPossibleStateCalcStage.java      |   4 +-
 .../stages/NewBestPossibleStateCalcStage.java   |   3 +-
 .../stages/NewResourceComputationStage.java     | 160 ++++++++----
 .../stages/RebalanceIdealStateStage.java        |   4 +-
 .../stages/StatsAggregationStage.java           |   2 +-
 .../strategy/EspressoRelayStrategy.java         |   3 +-
 .../DefaultSchedulerMessageHandlerFactory.java  |   6 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  39 +--
 .../apache/helix/model/ClusterConstraints.java  |  38 ++-
 .../org/apache/helix/model/ExternalView.java    |   3 +
 .../java/org/apache/helix/model/IdealState.java |  29 +--
 .../builder/ClusterConstraintsBuilder.java      |  11 +-
 .../helix/model/builder/IdealStateBuilder.java  |   6 +-
 .../builder/MessageConstraintItemBuilder.java   | 107 ++++++++
 .../builder/StateConstraintItemBuilder.java     |  92 +++++++
 .../monitoring/mbeans/ResourceMonitor.java      |  18 +-
 .../participant/HelixCustomCodeRunner.java      |  10 +-
 .../org/apache/helix/tools/ClusterSetup.java    |  10 +-
 .../org/apache/helix/util/RebalanceUtil.java    |  25 +-
 .../java/org/apache/helix/TestZKCallback.java   |   2 +-
 .../java/org/apache/helix/ZkUnitTestBase.java   |  25 +-
 .../helix/controller/stages/BaseStageTest.java  |   2 +-
 .../TestBestPossibleCalcStageCompatibility.java |   9 +-
 .../stages/TestCompatibilityCheckStage.java     |   3 +-
 .../stages/TestResourceComputationStage.java    |   7 +-
 .../strategy/TestNewAutoRebalanceStrategy.java  |  17 +-
 .../TestAddStateModelFactoryAfterConnect.java   |   3 +-
 .../helix/integration/TestAutoRebalance.java    |  15 +-
 .../TestAutoRebalancePartitionLimit.java        |  13 +-
 .../TestCustomizedIdealStateRebalancer.java     |  16 +-
 .../integration/TestInvalidAutoIdealState.java  |   3 +-
 .../helix/integration/TestRenamePartition.java  |   3 +-
 .../helix/integration/TestSchedulerMessage.java |   9 +-
 .../integration/TestStateTransitionTimeout.java |  17 +-
 .../helix/integration/TestZkReconnect.java      |   3 +-
 .../helix/manager/zk/TestZNRecordSizeLimit.java |  17 +-
 .../helix/manager/zk/TestZkHelixAdmin.java      |   7 +-
 .../org/apache/helix/model/TestIdealState.java  |  25 +-
 .../apache/helix/tools/TestHelixAdminCli.java   |  32 +--
 59 files changed, 1068 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 005ae0d..56a84e9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Transition;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
@@ -99,8 +100,10 @@ public class Cluster {
           }
         });
     _config =
-        new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap,
-            stateModelMap, userConfig, isPaused);
+        new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
+            .addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
+            .addStateModelDefinitions(stateModelMap.values()).setPausedStatus(isPaused)
+            .userConfig(userConfig).build();
 
     _resourceMap = ImmutableMap.copyOf(resourceMap);
 
@@ -222,10 +225,43 @@ public class Cluster {
   }
 
   /**
+   * Get the maximum number of participants that can be in a state
+   * @param scope the scope for the bound
+   * @param stateModelDefId the state model of the state
+   * @param state the constrained state
+   * @return The upper bound, which can be "-1" if unspecified, a numerical upper bound, "R" for
+   *         number of replicas, or "N" for number of participants
+   */
+  public String getStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+      State state) {
+    return _config.getStateUpperBoundConstraint(scope, stateModelDefId, state);
+  }
+
+  /**
+   * Get the limit of simultaneous execution of a transition
+   * @param scope the scope under which the transition is constrained
+   * @param stateModelDefId the state model of which the transition is a part
+   * @param transition the constrained transition
+   * @return the limit, or Integer.MAX_VALUE if there is no limit
+   */
+  public int getTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+      Transition transition) {
+    return _config.getTransitionConstraint(scope, stateModelDefId, transition);
+  }
+
+  /**
    * Check the pasued status of the cluster
    * @return true if paused, false otherwise
    */
   public boolean isPaused() {
     return _config.isPaused();
   }
+
+  /**
+   * Get the ClusterConfig specifying this cluster
+   * @return ClusterConfig
+   */
+  public ClusterConfig getConfig() {
+    return _config;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index a46edad..e30825c 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -66,7 +66,7 @@ public class ClusterAccessor {
   public boolean createCluster(ClusterConfig cluster) {
     boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
     if (!created) {
-      LOG.warn("Cluster already created. Aborting.");
+      // LOG.warn("Cluster already created. Aborting.");
       // return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index 27da37d..ba1e78e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -3,12 +3,21 @@ package org.apache.helix.api;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Transition;
+import org.apache.helix.model.builder.ConstraintItemBuilder;
+import org.apache.log4j.Logger;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -33,6 +42,8 @@ import com.google.common.collect.ImmutableMap;
  * Configuration properties of a cluster
  */
 public class ClusterConfig {
+  private static final Logger LOG = Logger.getLogger(ClusterConfig.class);
+
   private final ClusterId _id;
   private final Map<ResourceId, ResourceConfig> _resourceMap;
   private final Map<ParticipantId, ParticipantConfig> _participantMap;
@@ -51,7 +62,7 @@ public class ClusterConfig {
    * @param userConfig user-defined cluster properties
    * @param isPaused true if paused, false if active
    */
-  public ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
+  private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
       Map<ParticipantId, ParticipantConfig> participantMap,
       Map<ConstraintType, ClusterConstraints> constraintMap,
       Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
@@ -90,6 +101,105 @@ public class ClusterConfig {
   }
 
   /**
+   * Get the maximum number of participants that can be in a state
+   * @param scope the scope for the bound
+   * @param stateModelDefId the state model of the state
+   * @param state the constrained state
+   * @return The upper bound, which can be "-1" if unspecified, a numerical upper bound, "R" for
+   *         number of replicas, or "N" for number of participants
+   */
+  public String getStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+      State state) {
+    // set up attributes to match based on the scope
+    ClusterConstraints stateConstraints = getConstraintMap().get(ConstraintType.STATE_CONSTRAINT);
+    Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
+    matchAttributes.put(ConstraintAttribute.STATE, state.toString());
+    matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
+    switch (scope.getType()) {
+    case CLUSTER:
+      // cluster is implicit
+      break;
+    case RESOURCE:
+      matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
+      break;
+    default:
+      LOG.error("Unsupported scope for state constraint: " + scope);
+      return "-1";
+    }
+    Set<ConstraintItem> matches = stateConstraints.match(matchAttributes);
+    int value = -1;
+    for (ConstraintItem item : matches) {
+      // match: if an R or N is found, always choose that one
+      // otherwise, take the minimum of the counts specified in the constraints
+      String constraintValue = item.getConstraintValue();
+      if (constraintValue != null) {
+        if (constraintValue.equals(ConstraintValue.N.toString())
+            || constraintValue.equals(ConstraintValue.R.toString())) {
+          return constraintValue;
+        } else {
+          try {
+            int current = Integer.parseInt(constraintValue);
+            if (value == -1 || current < value) {
+              value = current;
+            }
+          } catch (NumberFormatException e) {
+            LOG.error("Invalid state upper bound: " + constraintValue);
+          }
+        }
+      }
+    }
+    return Integer.toString(value);
+  }
+
+  /**
+   * Get the limit of simultaneous execution of a transition
+   * @param scope the scope under which the transition is constrained
+   * @param stateModelDefId the state model of which the transition is a part
+   * @param transition the constrained transition
+   * @return the limit, or Integer.MAX_VALUE if there is no limit
+   */
+  public int getTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+      Transition transition) {
+    // set up attributes to match based on the scope
+    ClusterConstraints transitionConstraints =
+        getConstraintMap().get(ConstraintType.MESSAGE_CONSTRAINT);
+    Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
+    matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
+    matchAttributes.put(ConstraintAttribute.MESSAGE_TYPE, MessageType.STATE_TRANSITION.toString());
+    matchAttributes.put(ConstraintAttribute.TRANSITION, transition.toString());
+    switch (scope.getType()) {
+    case CLUSTER:
+      // cluster is implicit
+      break;
+    case RESOURCE:
+      matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
+      break;
+    case PARTICIPANT:
+      matchAttributes.put(ConstraintAttribute.INSTANCE, scope.getScopedId().stringify());
+      break;
+    default:
+      LOG.error("Unsupported scope for transition constraints: " + scope);
+      return Integer.MAX_VALUE;
+    }
+    Set<ConstraintItem> matches = transitionConstraints.match(matchAttributes);
+    int value = Integer.MAX_VALUE;
+    for (ConstraintItem item : matches) {
+      String constraintValue = item.getConstraintValue();
+      if (constraintValue != null) {
+        try {
+          int current = Integer.parseInt(constraintValue);
+          if (current < value) {
+            value = current;
+          }
+        } catch (NumberFormatException e) {
+          LOG.error("Invalid in-flight transition cap: " + constraintValue);
+        }
+      }
+    }
+    return value;
+  }
+
+  /**
    * Get participants of the cluster
    * @return a map of participant id to participant, or empty map if none
    */
@@ -197,7 +307,11 @@ public class ClusterConfig {
      * @return Builder
      */
     public Builder addConstraint(ClusterConstraints constraint) {
-      _constraintMap.put(constraint.getType(), constraint);
+      ClusterConstraints existConstraints = getConstraintsInstance(constraint.getType());
+      for (ConstraintId constraintId : constraint.getConstraintItems().keySet()) {
+        existConstraints
+            .addConstraintItem(constraintId, constraint.getConstraintItem(constraintId));
+      }
       return this;
     }
 
@@ -214,12 +328,121 @@ public class ClusterConfig {
     }
 
     /**
-     * Add a constraint to the cluster
-     * @param constraint cluster constraint of a specific type
+     * Add a constraint on the maximum number of in-flight transitions of a certain type
+     * @param scope scope of the constraint; the constraint id is derived from the scope,
+     *          state model, and transition
+     * @param stateModelDefId identifies the state model containing the transition
+     * @param transition the transition to constrain
+     * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
+     * @return Builder
+     */
+    public Builder addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        Transition transition, int maxInFlightTransitions) {
+      Map<String, String> attributes = Maps.newHashMap();
+      attributes.put(ConstraintAttribute.MESSAGE_TYPE.toString(),
+          MessageType.STATE_TRANSITION.toString());
+      attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(),
+          Integer.toString(maxInFlightTransitions));
+      attributes.put(ConstraintAttribute.TRANSITION.toString(), transition.toString());
+      attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
+      switch (scope.getType()) {
+      case CLUSTER:
+        // cluster is implicit
+        break;
+      case RESOURCE:
+        attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
+        break;
+      case PARTICIPANT:
+        attributes.put(ConstraintAttribute.INSTANCE.toString(), scope.getScopedId().stringify());
+        break;
+      default:
+        LOG.error("Unsupported scope for adding a transition constraint: " + scope);
+        return this;
+      }
+      ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
+      ClusterConstraints constraints = getConstraintsInstance(ConstraintType.MESSAGE_CONSTRAINT);
+      constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, transition), item);
+      return this;
+    }
+
+    /**
+     * Add a state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @param upperBound maximum number of replicas per partition in the state
+     * @return Builder
+     */
+    public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state, int upperBound) {
+      return addStateUpperBoundConstraint(scope, stateModelDefId, state,
+          Integer.toString(upperBound));
+    }
+
+    /**
+     * Add a state upper bound constraint
+     * @param scope scope under which the constraint is valid
+     * @param stateModelDefId identifier of the state model that owns the state
+     * @param state the state to constrain
+     * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
+     *          number, or the currently supported special bound values:<br />
+     *          "R" - Refers to the number of replicas specified during resource
+     *          creation. This allows having different replication factor for each
+     *          resource without having to create a different state machine. <br />
+     *          "N" - Refers to all nodes in the cluster. Useful for resources that need
+     *          to exist on all nodes. This way one can add/remove nodes without having
+     *          to change the bounds.
+     * @return Builder
+     */
+    public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+        State state, String dynamicUpperBound) {
+      Map<String, String> attributes = Maps.newHashMap();
+      attributes.put(ConstraintAttribute.STATE.toString(), state.toString());
+      attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
+      attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(), dynamicUpperBound);
+      switch (scope.getType()) {
+      case CLUSTER:
+        // cluster is implicit
+        break;
+      case RESOURCE:
+        attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
+        break;
+      default:
+        LOG.error("Unsupported scope for adding a state constraint: " + scope);
+        return this;
+      }
+      ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
+      ClusterConstraints constraints = getConstraintsInstance(ConstraintType.STATE_CONSTRAINT);
+      constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, state), item);
+      return this;
+    }
+
+    /**
+     * Add a state model definition to the cluster
+     * @param stateModelDef state model definition of the cluster
      * @return Builder
      */
     public Builder addStateModelDefinition(StateModelDefinition stateModelDef) {
       _stateModelMap.put(stateModelDef.getStateModelDefId(), stateModelDef);
+      // add state constraints from the state model definition
+      for (State state : stateModelDef.getStatesPriorityList()) {
+        if (!stateModelDef.getNumParticipantsPerState(state).equals("-1")) {
+          addStateUpperBoundConstraint(Scope.cluster(_id), stateModelDef.getStateModelDefId(),
+              state, stateModelDef.getNumParticipantsPerState(state));
+        }
+      }
+      return this;
+    }
+
+    /**
+     * Add multiple state model definitions
+     * @param stateModelDefs collection of state model definitions for the cluster
+     * @return Builder
+     */
+    public Builder addStateModelDefinitions(Collection<StateModelDefinition> stateModelDefs) {
+      for (StateModelDefinition stateModelDef : stateModelDefs) {
+        addStateModelDefinition(stateModelDef);
+      }
       return this;
     }
 
@@ -251,5 +474,19 @@ public class ClusterConfig {
       return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
           _userConfig, _isPaused);
     }
+
+    /**
+     * Get a valid instance of ClusterConstraints for a type
+     * @param type the type
+     * @return ClusterConstraints
+     */
+    private ClusterConstraints getConstraintsInstance(ConstraintType type) {
+      ClusterConstraints constraints = _constraintMap.get(type);
+      if (constraints == null) {
+        constraints = new ClusterConstraints(type);
+        _constraintMap.put(type, constraints);
+      }
+      return constraints;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java b/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java
new file mode 100644
index 0000000..85c9430
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java
@@ -0,0 +1,74 @@
+package org.apache.helix.api;
+
+import org.apache.helix.model.Transition;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Identifies a constraint item on the cluster
+ */
+public class ConstraintId extends Id {
+  private final String _constraintId;
+
+  /**
+   * Create a constraint id
+   * @param constraintId string representing the constraint id
+   */
+  private ConstraintId(String constraintId) {
+    _constraintId = constraintId;
+  }
+
+  @Override
+  public String stringify() {
+    return _constraintId;
+  }
+
+  /**
+   * Get a constraint id from a string
+   * @param constraintId string representing the constraint id
+   * @return ConstraintId
+   */
+  public static ConstraintId from(String constraintId) {
+    return new ConstraintId(constraintId);
+  }
+
+  /**
+   * Get a state constraint id based on the state model definition and state
+   * @param scope the scope of the constraint
+   * @param stateModelDefId the state model
+   * @param state the constrained state
+   * @return ConstraintId
+   */
+  public static ConstraintId from(Scope<?> scope, StateModelDefId stateModelDefId, State state) {
+    return new ConstraintId(scope + "|" + stateModelDefId + "|" + state);
+  }
+
+  /**
+   * Get a transition constraint id based on the state model definition and transition
+   * @param scope the scope of the constraint
+   * @param stateModelDefId the state model
+   * @param transition the constrained transition
+   * @return ConstraintId
+   */
+  public static ConstraintId from(Scope<?> scope, StateModelDefId stateModelDefId,
+      Transition transition) {
+    return new ConstraintId(scope + "|" + stateModelDefId + "|" + transition);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
index 97d4a45..a5f2bbe 100644
--- a/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
@@ -112,6 +112,16 @@ public final class CustomRebalancerConfig extends RebalancerConfig {
       _preferenceMaps = new HashMap<PartitionId, Map<ParticipantId, State>>();
     }
 
+		/**
+		 * Construct builder using an existing custom rebalancer config
+		 * @param config the custom rebalancer config whose settings and preference maps are copied
+		 */
+		public Builder(CustomRebalancerConfig config) {
+			super(config);
+			_preferenceMaps = new HashMap<PartitionId, Map<ParticipantId, State>>();
+			_preferenceMaps.putAll(config.getPreferenceMaps());
+		}
+
     /**
      * Add a preference map of a partition
      * @param partitionId the partition to set

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
index 3482d16..599d20d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
@@ -67,6 +67,14 @@ public final class FullAutoRebalancerConfig extends RebalancerConfig {
       super(resourceId);
     }
 
+    /**
+     * Construct a builder using an existing full-auto rebalancer config
+     * @param config the full-auto rebalancer config to copy settings from
+     */
+    public Builder(FullAutoRebalancerConfig config) {
+      super(config);
+    }
+
     @Override
     public FullAutoRebalancerConfig build() {
       if (_partitionMap.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index 6c1be9e..46895d3 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -170,11 +170,10 @@ public class ParticipantAccessor {
     } else {
       // check partitions exist. warn if not
       for (PartitionId partitionId : partitionIdSet) {
-        String partitionName = partitionId.stringify();
         if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
-            .getPreferenceList(partitionName) == null)
+            .getPreferenceList(partitionId) == null)
             || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
-                .getInstanceStateMap(partitionName) == null)) {
+                .getParticipantStateMap(partitionId) == null)) {
           LOG.warn("Cluster: " + _clusterId + ", resource: " + resourceId + ", partition: "
               + partitionId + ", partition does NOT exist in ideal state");
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/PartitionId.java b/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
index e72f1b3..757406d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
+++ b/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
@@ -28,6 +28,14 @@ public class PartitionId extends Id {
     _partitionName = partitionName;
   }
 
+  /**
+   * Get the id of the resource containing this partition
+   * @return ResourceId
+   */
+  public ResourceId getResourceId() {
+    return _resourceId;
+  }
+
   @Override
   public String stringify() {
     // check in case the partition name is instantiated incorrectly

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 2f531e0..d377af8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -376,6 +376,22 @@ public class RebalancerConfig extends NamespacedConfig {
     }
 
     /**
+     * Construct a builder from an existing rebalancer config
+     * @param config the rebalancer config whose settings are copied into this builder
+     */
+    public Builder(RebalancerConfig config) {
+      _resourceId = config.getResourceId();
+      _partitionMap = new TreeMap<PartitionId, Partition>();
+      _partitionMap.putAll(config.getPartitionMap());
+      _stateModelDefId = config.getStateModelDefId();
+      _stateModelFactoryId = config.getStateModelFactoryId();
+      _anyLiveParticipant = config.canAssignAnyLiveParticipant();
+      _replicaCount = config.getReplicaCount();
+      _maxPartitionsPerParticipant = config.getMaxPartitionsPerParticipant();
+      _participantGroupTag = config.getParticipantGroupTag();
+    }
+
+    /**
      * Set the state model definition
      * @param stateModelDefId state model identifier
      * @return Builder
@@ -513,6 +529,14 @@ public class RebalancerConfig extends NamespacedConfig {
     }
 
     /**
+     * Construct with an existing rebalancer config
+     * @param config the rebalancer config to copy settings from
+     */
+    public SimpleBuilder(RebalancerConfig config) {
+      super(config);
+    }
+
+    /**
      * Build a rebalancer config
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/Scope.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Scope.java b/helix-core/src/main/java/org/apache/helix/api/Scope.java
index e887f98..4bff194 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Scope.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Scope.java
@@ -24,6 +24,13 @@ package org.apache.helix.api;
  * of cluster, participant, partition, or resource, but not more than one of these at any time.
  */
 public class Scope<T extends Id> {
+  public enum ScopeType {
+    CLUSTER, // scoped id is a ClusterId
+    PARTICIPANT, // scoped id is a ParticipantId
+    PARTITION, // scoped id is a PartitionId
+    RESOURCE // scoped id is a ResourceId
+  }
+
   private final T _id;
 
   /**
@@ -42,6 +49,30 @@ public class Scope<T extends Id> {
     return _id;
   }
 
+  @Override
+  public String toString() {
+    return getType() + "{" + getScopedId() + "}"; // scope type followed by the scoped id in braces
+  }
+
+  /**
+   * Get the Helix entity type that this scope covers, decided by the exact class of the scoped id
+   * @return scope type, or null if the id is not a ClusterId, ParticipantId, PartitionId, or ResourceId
+   */
+  public ScopeType getType() {
+    Class<?> idClass = _id.getClass();
+    if (idClass == ClusterId.class) {
+      return ScopeType.CLUSTER;
+    } else if (idClass == ParticipantId.class) {
+      return ScopeType.PARTICIPANT;
+    } else if (idClass == PartitionId.class) {
+      return ScopeType.PARTITION;
+    } else if (idClass == ResourceId.class) {
+      return ScopeType.RESOURCE;
+    } else {
+      return null; // id class matches none of the known scope types
+    }
+  }
+
   /**
    * Get a cluster scope
    * @param clusterId the id of the cluster that is scoped

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
index a5284a8..409769e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
@@ -116,6 +116,16 @@ public final class SemiAutoRebalancerConfig extends RebalancerConfig {
     }
 
     /**
+     * Construct a builder initialized from an existing semi-auto rebalancer config
+     * @param config the semi-auto rebalancer config whose settings and preference lists are copied
+     */
+    public Builder(SemiAutoRebalancerConfig config) {
+    	super(config);
+        _preferenceLists = new HashMap<PartitionId, List<ParticipantId>>();
+        _preferenceLists.putAll(config.getPreferenceLists());
+    }
+    
+    /**
      * Add a preference list of a partition
      * @param partitionId the partition to set
      * @param preferenceList list of participant ids, most preferred first

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
index 345c80b..d9517a9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
@@ -88,6 +88,15 @@ public final class UserDefinedRebalancerConfig extends RebalancerConfig {
       super(resourceId);
     }
 
+    /**
+     * Construct a builder initialized from an existing user-defined rebalancer config
+     * @param config the user-defined rebalancer config supplying the base settings and rebalancer ref
+     */
+		public Builder(UserDefinedRebalancerConfig config) {
+			super(config);
+			_rebalancerRef = config.getRebalancerRef();
+		}
+
     public Builder rebalancerRef(RebalancerRef rebalancerRef) {
       _rebalancerRef = rebalancerRef;
       return this;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 32b4e92..1c69dfe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -56,6 +56,7 @@ import org.apache.log4j.Logger;
  * The output is a preference list and a mapping based on that preference list, i.e. partition p
  * has a replica on node k with state s.
  */
+@Deprecated
 public class AutoRebalancer implements Rebalancer {
   // These should be final, but are initialized in init rather than a constructor
   private HelixManager _manager;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 689561b..709b61e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -46,6 +46,7 @@ import org.apache.log4j.Logger;
  * The output is a verified mapping based on that preference list, i.e. partition p has a replica
  * on node k with state s, where s may be a dropped or error state if necessary.
  */
+@Deprecated
 public class CustomRebalancer implements Rebalancer {
 
   private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
@@ -70,7 +71,8 @@ public class CustomRebalancer implements Rebalancer {
       Set<String> disabledInstancesForPartition =
           clusterData.getDisabledInstancesForPartition(partition.toString());
       Map<String, String> idealStateMap =
-          currentIdealState.getInstanceStateMap(partition.getPartitionName());
+          IdealState.stringMapFromParticipantStateMap(currentIdealState
+              .getParticipantStateMap(PartitionId.from(partition.getPartitionName())));
       Map<String, String> bestStateForPartition =
           computeCustomizedBestStateForPartition(clusterData, stateModelDef, idealStateMap,
               currentStateMap, disabledInstancesForPartition);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 8212b7f..1972f03 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -35,7 +35,6 @@ import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
@@ -82,13 +81,23 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
       replicas = config.getReplicaCount();
     }
 
-    LinkedHashMap<String, Integer> stateCountMap =
-        ConstraintBasedAssignment.stateCount(stateModelDef, liveParticipants.size(), replicas);
+    // count how many replicas should be in each state
+    LinkedHashMap<State, Integer> stateCountMap =
+        NewConstraintBasedAssignment.stateCount(cluster.getConfig(), config.getResourceId(),
+            stateModelDef, liveParticipants.size(), replicas);
+    LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
+    for (State state : stateCountMap.keySet()) {
+      rawStateCountMap.put(state.toString(), stateCountMap.get(state));
+    }
+
+    // get the participant lists
     List<ParticipantId> liveParticipantList =
         new ArrayList<ParticipantId>(liveParticipants.keySet());
     List<ParticipantId> allParticipantList =
         new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
     List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
+
+    // compute the current mapping from the current state
     Map<PartitionId, Map<ParticipantId, State>> currentMapping =
         currentMapping(config, currentState, stateCountMap);
 
@@ -109,9 +118,9 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
       liveNodes = new ArrayList<String>(taggedNodes);
     }
 
+    // determine which nodes the replicas should live on
     List<String> allNodes = Lists.transform(allParticipantList, Functions.toStringFunction());
     int maxPartition = config.getMaxPartitionsPerParticipant();
-
     if (LOG.isInfoEnabled()) {
       LOG.info("currentMapping: " + currentMapping);
       LOG.info("stateCountMap: " + stateCountMap);
@@ -122,7 +131,7 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
     ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
     _algorithm =
         new AutoRebalanceStrategy(config.getResourceId().stringify(), partitionNames,
-            stateCountMap, maxPartition, placementScheme);
+            rawStateCountMap, maxPartition, placementScheme);
     ZNRecord newMapping =
         _algorithm.computePartitionAssignment(liveNodes,
             ResourceAssignment.stringMapsFromReplicaMaps(currentMapping), allNodes);
@@ -153,8 +162,8 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
       preferenceList =
           NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
       Map<ParticipantId, State> bestStateForPartition =
-          NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
-              stateModelDef, preferenceList,
+          NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
+              config.getResourceId(), liveParticipants, stateModelDef, preferenceList,
               currentState.getCurrentStateMap(config.getResourceId(), partition),
               disabledParticipantsForPartition);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
@@ -164,7 +173,7 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
 
   private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
       FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
-      Map<String, Integer> stateCountMap) {
+      Map<State, Integer> stateCountMap) {
     Map<PartitionId, Map<ParticipantId, State>> map =
         new HashMap<PartitionId, Map<ParticipantId, State>>();
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index cb061ab..3153623 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -66,9 +66,9 @@ public class NewSemiAutoRebalancer implements NewRebalancer<SemiAutoRebalancerCo
           NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
               config.getPreferenceList(partition));
       Map<ParticipantId, State> bestStateForPartition =
-          NewConstraintBasedAssignment.computeAutoBestStateForPartition(
-              cluster.getLiveParticipantMap(), stateModelDef, preferenceList, currentStateMap,
-              disabledInstancesForPartition);
+          NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
+              config.getResourceId(), cluster.getLiveParticipantMap(), stateModelDef,
+              preferenceList, currentStateMap, disabledInstancesForPartition);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index e4f165b..5dd399f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.ResourceAssignment;
  * This will be invoked on all changes that happen in the cluster.<br/>
  * Simply return the newIdealState for a resource in this method.<br/>
  */
+@Deprecated
 public interface Rebalancer {
   /**
    * Initialize the rebalancer with a HelixManager if necessary

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index d525ea3..433b4a7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -45,6 +45,7 @@ import org.apache.log4j.Logger;
  * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
  * with state s.
  */
+@Deprecated
 public class SemiAutoRebalancer implements Rebalancer {
 
   private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index ee7524f..dce0a07 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -41,6 +41,7 @@ import org.apache.log4j.Logger;
  * Collection of functions that will compute the best possible states given the live instances and
  * an ideal state.
  */
+@Deprecated
 public class ConstraintBasedAssignment {
   private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 69d9fcf..603edd0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -31,9 +31,12 @@ import java.util.Set;
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ClusterConfig;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -96,10 +99,10 @@ public class NewConstraintBasedAssignment {
    * @param disabledParticipantsForPartition
    * @return
    */
-  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
-      Map<ParticipantId, Participant> liveParticipantMap, StateModelDefinition stateModelDef,
-      List<ParticipantId> participantPreferenceList, Map<ParticipantId, State> currentStateMap,
-      Set<ParticipantId> disabledParticipantsForPartition) {
+  public static Map<ParticipantId, State> computeAutoBestStateForPartition(ClusterConfig cluster,
+      ResourceId resourceId, Map<ParticipantId, Participant> liveParticipantMap,
+      StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
+      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
     // if the resource is deleted, instancePreferenceList will be empty and
@@ -128,7 +131,9 @@ public class NewConstraintBasedAssignment {
     boolean assigned[] = new boolean[participantPreferenceList.size()];
 
     for (State state : statesPriorityList) {
-      String num = stateModelDef.getNumParticipantsPerState(state);
+      String num =
+          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+              stateModelDef.getStateModelDefId(), state);
       int stateCount = -1;
       if ("N".equals(num)) {
         Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantMap.keySet());
@@ -171,19 +176,23 @@ public class NewConstraintBasedAssignment {
 
   /**
    * Get the number of replicas that should be in each state for a partition
+   * @param cluster cluster configuration
+   * @param resourceId the resource for which to get the state count
    * @param stateModelDef StateModelDefinition object
    * @param liveNodesNb number of live nodes
    * @param total number of replicas
    * @return state count map: state->count
    */
-  public static LinkedHashMap<State, Integer> stateCount(StateModelDefinition stateModelDef,
-      int liveNodesNb, int totalReplicas) {
+  public static LinkedHashMap<State, Integer> stateCount(ClusterConfig cluster,
+      ResourceId resourceId, StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
     LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
     List<State> statesPriorityList = stateModelDef.getStatesPriorityList();
 
     int replicas = totalReplicas;
     for (State state : statesPriorityList) {
-      String num = stateModelDef.getNumParticipantsPerState(state);
+      String num =
+          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+              stateModelDef.getStateModelDefId(), state);
       if ("N".equals(num)) {
         stateCountMap.put(state, liveNodesNb);
       } else if ("R".equals(num)) {
@@ -207,7 +216,9 @@ public class NewConstraintBasedAssignment {
 
     // get state count for R
     for (State state : statesPriorityList) {
-      String num = stateModelDef.getNumParticipantsPerState(state);
+      String num =
+          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+              stateModelDef.getStateModelDefId(), state);
       if ("R".equals(num)) {
         stateCountMap.put(state, replicas);
         // should have at most one state using R

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index c7473ff..dbbcb1e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -97,8 +97,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
       Rebalancer rebalancer = null;
       if (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
-          && idealState.getRebalancerClassName() != null) {
-        String rebalancerClassName = idealState.getRebalancerClassName();
+          && idealState.getRebalancerRef() != null) {
+        String rebalancerClassName = idealState.getRebalancerRef().toString();
         logger
             .info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
         try {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index a89707b..021c9b8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -101,7 +101,8 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
               partitionId);
       partitionMapping.addReplicaMap(partitionId, NewConstraintBasedAssignment
-          .computeAutoBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef, null,
+          .computeAutoBestStateForPartition(cluster.getConfig(), resourceId,
+              cluster.getLiveParticipantMap(), stateModelDef, null,
               currentStateOutput.getCurrentStateMap(resourceId, partitionId),
               disabledParticipantsForPartition));
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index 0cc1ea1..457b470 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -20,10 +20,11 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.CustomRebalancerConfig;
+import org.apache.helix.api.FullAutoRebalancerConfig;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
@@ -31,7 +32,9 @@ import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SemiAutoRebalancerConfig;
 import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.api.UserDefinedRebalancerConfig;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.CurrentState;
@@ -47,29 +50,99 @@ public class NewResourceComputationStage extends AbstractBaseStage {
   private static Logger LOG = Logger.getLogger(NewResourceComputationStage.class);
 
   @Override
-  public void process(ClusterEvent event) throws Exception {
+  public void process(ClusterEvent event) throws StageException {
     Cluster cluster = event.getAttribute("ClusterDataCache");
     if (cluster == null) {
-      throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
+      throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
+    }
+
+    Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+    Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
+
+    // ideal-state may be removed, add all resource config in current-state but not in ideal-state
+    for (ResourceId resourceId : csResCfgMap.keySet()) {
+      if (!cluster.getResourceMap().keySet().contains(resourceId)) {
+        resCfgMap.put(resourceId, csResCfgMap.get(resourceId));
+      }
     }
 
-    Map<ResourceId, ResourceConfig.Builder> resourceBuilderMap =
-        new LinkedHashMap<ResourceId, ResourceConfig.Builder>();
-    // include all resources in ideal-state
     for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
       Resource resource = cluster.getResource(resourceId);
-      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
-
-      ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
-      resourceBuilder.rebalancerConfig(rebalancerConfig);
-      resourceBuilder.bucketSize(resource.getBucketSize());
-      resourceBuilder.batchMessageMode(resource.getBatchMessageMode());
-      resourceBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
-      resourceBuilderMap.put(resourceId, resourceBuilder);
+      RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
+
+      ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+      resCfgBuilder.bucketSize(resource.getBucketSize());
+      resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
+      resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
+
+      switch (rebalancerCfg.getRebalancerMode()) {
+      case USER_DEFINED: {
+        UserDefinedRebalancerConfig.Builder builder =
+            new UserDefinedRebalancerConfig.Builder(UserDefinedRebalancerConfig.from(rebalancerCfg));
+        if (csResCfgMap.containsKey(resourceId)) {
+          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+        }
+        resCfgBuilder.rebalancerConfig(builder.build());
+        resCfgMap.put(resourceId, resCfgBuilder.build());
+        break;
+      }
+      case FULL_AUTO: {
+        FullAutoRebalancerConfig.Builder builder =
+            new FullAutoRebalancerConfig.Builder(FullAutoRebalancerConfig.from(rebalancerCfg));
+        if (csResCfgMap.containsKey(resourceId)) {
+          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+        }
+        resCfgBuilder.rebalancerConfig(builder.build());
+        resCfgMap.put(resourceId, resCfgBuilder.build());
+        break;
+      }
+      case SEMI_AUTO: {
+        SemiAutoRebalancerConfig.Builder builder =
+            new SemiAutoRebalancerConfig.Builder(SemiAutoRebalancerConfig.from(rebalancerCfg));
+        if (csResCfgMap.containsKey(resourceId)) {
+          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+        }
+        resCfgBuilder.rebalancerConfig(builder.build());
+        resCfgMap.put(resourceId, resCfgBuilder.build());
+        break;
+      }
+      case CUSTOMIZED: {
+        CustomRebalancerConfig.Builder builder =
+            new CustomRebalancerConfig.Builder(CustomRebalancerConfig.from(rebalancerCfg));
+        if (csResCfgMap.containsKey(resourceId)) {
+          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+        }
+        resCfgBuilder.rebalancerConfig(builder.build());
+        resCfgMap.put(resourceId, resCfgBuilder.build());
+        break;
+      }
+      default:
+        RebalancerConfig.SimpleBuilder builder = new RebalancerConfig.SimpleBuilder(rebalancerCfg);
+        if (csResCfgMap.containsKey(resourceId)) {
+          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+        }
+        resCfgBuilder.rebalancerConfig(builder.build());
+        resCfgMap.put(resourceId, resCfgBuilder.build());
+        break;
+      }
+
     }
 
-    // include all partitions from CurrentState as well since idealState might be removed
-    Map<ResourceId, RebalancerConfig.SimpleBuilder> rebalancerConfigBuilderMap =
+    event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
+  }
+
+  /**
+   * Get resource configs from current-state
+   * @param cluster
+   * @return resource config map or empty map if not available
+   * @throws StageException
+   */
+  Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster)
+      throws StageException {
+    Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
+        new HashMap<ResourceId, ResourceConfig.Builder>();
+
+    Map<ResourceId, RebalancerConfig.SimpleBuilder> rebCfgBuilderMap =
         new HashMap<ResourceId, RebalancerConfig.SimpleBuilder>();
 
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
@@ -84,46 +157,35 @@ public class NewResourceComputationStage extends AbstractBaseStage {
               + currentState.getResourceId());
         }
 
-        // don't overwrite ideal state configs
-        if (!resourceBuilderMap.containsKey(resourceId)) {
-          if (!rebalancerConfigBuilderMap.containsKey(resourceId)) {
-            RebalancerConfig.SimpleBuilder rebalancerConfigBuilder =
-                new RebalancerConfig.SimpleBuilder(resourceId);
-            rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
-            rebalancerConfigBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
-                .getStateModelFactoryName()));
-            rebalancerConfigBuilderMap.put(resourceId, rebalancerConfigBuilder);
-          }
-          ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
-          resourceBuilder.bucketSize(currentState.getBucketSize());
-          resourceBuilder.batchMessageMode(currentState.getBatchMessageMode());
-          resourceBuilderMap.put(resourceId, resourceBuilder);
+        if (!resCfgBuilderMap.containsKey(resourceId)) {
+          RebalancerConfig.SimpleBuilder rebCfgBuilder =
+              new RebalancerConfig.SimpleBuilder(resourceId);
+          rebCfgBuilder.stateModelDef(currentState.getStateModelDefId());
+          rebCfgBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
+              .getStateModelFactoryName()));
+          rebCfgBuilderMap.put(resourceId, rebCfgBuilder);
+
+          ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+          resCfgBuilder.bucketSize(currentState.getBucketSize());
+          resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
+          resCfgBuilderMap.put(resourceId, resCfgBuilder);
         }
 
-        // add all partitions in current-state
-        if (rebalancerConfigBuilderMap.containsKey(resourceId)) {
-          RebalancerConfig.SimpleBuilder rebalancerConfigBuilder =
-              rebalancerConfigBuilderMap.get(resourceId);
-          for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
-            rebalancerConfigBuilder.addPartition(new Partition(partitionId));
-          }
+        RebalancerConfig.SimpleBuilder rebCfgBuilder = rebCfgBuilderMap.get(resourceId);
+        for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
+          rebCfgBuilder.addPartition(new Partition(partitionId));
         }
       }
-
     }
 
-    // convert builder-map to resource-map
-    Map<ResourceId, ResourceConfig> resourceMap = new LinkedHashMap<ResourceId, ResourceConfig>();
-    for (ResourceId resourceId : resourceBuilderMap.keySet()) {
-      ResourceConfig.Builder resourceConfigBuilder = resourceBuilderMap.get(resourceId);
-      if (rebalancerConfigBuilderMap.containsKey(resourceId)) {
-        RebalancerConfig.SimpleBuilder rebalancerConfigBuilder =
-            rebalancerConfigBuilderMap.get(resourceId);
-        resourceConfigBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
-      }
-      resourceMap.put(resourceId, resourceConfigBuilder.build());
+    Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+    for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
+      ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
+      RebalancerConfig.SimpleBuilder rebCfgBuilder = rebCfgBuilderMap.get(resourceId);
+      resCfgBuilder.rebalancerConfig(rebCfgBuilder.build());
+      resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+    return resCfgMap;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
index e6b7398..ffc14d6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -54,8 +54,8 @@ public class RebalanceIdealStateStage extends AbstractBaseStage {
     for (String resourceName : idealStateMap.keySet()) {
       IdealState currentIdealState = idealStateMap.get(resourceName);
       if (currentIdealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
-          && currentIdealState.getRebalancerClassName() != null) {
-        String rebalancerClassName = currentIdealState.getRebalancerClassName();
+          && currentIdealState.getRebalancerRef() != null) {
+        String rebalancerClassName = currentIdealState.getRebalancerRef().toString();
         LOG.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
         try {
           Rebalancer balancer =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
index 00612dd..c48f156 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
@@ -320,7 +320,7 @@ public class StatsAggregationStage extends AbstractBaseStage {
     Builder kb = accessor.keyBuilder();
     List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
     for (IdealState idealState : idealStates) {
-      String resourceName = idealState.getResourceName();
+      String resourceName = idealState.getResourceId().stringify();
       if (actualStatName.contains("=" + resourceName + ".")
           || actualStatName.contains("=" + resourceName + ";")) {
         return resourceName;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
index 4e88499..6a7bb59 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.helix.HelixException;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.model.IdealState;
 
 public class EspressoRelayStrategy {
@@ -42,7 +43,7 @@ public class EspressoRelayStrategy {
     IdealState result = new IdealState(resultRecordName);
     result.setNumPartitions(partitions.size());
     result.setReplicas("" + replica);
-    result.setStateModelDefRef(stateModelName);
+    result.setStateModelDefId(StateModelDefId.from(stateModelName));
 
     int groups = instances.size() / replica;
     int remainder = instances.size() % replica;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 0a223fb..9d61914 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -40,6 +40,7 @@ import org.apache.helix.api.MessageId;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
@@ -180,7 +181,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
         }
         IdealState newAddedScheduledTasks = new IdealState(taskQueueName);
         newAddedScheduledTasks.setBucketSize(TASKQUEUE_BUCKET_NUM);
-        newAddedScheduledTasks.setStateModelDefRef(SCHEDULER_TASK_QUEUE);
+        newAddedScheduledTasks.setStateModelDefId(StateModelDefId.from(SCHEDULER_TASK_QUEUE));
 
         synchronized (_manager) {
           int existingTopPartitionId = 0;
@@ -233,8 +234,9 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
 
     private int findTopPartitionId(IdealState currentTaskQueue) {
       int topId = 0;
-      for (String partitionName : currentTaskQueue.getPartitionStringSet()) {
+      for (PartitionId partitionId : currentTaskQueue.getPartitionSet()) {
         try {
+          String partitionName = partitionId.stringify();
           String partitionNumStr = partitionName.substring(partitionName.lastIndexOf('_') + 1);
           int num = Integer.parseInt(partitionNumStr);
           if (topId < num) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 61b1cd1..14c63f4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -53,12 +53,14 @@ import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.alerts.AlertsHolder;
 import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.api.ConstraintId;
 import org.apache.helix.api.MessageId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.SessionId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
@@ -240,9 +242,9 @@ public class ZKHelixAdmin implements HelixAdmin {
       IdealState idealState = new IdealState(idealStateRecord);
       for (String partitionName : partitionNames) {
         if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
-            .getPreferenceList(partitionName) == null)
+            .getPreferenceList(PartitionId.from(partitionName)) == null)
             || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
-                .getInstanceStateMap(partitionName) == null)) {
+                .getParticipantStateMap(PartitionId.from(partitionName)) == null)) {
           logger.warn("Cluster: " + clusterName + ", resource: " + resourceName + ", partition: "
               + partitionName + ", partition does not exist in ideal state");
         }
@@ -390,7 +392,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       message.setStateModelDef(stateModelDef);
       message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
       message.setToState(stateModel.getInitialState());
-      message.setStateModelFactoryName(idealState.getStateModelFactoryName());
+      message.setStateModelFactoryId(idealState.getStateModelFactoryId());
 
       resetMessages.add(message);
       messageKeys.add(keyBuilder.message(instanceName, message.getId()));
@@ -590,7 +592,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void addResource(String clusterName, String resourceName, IdealState idealstate) {
-    String stateModelRef = idealstate.getStateModelDefRef();
+    String stateModelRef = idealstate.getStateModelDefId().stringify();
     String stateModelDefPath =
         PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, clusterName, stateModelRef);
     if (!_zkClient.exists(stateModelDefPath)) {
@@ -631,10 +633,11 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
     IdealState idealState = new IdealState(resourceName);
     idealState.setNumPartitions(partitions);
-    idealState.setStateModelDefRef(stateModelRef);
+    idealState.setStateModelDefId(StateModelDefId.from(stateModelRef));
     idealState.setRebalanceMode(mode);
     idealState.setReplicas("" + 0);
-    idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+    idealState.setStateModelFactoryId(StateModelFactoryId
+        .from(HelixConstants.DEFAULT_STATE_MODEL_FACTORY));
     if (maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE) {
       idealState.setMaxPartitionsPerInstance(maxPartitionsPerInstance);
     }
@@ -900,7 +903,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     IdealState idealState = new IdealState(clusterName);
 
     idealState.setNumPartitions(1);
-    idealState.setStateModelDefRef("LeaderStandby");
+    idealState.setStateModelDefId(StateModelDefId.from("LeaderStandby"));
 
     List<String> controllers = getInstancesInCluster(grandCluster);
     if (controllers.size() == 0) {
@@ -918,7 +921,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
-    accessor.setProperty(keyBuilder.idealState(idealState.getResourceName()), idealState);
+    accessor.setProperty(keyBuilder.idealState(idealState.getResourceId().stringify()), idealState);
   }
 
   @Override
@@ -986,7 +989,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
     idealState.setReplicas(Integer.toString(replica));
     int partitions = idealState.getNumPartitions();
-    String stateModelName = idealState.getStateModelDefRef();
+    String stateModelName = idealState.getStateModelDefId().stringify();
     StateModelDefinition stateModDef = getStateModelDef(clusterName, stateModelName);
 
     if (stateModDef == null) {
@@ -1103,7 +1106,7 @@ public class ZKHelixAdmin implements HelixAdmin {
             currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(
                 currentData);
 
-        constraints.addConstraintItem(constraintId, constraintItem);
+        constraints.addConstraintItem(ConstraintId.from(constraintId), constraintItem);
         return constraints.getRecord();
       }
     }, AccessOption.PERSISTENT);
@@ -1123,7 +1126,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         if (currentData != null) {
           ClusterConstraints constraints = new ClusterConstraints(currentData);
 
-          constraints.removeConstraintItem(constraintId);
+          constraints.removeConstraintItem(ConstraintId.from(constraintId));
           return constraints.getRecord();
         }
         return null;
@@ -1152,8 +1155,9 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void rebalance(String clusterName, IdealState currentIdealState, List<String> instanceNames) {
     Set<String> activeInstances = new HashSet<String>();
-    for (String partition : currentIdealState.getPartitionStringSet()) {
-      activeInstances.addAll(currentIdealState.getRecord().getListField(partition));
+    for (PartitionId partition : currentIdealState.getPartitionSet()) {
+      activeInstances.addAll(IdealState.stringListFromPreferenceList(currentIdealState
+          .getPreferenceList(partition)));
     }
     instanceNames.removeAll(activeInstances);
     Map<String, Object> previousIdealState =
@@ -1162,17 +1166,16 @@ public class ZKHelixAdmin implements HelixAdmin {
     Map<String, Object> balancedRecord =
         DefaultTwoStateStrategy.calculateNextIdealState(instanceNames, previousIdealState);
     StateModelDefinition stateModDef =
-        this.getStateModelDef(clusterName, currentIdealState.getStateModelDefRef());
+        this.getStateModelDef(clusterName, currentIdealState.getStateModelDefId().stringify());
 
     if (stateModDef == null) {
-      throw new HelixException("cannot find state model: "
-          + currentIdealState.getStateModelDefRef());
+      throw new HelixException("cannot find state model: " + currentIdealState.getStateModelDefId());
     }
     String[] states = RebalanceUtil.parseStates(clusterName, stateModDef);
 
     ZNRecord newIdealStateRecord =
-        DefaultTwoStateStrategy.convertToZNRecord(balancedRecord,
-            currentIdealState.getResourceName(), states[0], states[1]);
+        DefaultTwoStateStrategy.convertToZNRecord(balancedRecord, currentIdealState.getResourceId()
+            .stringify(), states[0], states[1]);
     Set<String> partitionSet = new HashSet<String>();
     partitionSet.addAll(newIdealStateRecord.getMapFields().keySet());
     partitionSet.addAll(newIdealStateRecord.getListFields().keySet());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index ef47a12..2b014ae 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
 
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.ConstraintId;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
 import org.apache.log4j.Logger;
@@ -42,9 +43,11 @@ public class ClusterConstraints extends HelixProperty {
    */
   public enum ConstraintAttribute {
     STATE,
+    STATE_MODEL,
     MESSAGE_TYPE,
     TRANSITION,
     RESOURCE,
+    PARTITION,
     INSTANCE,
     CONSTRAINT_VALUE
   }
@@ -53,7 +56,9 @@ public class ClusterConstraints extends HelixProperty {
    * Possible special values that constraint attributes can take
    */
   public enum ConstraintValue {
-    ANY
+    ANY,
+    N,
+    R
   }
 
   /**
@@ -65,7 +70,8 @@ public class ClusterConstraints extends HelixProperty {
   }
 
   // constraint-id -> constraint-item
-  private final Map<String, ConstraintItem> _constraints = new HashMap<String, ConstraintItem>();
+  private final Map<ConstraintId, ConstraintItem> _constraints =
+      new HashMap<ConstraintId, ConstraintItem>();
 
   /**
    * Instantiate constraints as a given type
@@ -96,7 +102,7 @@ public class ClusterConstraints extends HelixProperty {
           builder.addConstraintAttributes(_record.getMapField(constraintId)).build();
       // ignore item with empty attributes or no constraint-value
       if (item.getAttributes().size() > 0 && item.getConstraintValue() != null) {
-        addConstraintItem(constraintId, item);
+        addConstraintItem(ConstraintId.from(constraintId), item);
       } else {
         LOG.error("Skip invalid constraint. key: " + constraintId + ", value: "
             + _record.getMapField(constraintId));
@@ -109,13 +115,13 @@ public class ClusterConstraints extends HelixProperty {
    * @param constraintId unique constraint identifier
    * @param item the constraint as a {@link ConstraintItem}
    */
-  public void addConstraintItem(String constraintId, ConstraintItem item) {
+  public void addConstraintItem(ConstraintId constraintId, ConstraintItem item) {
     Map<String, String> map = new TreeMap<String, String>();
     for (ConstraintAttribute attr : item.getAttributes().keySet()) {
       map.put(attr.toString(), item.getAttributeValue(attr));
     }
     map.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(), item.getConstraintValue());
-    _record.setMapField(constraintId, map);
+    _record.setMapField(constraintId.stringify(), map);
     _constraints.put(constraintId, item);
   }
 
@@ -123,8 +129,8 @@ public class ClusterConstraints extends HelixProperty {
    * Add multiple constraint items.
    * @param items (constraint identifier, {@link ConstrantItem}) pairs
    */
-  public void addConstraintItems(Map<String, ConstraintItem> items) {
-    for (String constraintId : items.keySet()) {
+  public void addConstraintItems(Map<ConstraintId, ConstraintItem> items) {
+    for (ConstraintId constraintId : items.keySet()) {
       addConstraintItem(constraintId, items.get(constraintId));
     }
   }
@@ -133,9 +139,9 @@ public class ClusterConstraints extends HelixProperty {
    * remove a constraint-item
    * @param constraintId unique constraint identifier
    */
-  public void removeConstraintItem(String constraintId) {
+  public void removeConstraintItem(ConstraintId constraintId) {
     _constraints.remove(constraintId);
-    _record.getMapFields().remove(constraintId);
+    _record.getMapFields().remove(constraintId.stringify());
   }
 
   /**
@@ -143,7 +149,7 @@ public class ClusterConstraints extends HelixProperty {
    * @param constraintId unique constraint identifier
    * @return {@link ConstraintItem} or null if not present
    */
-  public ConstraintItem getConstraintItem(String constraintId) {
+  public ConstraintItem getConstraintItem(ConstraintId constraintId) {
     return _constraints.get(constraintId);
   }
 
@@ -163,6 +169,14 @@ public class ClusterConstraints extends HelixProperty {
   }
 
   /**
+   * Get all constraint items in this collection of constraints
+   * @return map of constraint id to constraint item
+   */
+  public Map<ConstraintId, ConstraintItem> getConstraintItems() {
+    return _constraints;
+  }
+
+  /**
    * convert a message to constraint attribute pairs
    * @param msg a {@link Message} containing constraint attributes
    * @return constraint attribute scope-value pairs
@@ -182,13 +196,15 @@ public class ClusterConstraints extends HelixProperty {
       if (msg.getTgtName() != null) {
         attributes.put(ConstraintAttribute.INSTANCE, msg.getTgtName());
       }
+      if (msg.getStateModelDefId() != null) {
+        attributes.put(ConstraintAttribute.STATE_MODEL, msg.getStateModelDefId().stringify());
+      }
     }
     return attributes;
   }
 
   @Override
   public boolean isValid() {
-    // TODO Auto-generated method stub
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
index b8f4ee5..f4304f4 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
@@ -146,6 +146,9 @@ public class ExternalView extends HelixProperty {
    */
   public Map<ParticipantId, State> getStateMap(PartitionId partitionId) {
     Map<String, String> rawStateMap = getStateMap(partitionId.stringify());
+    if (rawStateMap == null) {
+      return null;
+    }
     ImmutableMap.Builder<ParticipantId, State> builder =
         new ImmutableMap.Builder<ParticipantId, State>();
     for (String participantName : rawStateMap.keySet()) {