You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:26 UTC

[18/53] [abbrv] [HELIX-100] Improve the helix config api

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
new file mode 100644
index 0000000..3aff151
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
@@ -0,0 +1,93 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Defines the state available to a rebalancer. The most common use case is to use a
+ * {@link PartitionedRebalancerContext} or a subclass and set up a resource with it. A rebalancer
+ * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
+ * how the configuration should be serialized.
+ */
+public interface RebalancerContext {
+  /**
+   * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
+   * resource, e.g. a subtask of a task
+   * @return map of (subunit id, subunit) pairs
+   */
+  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
+
+  /**
+   * Get the subunits of the resource (e.g. partitions)
+   * @return set of subunit ids
+   */
+  public Set<? extends PartitionId> getSubUnitIdSet();
+
+  /**
+   * Get a specific subunit
+   * @param subUnitId the id of the subunit
+   * @return SubUnit
+   */
+  public Partition getSubUnit(PartitionId partitionId);
+
+  /**
+   * Get the resource to rebalance
+   * @return resource id
+   */
+  public ResourceId getResourceId();
+
+  /**
+   * Get the state model definition that the resource follows
+   * @return state model definition id
+   */
+  public StateModelDefId getStateModelDefId();
+
+  /**
+   * Get the state model factory of this resource
+   * @return state model factory id
+   */
+  public StateModelFactoryId getStateModelFactoryId();
+
+  /**
+   * Get the tag, if any, that participants must have in order to serve this resource
+   * @return participant group tag, or null
+   */
+  public String getParticipantGroupTag();
+
+  /**
+   * Get the serializer for this context
+   * @return ContextSerializer class object
+   */
+  public Class<? extends ContextSerializer> getSerializerClass();
+
+  /**
+   * Get a reference to the class used to rebalance this resource
+   * @return RebalancerRef
+   */
+  public RebalancerRef getRebalancerRef();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
new file mode 100644
index 0000000..a90b77a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
@@ -0,0 +1,94 @@
+package org.apache.helix.controller.rebalancer.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Reference to a class that extends {@link Rebalancer}. It loads the class automatically.
+ */
+public class RebalancerRef {
+  private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
+
+  @JsonProperty("rebalancerClassName")
+  private final String _rebalancerClassName;
+
+  @JsonCreator
+  private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
+    _rebalancerClassName = rebalancerClassName;
+  }
+
+  /**
+   * Get an instantiated Rebalancer
+   * @return Rebalancer or null if instantiation failed
+   */
+  @JsonIgnore
+  public Rebalancer getRebalancer() {
+    try {
+      return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+    } catch (Exception e) {
+      LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return _rebalancerClassName;
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof RebalancerRef) {
+      return this.toString().equals(((RebalancerRef) that).toString());
+    } else if (that instanceof String) {
+      return this.toString().equals(that);
+    }
+    return false;
+  }
+
+  /**
+   * Get a rebalancer class reference
+   * @param rebalancerClassName name of the class
+   * @return RebalancerRef or null if name is null
+   */
+  public static RebalancerRef from(String rebalancerClassName) {
+    if (rebalancerClassName == null) {
+      return null;
+    }
+    return new RebalancerRef(rebalancerClassName);
+  }
+
+  /**
+   * Get a RebalancerRef from a class object
+   * @param rebalancerClass class that implements Rebalancer
+   * @return RebalancerRef
+   */
+  public static RebalancerRef from(Class<? extends Rebalancer> rebalancerClass) {
+    if (rebalancerClass == null) {
+      return null;
+    }
+    return RebalancerRef.from(rebalancerClass.getName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
new file mode 100644
index 0000000..525931d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.rebalancer.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
+ * with partitions may accept state model definitions that support multiple replicas per partition,
+ * and it's possible that the policy is that each live participant in the system should have a
+ * replica.
+ */
+public interface ReplicatedRebalancerContext extends RebalancerContext {
+  /**
+   * Check if this resource should be assigned to any live participant
+   * @return true if any live participant expected, false otherwise
+   */
+  public boolean anyLiveParticipant();
+
+  /**
+   * Get the number of replicas that each resource subunit should have
+   * @return replica count
+   */
+  public int getReplicaCount();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
new file mode 100644
index 0000000..6fe3f54
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
@@ -0,0 +1,78 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Rebalancer for the SEMI_AUTO mode. It expects a RebalancerConfig that understands the preferred
+ * locations of each partition replica
+ */
+public class SemiAutoRebalancer implements Rebalancer {
+  private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
+
+  @Override
+  public void init(HelixManager helixManager) {
+    // do nothing
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
+      ResourceCurrentState currentState) {
+    SemiAutoRebalancerContext config =
+        rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + config.getResourceId());
+    }
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : config.getPartitionSet()) {
+      Map<ParticipantId, State> currentStateMap =
+          currentState.getCurrentStateMap(config.getResourceId(), partition);
+      Set<ParticipantId> disabledInstancesForPartition =
+          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partition);
+      List<ParticipantId> preferenceList =
+          NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
+              config.getPreferenceList(partition));
+      Map<ParticipantId, State> bestStateForPartition =
+          NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
+              config.getResourceId(), cluster.getLiveParticipantMap(), stateModelDef,
+              preferenceList, currentStateMap, disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
new file mode 100644
index 0000000..6cdfbb6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
@@ -0,0 +1,117 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
+ * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
+  @JsonProperty("preferenceLists")
+  private Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+  /**
+   * Instantiate a SemiAutoRebalancerContext
+   */
+  public SemiAutoRebalancerContext() {
+    super(RebalanceMode.SEMI_AUTO);
+    setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+    _preferenceLists = Maps.newHashMap();
+  }
+
+  /**
+   * Get the preference lists of all partitions of the resource
+   * @return map of partition id to list of participant ids
+   */
+  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
+    return _preferenceLists;
+  }
+
+  /**
+   * Set the preference lists of all partitions of the resource
+   * @param preferenceLists
+   */
+  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
+    _preferenceLists = preferenceLists;
+  }
+
+  /**
+   * Get the preference list of a partition
+   * @param partitionId the partition to look up
+   * @return list of participant ids
+   */
+  @JsonIgnore
+  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+    return _preferenceLists.get(partitionId);
+  }
+
+  /**
+   * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+    /**
+     * Instantiate for a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+      _preferenceLists = Maps.newHashMap();
+    }
+
+    /**
+     * Add a preference list for a partition
+     * @param partitionId partition to set
+     * @param preferenceList ordered list of participants who can serve the partition
+     * @return Builder
+     */
+    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+      _preferenceLists.put(partitionId, preferenceList);
+      return self();
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public SemiAutoRebalancerContext build() {
+      SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
+      super.update(context);
+      context.setPreferenceLists(_preferenceLists);
+      return context;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 2c4d8e1..f50b95c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -42,7 +42,6 @@ import org.apache.log4j.Logger;
  * Reads the data from the cluster using data accessor. This output ClusterData which
  * provides useful methods to search/lookup properties
  */
-@Deprecated
 public class ClusterDataCache {
 
   Map<String, LiveInstance> _liveInstanceMap;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index 021c9b8..6334279 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -22,25 +22,19 @@ package org.apache.helix.controller.stages;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.CustomRebalancerConfig;
-import org.apache.helix.api.FullAutoRebalancerConfig;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.SemiAutoRebalancerConfig;
 import org.apache.helix.api.StateModelDefId;
-import org.apache.helix.api.UserDefinedRebalancerConfig;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
-import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
-import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
-import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
+import org.apache.helix.controller.rebalancer.context.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -115,52 +109,24 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
     Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
 
     for (ResourceId resourceId : resourceMap.keySet()) {
-      LOG.debug("Processing resource:" + resourceId);
-      // Resource may be gone. In that case we need to get the state model name
-      // from the current state
-      // if (cluster.getResource(resourceId) == null) {
-      // // if resource is deleted, then we do not know which rebalancer to use
-      // // instead, just mark all partitions of the resource as dropped
-      // if (LOG.isInfoEnabled()) {
-      // LOG.info("resource:" + resourceId + " does not exist anymore");
-      // }
-      // StateModelDefinition stateModelDef =
-      // stateModelDefs.get(currentStateOutput.getResourceStateModelDef(resourceId));
-      // ResourceAssignment droppedAssignment =
-      // mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
-      // output.setResourceAssignment(resourceId, droppedAssignment);
-      // continue;
-      // }
-
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing resource:" + resourceId);
+      }
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
       ResourceAssignment resourceAssignment = null;
-      if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED) {
-        UserDefinedRebalancerConfig config = UserDefinedRebalancerConfig.from(rebalancerConfig);
-        if (config.getRebalancerRef() != null) {
-          NewUserDefinedRebalancer rebalancer = config.getRebalancerRef().getRebalancer();
-          resourceAssignment =
-              rebalancer.computeResourceMapping(config, cluster, currentStateOutput);
-        }
-      } else {
-        if (rebalancerConfig.getRebalancerMode() == RebalanceMode.FULL_AUTO) {
-          FullAutoRebalancerConfig config = FullAutoRebalancerConfig.from(rebalancerConfig);
-          resourceAssignment =
-              new NewAutoRebalancer().computeResourceMapping(config, cluster, currentStateOutput);
-        } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.SEMI_AUTO) {
-          SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(rebalancerConfig);
-          resourceAssignment =
-              new NewSemiAutoRebalancer().computeResourceMapping(config, cluster,
-                  currentStateOutput);
-        } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.CUSTOMIZED) {
-          CustomRebalancerConfig config = CustomRebalancerConfig.from(rebalancerConfig);
+      if (rebalancerConfig != null) {
+        Rebalancer rebalancer = rebalancerConfig.getRebalancer();
+        if (rebalancer != null) {
+          HelixManager manager = event.getAttribute("helixmanager");
+          rebalancer.init(manager);
           resourceAssignment =
-              new NewCustomRebalancer().computeResourceMapping(config, cluster, currentStateOutput);
+              rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
         }
       }
       if (resourceAssignment == null) {
-        StateModelDefinition stateModelDef =
-            stateModelDefs.get(rebalancerConfig.getStateModelDefId());
+        RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+        StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
         resourceAssignment =
             mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
index 873419c..1a92919 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -80,7 +80,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
 
         if (!message.getBatchMessageMode()) {
           PartitionId partitionId = message.getPartitionId();
-          Partition partition = resource.getPartition(partitionId);
+          Partition partition = resource.getSubUnit(partitionId);
           if (partition != null) {
             currentStateOutput.setPendingState(resourceId, partitionId, participantId,
                 message.getToState());
@@ -91,7 +91,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
           List<PartitionId> partitionNames = message.getPartitionIds();
           if (!partitionNames.isEmpty()) {
             for (PartitionId partitionId : partitionNames) {
-              Partition partition = resource.getPartition(partitionId);
+              Partition partition = resource.getSubUnit(partitionId);
               if (partition != null) {
                 currentStateOutput.setPendingState(resourceId, partitionId, participantId,
                     message.getToState());
@@ -126,11 +126,10 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
 
         Map<PartitionId, State> partitionStateMap = curState.getPartitionStateMap();
         for (PartitionId partitionId : partitionStateMap.keySet()) {
-          Partition partition = resource.getPartition(partitionId);
+          Partition partition = resource.getSubUnit(partitionId);
           if (partition != null) {
             currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
                 curState.getState(partitionId));
-
           } else {
             // log
           }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 95c76e6..477247e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -37,7 +37,6 @@ import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.SchedulerTaskConfig;
@@ -45,6 +44,8 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -53,7 +54,7 @@ import org.apache.helix.model.StatusUpdate;
 import org.apache.log4j.Logger;
 
 public class NewExternalViewComputeStage extends AbstractBaseStage {
-  private static Logger LOG = Logger.getLogger(ExternalViewComputeStage.class);
+  private static Logger LOG = Logger.getLogger(NewExternalViewComputeStage.class);
 
   @Override
   public void process(ClusterEvent event) throws Exception {
@@ -97,7 +98,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
       } else {
         view.setBucketSize(currentStateOutput.getBucketSize(resourceId));
       }
-      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
         Map<ParticipantId, State> currentStateMap =
             currentStateOutput.getCurrentStateMap(resourceId, partitionId);
         if (currentStateMap != null && currentStateMap.size() > 0) {
@@ -138,8 +139,11 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
         // partitions are finished (COMPLETED or ERROR), update the status update of the original
         // scheduler
         // message, and then remove the partitions from the ideal state
-        if (rebalancerConfig != null
-            && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+        RebalancerContext rebalancerContext =
+            (rebalancerConfig != null) ? rebalancerConfig
+                .getRebalancerContext(RebalancerContext.class) : null;
+        if (rebalancerContext != null
+            && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
                 StateModelDefId.SchedulerTaskQueue)) {
           updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index c0bde54..ad0ad95 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -30,7 +30,6 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.MessageId;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.SchedulerTaskConfig;
@@ -40,6 +39,7 @@ import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
@@ -76,13 +76,14 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       int bucketSize = resourceConfig.getBucketSize();
 
-      StateModelDefinition stateModelDef =
-          stateModelDefMap.get(resourceConfig.getRebalancerConfig().getStateModelDefId());
+      RebalancerContext rebalancerCtx =
+          resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+      StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
 
       ResourceAssignment resourceAssignment =
           bestPossibleStateOutput.getResourceAssignment(resourceId);
-      for (PartitionId partitionId : resourceConfig.getPartitionMap().keySet()) {
-        Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(partitionId);
+      for (PartitionId subUnitId : resourceConfig.getSubUnitMap().keySet()) {
+        Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(subUnitId);
 
         // we should generate message based on the desired-state priority
         // so keep generated messages in a temp map keyed by state
@@ -93,7 +94,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
           State desiredState = instanceStateMap.get(participantId);
 
           State currentState =
-              currentStateOutput.getCurrentState(resourceId, partitionId, participantId);
+              currentStateOutput.getCurrentState(resourceId, subUnitId, participantId);
           if (currentState == null) {
             currentState = stateModelDef.getInitialState();
           }
@@ -103,12 +104,12 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
           }
 
           State pendingState =
-              currentStateOutput.getPendingState(resourceId, partitionId, participantId);
+              currentStateOutput.getPendingState(resourceId, subUnitId, participantId);
 
           // TODO fix it
           State nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
           if (nextState == null) {
-            LOG.error("Unable to find a next state for partition: " + partitionId
+            LOG.error("Unable to find a next state for partition: " + subUnitId
                 + " from stateModelDefinition" + stateModelDef.getClass() + " from:" + currentState
                 + " to:" + desiredState);
             continue;
@@ -116,13 +117,13 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
 
           if (pendingState != null) {
             if (nextState.equals(pendingState)) {
-              LOG.debug("Message already exists for " + participantId + " to transit "
-                  + partitionId + " from " + currentState + " to " + nextState);
+              LOG.debug("Message already exists for " + participantId + " to transit " + subUnitId
+                  + " from " + currentState + " to " + nextState);
             } else if (currentState.equals(pendingState)) {
               LOG.info("Message hasn't been removed for " + participantId + " to transit"
-                  + partitionId + " to " + pendingState + ", desiredState: " + desiredState);
+                  + subUnitId + " to " + pendingState + ", desiredState: " + desiredState);
             } else {
-              LOG.info("IdealState changed before state transition completes for " + partitionId
+              LOG.info("IdealState changed before state transition completes for " + subUnitId
                   + " on " + participantId + ", pendingState: " + pendingState + ", currentState: "
                   + currentState + ", nextState: " + nextState);
             }
@@ -131,20 +132,21 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
             SessionId sessionId =
                 cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
                     .getSessionId();
+            RebalancerContext rebalancerContext =
+                resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
             Message message =
-                createMessage(manager, resourceId, partitionId, participantId, currentState,
+                createMessage(manager, resourceId, subUnitId, participantId, currentState,
                     nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
-                    resourceConfig.getRebalancerConfig().getStateModelFactoryId(), bucketSize);
+                    rebalancerContext.getStateModelFactoryId(), bucketSize);
 
             // TODO refactor get/set timeout/inner-message
-            RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
-            if (rebalancerConfig != null
-                && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+            if (rebalancerContext != null
+                && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
                     StateModelDefId.SchedulerTaskQueue)) {
-              if (resourceConfig.getPartitionMap().size() > 0) {
+              if (resourceConfig.getSubUnitMap().size() > 0) {
                 // TODO refactor it -- we need a way to read in scheduler tasks a priori
                 Message innerMsg =
-                    resourceConfig.getSchedulerTaskConfig().getInnerMessage(partitionId);
+                    resourceConfig.getSchedulerTaskConfig().getInnerMessage(subUnitId);
                 if (innerMsg != null) {
                   message.setInnerMessage(innerMsg);
                 }
@@ -157,12 +159,12 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
                     Message.Attributes.TIMEOUT.name());
             SchedulerTaskConfig schedulerTaskConfig = resourceConfig.getSchedulerTaskConfig();
             if (schedulerTaskConfig != null) {
-              int timeout = schedulerTaskConfig.getTimeout(stateTransition, partitionId);
+              int timeout = schedulerTaskConfig.getTimeout(stateTransition, subUnitId);
               if (timeout > 0) {
                 message.setExecutionTimeout(timeout);
               }
             }
-            message.getRecord().setSimpleField("ClusterEventName", event.getName());
+            message.setClusterEvent(event);
 
             if (!messageMap.containsKey(desiredState)) {
               messageMap.put(desiredState, new ArrayList<Message>());
@@ -176,7 +178,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
         for (State state : statesPriorityList) {
           if (messageMap.containsKey(state)) {
             for (Message message : messageMap.get(state)) {
-              output.addMessage(resourceId, partitionId, message);
+              output.addMessage(resourceId, subUnitId, message);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 8ea2013..04d6af8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -30,14 +30,17 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -104,7 +107,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     for (ResourceId resourceId : resourceMap.keySet()) {
       ResourceConfig resource = resourceMap.get(resourceId);
       StateModelDefinition stateModelDef =
-          stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
+          stateModelDefMap.get(resource.getRebalancerConfig()
+              .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
 
       // TODO have a logical model for transition
       Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
@@ -116,7 +120,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
               configResource == null ? null : configResource.getRebalancerConfig(), cluster);
 
       // TODO fix it
-      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
         List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
         List<Message> selectedMessages =
             selectMessages(cluster.getLiveParticipantMap(),
@@ -259,22 +263,27 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
    */
   private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
       RebalancerConfig rebalancerConfig, Cluster cluster) {
+    ReplicatedRebalancerContext context =
+        (rebalancerConfig != null) ? rebalancerConfig
+            .getRebalancerContext(ReplicatedRebalancerContext.class) : null;
     Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
 
     List<State> statePriorityList = stateModelDefinition.getStatesPriorityList();
     for (State state : statePriorityList) {
-      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state.toString());
+      String numInstancesPerState =
+          cluster.getStateUpperBoundConstraint(Scope.cluster(cluster.getId()),
+              stateModelDefinition.getStateModelDefId(), state);
       int max = -1;
       if ("N".equals(numInstancesPerState)) {
         max = cluster.getLiveParticipantMap().size();
       } else if ("R".equals(numInstancesPerState)) {
         // idealState is null when resource has been dropped,
         // R can't be evaluated and ignore state constraints
-        if (rebalancerConfig != null) {
-          if (rebalancerConfig.canAssignAnyLiveParticipant()) {
+        if (context != null) {
+          if (context.anyLiveParticipant()) {
             max = cluster.getLiveParticipantMap().size();
           } else {
-            max = rebalancerConfig.getReplicaCount();
+            max = context.getReplicaCount();
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
index cfbd45c..8c3c847 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -147,7 +147,7 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
     for (ResourceId resourceId : resourceMap.keySet()) {
       ResourceConfig resource = resourceMap.get(resourceId);
       // TODO fix it
-      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
         List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
         if (constraint != null && messages != null && messages.size() > 0) {
           messages = throttle(throttleCounterMap, constraint, messages, true);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index 457b470..38da0ac 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -23,20 +23,18 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.CustomRebalancerConfig;
-import org.apache.helix.api.FullAutoRebalancerConfig;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.SemiAutoRebalancerConfig;
 import org.apache.helix.api.StateModelFactoryId;
-import org.apache.helix.api.UserDefinedRebalancerConfig;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.CurrentState;
 import org.apache.log4j.Logger;
 
@@ -74,58 +72,8 @@ public class NewResourceComputationStage extends AbstractBaseStage {
       resCfgBuilder.bucketSize(resource.getBucketSize());
       resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
       resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
-
-      switch (rebalancerCfg.getRebalancerMode()) {
-      case USER_DEFINED: {
-        UserDefinedRebalancerConfig.Builder builder =
-            new UserDefinedRebalancerConfig.Builder(UserDefinedRebalancerConfig.from(rebalancerCfg));
-        if (csResCfgMap.containsKey(resourceId)) {
-          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
-        }
-        resCfgBuilder.rebalancerConfig(builder.build());
-        resCfgMap.put(resourceId, resCfgBuilder.build());
-        break;
-      }
-      case FULL_AUTO: {
-        FullAutoRebalancerConfig.Builder builder =
-            new FullAutoRebalancerConfig.Builder(FullAutoRebalancerConfig.from(rebalancerCfg));
-        if (csResCfgMap.containsKey(resourceId)) {
-          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
-        }
-        resCfgBuilder.rebalancerConfig(builder.build());
-        resCfgMap.put(resourceId, resCfgBuilder.build());
-        break;
-      }
-      case SEMI_AUTO: {
-        SemiAutoRebalancerConfig.Builder builder =
-            new SemiAutoRebalancerConfig.Builder(SemiAutoRebalancerConfig.from(rebalancerCfg));
-        if (csResCfgMap.containsKey(resourceId)) {
-          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
-        }
-        resCfgBuilder.rebalancerConfig(builder.build());
-        resCfgMap.put(resourceId, resCfgBuilder.build());
-        break;
-      }
-      case CUSTOMIZED: {
-        CustomRebalancerConfig.Builder builder =
-            new CustomRebalancerConfig.Builder(CustomRebalancerConfig.from(rebalancerCfg));
-        if (csResCfgMap.containsKey(resourceId)) {
-          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
-        }
-        resCfgBuilder.rebalancerConfig(builder.build());
-        resCfgMap.put(resourceId, resCfgBuilder.build());
-        break;
-      }
-      default:
-        RebalancerConfig.SimpleBuilder builder = new RebalancerConfig.SimpleBuilder(rebalancerCfg);
-        if (csResCfgMap.containsKey(resourceId)) {
-          builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
-        }
-        resCfgBuilder.rebalancerConfig(builder.build());
-        resCfgMap.put(resourceId, resCfgBuilder.build());
-        break;
-      }
-
+      resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
+      resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 
     event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
@@ -137,13 +85,12 @@ public class NewResourceComputationStage extends AbstractBaseStage {
    * @return resource config map or empty map if not available
    * @throws StageException
    */
-  Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster)
-      throws StageException {
+  Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
     Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
         new HashMap<ResourceId, ResourceConfig.Builder>();
 
-    Map<ResourceId, RebalancerConfig.SimpleBuilder> rebCfgBuilderMap =
-        new HashMap<ResourceId, RebalancerConfig.SimpleBuilder>();
+    Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
+        new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
 
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
       for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
@@ -158,12 +105,12 @@ public class NewResourceComputationStage extends AbstractBaseStage {
         }
 
         if (!resCfgBuilderMap.containsKey(resourceId)) {
-          RebalancerConfig.SimpleBuilder rebCfgBuilder =
-              new RebalancerConfig.SimpleBuilder(resourceId);
-          rebCfgBuilder.stateModelDef(currentState.getStateModelDefId());
-          rebCfgBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
+          PartitionedRebalancerContext.Builder rebCtxBuilder =
+              new PartitionedRebalancerContext.Builder(resourceId);
+          rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
+          rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
               .getStateModelFactoryName()));
-          rebCfgBuilderMap.put(resourceId, rebCfgBuilder);
+          rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
 
           ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
           resCfgBuilder.bucketSize(currentState.getBucketSize());
@@ -171,9 +118,9 @@ public class NewResourceComputationStage extends AbstractBaseStage {
           resCfgBuilderMap.put(resourceId, resCfgBuilder);
         }
 
-        RebalancerConfig.SimpleBuilder rebCfgBuilder = rebCfgBuilderMap.get(resourceId);
+        PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
         for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
-          rebCfgBuilder.addPartition(new Partition(partitionId));
+          rebCtxBuilder.addPartition(new Partition(partitionId));
         }
       }
     }
@@ -181,8 +128,8 @@ public class NewResourceComputationStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
     for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
       ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
-      RebalancerConfig.SimpleBuilder rebCfgBuilder = rebCfgBuilderMap.get(resourceId);
-      resCfgBuilder.rebalancerConfig(rebCfgBuilder.build());
+      PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+      resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
       resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 1bbfc15..74b29c7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -66,7 +66,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
     List<Message> messagesToSend = new ArrayList<Message>();
     for (ResourceId resourceId : resourceMap.keySet()) {
       ResourceConfig resource = resourceMap.get(resourceId);
-      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
         List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
         messagesToSend.addAll(messages);
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 88992c1..ff0923d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -34,12 +34,11 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerRef;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.api.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerRef;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Function;
@@ -559,7 +558,9 @@ public class IdealState extends HelixProperty {
    * @param name state model factory id
    */
   public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-    setStateModelFactoryName(stateModelFactoryId.stringify());
+    if (stateModelFactoryId != null) {
+      setStateModelFactoryName(stateModelFactoryId.stringify());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 29ac173..b372ac2 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -41,6 +41,7 @@ import org.apache.helix.api.SessionId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 
 import com.google.common.collect.ImmutableList;
@@ -732,6 +733,22 @@ public class Message extends HelixProperty {
     _record.setMapField(Attributes.INNER_MESSAGE.name(), message.getRecord().getSimpleFields());
   }
 
+  /**
+   * Set the cluster event generating this message
+   * @param event cluster event
+   */
+  public void setClusterEvent(ClusterEvent event) {
+    _record.setSimpleField("ClusterEventName", event.getName());
+  }
+
+  /**
+   * Get the cluster event name generating this message
+   * @param the cluster event event name
+   */
+  public String getClusterEventName() {
+    return _record.getSimpleField("ClusterEventName");
+  }
+
   private boolean isNullOrEmpty(String data) {
     return data == null || data.length() == 0 || data.trim().length() == 0;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 00f8472..8966434 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -1,13 +1,10 @@
 package org.apache.helix.model;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.api.NamespacedConfig;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.ResourceId;
 
 import com.google.common.base.Function;
@@ -91,25 +88,4 @@ public class ResourceConfiguration extends HelixProperty {
     return null;
   }
 
-  /**
-   * Add a rebalancer config to this resource
-   * @param config populated rebalancer config
-   */
-  public void addRebalancerConfig(RebalancerConfig config) {
-    addNamespacedConfig(config);
-    setPartitionIds(new ArrayList<PartitionId>(config.getPartitionSet()));
-  }
-
-  /**
-   * Create a new ResourceConfiguration from a NamespacedConfig
-   * @param namespacedConfig namespaced configuration properties
-   * @return ResourceConfiguration
-   */
-  public static ResourceConfiguration from(NamespacedConfig namespacedConfig) {
-    ResourceConfiguration resourceConfiguration =
-        new ResourceConfiguration(ResourceId.from(namespacedConfig.getId()));
-    resourceConfiguration.addNamespacedConfig(namespacedConfig);
-    return resourceConfiguration;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 6e5576a..7e16de0 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -276,7 +276,7 @@ public class ClusterStateVerifier {
             PartitionId partitionId = PartitionId.from(partitionName);
             ParticipantId participantId = ParticipantId.from(instanceName);
             raBuilder.addAssignment(partitionId, participantId,
-                new State(HelixDefinedState.ERROR.toString()));
+                State.from(HelixDefinedState.ERROR.toString()));
           }
           bestPossOutput.setResourceAssignment(resourceId, raBuilder.build());
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
index 9f8f494..29370cb 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
@@ -1,13 +1,10 @@
 package org.apache.helix.api;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
-import org.apache.helix.model.ResourceConfiguration;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -86,44 +83,4 @@ public class TestNamespacedConfig {
     Assert.assertEquals(instanceConfig.getRecord().getListField(prefixedKey), testListValue);
     Assert.assertEquals(instanceConfig.getRecord().getMapField(prefixedKey), testMapValue);
   }
-
-  @Test
-  public void testConfiguredResource() {
-    // Set up the namespaced configs
-    String userKey = "userKey";
-    String userValue = "userValue";
-    ResourceId resourceId = ResourceId.from("testResource");
-    UserConfig userConfig = new UserConfig(Scope.resource(resourceId));
-    userConfig.setSimpleField(userKey, userValue);
-    PartitionId partitionId = PartitionId.from(resourceId, "0");
-    Partition partition = new Partition(partitionId);
-    Map<ParticipantId, State> preferenceMap = new HashMap<ParticipantId, State>();
-    ParticipantId participantId = ParticipantId.from("participant");
-    preferenceMap.put(participantId, State.from("ONLINE"));
-    CustomRebalancerConfig rebalancerConfig =
-        new CustomRebalancerConfig.Builder(resourceId).replicaCount(1).addPartition(partition)
-            .stateModelDef(StateModelDefId.from("OnlineOffline"))
-            .preferenceMap(partitionId, preferenceMap).build();
-
-    // copy in the configs
-    ResourceConfiguration config = new ResourceConfiguration(resourceId);
-    config.addNamespacedConfig(userConfig);
-    config.addRebalancerConfig(rebalancerConfig);
-
-    // recreate the configs and check the fields
-    UserConfig retrievedUserConfig = UserConfig.from(config);
-    Assert.assertEquals(retrievedUserConfig.getSimpleField(userKey), userValue);
-    Map<PartitionId, UserConfig> partitionConfigs = Collections.emptyMap();
-    RebalancerConfig retrievedRebalancerConfig = RebalancerConfig.from(config, partitionConfigs);
-    Assert.assertEquals(retrievedRebalancerConfig.getReplicaCount(),
-        rebalancerConfig.getReplicaCount());
-    Assert.assertEquals(retrievedRebalancerConfig.getStateModelDefId(),
-        rebalancerConfig.getStateModelDefId());
-    Assert.assertTrue(retrievedRebalancerConfig.getPartitionMap().containsKey(partitionId));
-    Assert.assertEquals(retrievedRebalancerConfig.getPartitionSet().size(), rebalancerConfig
-        .getPartitionSet().size());
-    CustomRebalancerConfig customConfig = CustomRebalancerConfig.from(retrievedRebalancerConfig);
-    Assert.assertEquals(customConfig.getPreferenceMap(partitionId).get(participantId),
-        State.from("ONLINE"));
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 7fc4c83..5775f9e 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -27,7 +27,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
@@ -38,7 +38,6 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.mock.controller.ClusterController;
 import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -94,8 +93,8 @@ public class TestNewStages extends ZkUnitTestBase {
     ResourceId resourceId = ResourceId.from("TestDB0");
     Assert.assertTrue(resourceMap.containsKey(resourceId));
     Resource resource = resourceMap.get(resourceId);
-    Assert
-        .assertEquals(resource.getRebalancerConfig().getRebalancerMode(), RebalanceMode.SEMI_AUTO);
+    Assert.assertNotNull(resource.getRebalancerConfig().getRebalancerContext(
+        SemiAutoRebalancerContext.class));
 
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -157,11 +156,9 @@ public class TestNewStages extends ZkUnitTestBase {
     ResourceId resourceId = ResourceId.from("TestDB0");
     Resource resource = cluster.getResource(resourceId);
     ResourceCurrentState currentStateOutput = new ResourceCurrentState();
-    SemiAutoRebalancerConfig semiAutoConfig =
-        SemiAutoRebalancerConfig.from(resource.getRebalancerConfig());
     ResourceAssignment semiAutoResult =
-        new NewSemiAutoRebalancer().computeResourceMapping(semiAutoConfig, cluster,
-            currentStateOutput);
+        resource.getRebalancerConfig().getRebalancer()
+            .computeResourceMapping(resource.getRebalancerConfig(), cluster, currentStateOutput);
     verifySemiAutoRebalance(resource, semiAutoResult);
 
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
@@ -173,10 +170,11 @@ public class TestNewStages extends ZkUnitTestBase {
    * @param assignment the assignment to verify
    */
   private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
-    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
-    SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(resource.getRebalancerConfig());
+    Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getSubUnitSet().size());
+    SemiAutoRebalancerContext context =
+        resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
     for (PartitionId partitionId : assignment.getMappedPartitions()) {
-      List<ParticipantId> preferenceList = config.getPreferenceList(partitionId);
+      List<ParticipantId> preferenceList = context.getPreferenceList(partitionId);
       Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
       Assert.assertEquals(replicaMap.size(), preferenceList.size());
       Assert.assertEquals(replicaMap.size(), r);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
new file mode 100644
index 0000000..ebdcaff
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
@@ -0,0 +1,103 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Map;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.model.ResourceConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Ensure that a RebalancerContext of a specified type is able to be serialized and deserialized.
+ */
+public class TestSerializeRebalancerContext {
+  @Test
+  public void basicTest() {
+    // populate a context
+    CustomRebalancerContext context = new CustomRebalancerContext();
+    context.setAnyLiveParticipant(false);
+    context.setMaxPartitionsPerParticipant(Integer.MAX_VALUE);
+    Map<PartitionId, Partition> partitionMap = Maps.newHashMap();
+    ResourceId resourceId = ResourceId.from("testResource");
+    PartitionId partitionId = PartitionId.from(resourceId, "0");
+    partitionMap.put(partitionId, new Partition(partitionId));
+    context.setPartitionMap(partitionMap);
+    Map<PartitionId, Map<ParticipantId, State>> preferenceMaps = Maps.newHashMap();
+    ParticipantId participant1 = ParticipantId.from("participant1");
+    ParticipantId participant2 = ParticipantId.from("participant2");
+    Map<ParticipantId, State> preferenceMap =
+        ImmutableMap.of(participant1, State.from("MASTER"), participant2, State.from("SLAVE"));
+    preferenceMaps.put(partitionId, preferenceMap);
+    context.setPreferenceMaps(preferenceMaps);
+    context.setReplicaCount(3);
+    context.setStateModelDefId(StateModelDefId.from("MasterSlave"));
+    context.setResourceId(resourceId);
+
+    // serialize and deserialize by wrapping in a config
+    RebalancerConfig config = new RebalancerConfig(context);
+    CustomRebalancerContext deserialized =
+        config.getRebalancerContext(CustomRebalancerContext.class);
+
+    // check to make sure that the two objects contain the same data
+    Assert.assertNotNull(deserialized);
+    Assert.assertEquals(deserialized.anyLiveParticipant(), context.anyLiveParticipant());
+    Assert.assertEquals(deserialized.getPreferenceMap(partitionId).get(participant1), context
+        .getPreferenceMap(partitionId).get(participant1));
+    Assert.assertEquals(deserialized.getPreferenceMap(partitionId).get(participant2), context
+        .getPreferenceMap(partitionId).get(participant2));
+    Assert.assertEquals(deserialized.getReplicaCount(), context.getReplicaCount());
+    Assert.assertEquals(deserialized.getStateModelDefId(), context.getStateModelDefId());
+    Assert.assertEquals(deserialized.getResourceId(), context.getResourceId());
+
+    // wrap in a physical config and then unwrap it
+    ResourceConfiguration physicalConfig = new ResourceConfiguration(resourceId);
+    physicalConfig.addNamespacedConfig(config.toNamespacedConfig());
+    RebalancerConfig extractedConfig = new RebalancerConfig(physicalConfig);
+    CustomRebalancerContext extractedContext =
+        extractedConfig.getRebalancerContext(CustomRebalancerContext.class);
+
+    // make sure the unwrapped data hasn't changed
+    Assert.assertNotNull(extractedContext);
+    Assert.assertEquals(extractedContext.anyLiveParticipant(), context.anyLiveParticipant());
+    Assert.assertEquals(extractedContext.getPreferenceMap(partitionId).get(participant1), context
+        .getPreferenceMap(partitionId).get(participant1));
+    Assert.assertEquals(extractedContext.getPreferenceMap(partitionId).get(participant2), context
+        .getPreferenceMap(partitionId).get(participant2));
+    Assert.assertEquals(extractedContext.getReplicaCount(), context.getReplicaCount());
+    Assert.assertEquals(extractedContext.getStateModelDefId(), context.getStateModelDefId());
+    Assert.assertEquals(extractedContext.getResourceId(), context.getResourceId());
+
+    // make sure that it's legal to use a base rebalancer context
+    RebalancerContext rebalancerContext =
+        extractedConfig.getRebalancerContext(RebalancerContext.class);
+    Assert.assertNotNull(rebalancerContext);
+    Assert.assertEquals(rebalancerContext.getResourceId(), context.getResourceId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index f7cdcba..262f779 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -33,8 +32,6 @@ import org.apache.helix.Mocks;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
@@ -43,6 +40,8 @@ import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.api.UserConfig;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
@@ -169,14 +168,11 @@ public class BaseStageTest {
     Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
     for (IdealState idealState : idealStates) {
       ResourceId resourceId = idealState.getResourceId();
-      Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
-      for (PartitionId partitionId : idealState.getPartitionSet()) {
-        partitionMap.put(partitionId, new Partition(partitionId));
-      }
-      Map<PartitionId, UserConfig> partitionConfigMap = Collections.emptyMap();
+      RebalancerContext context = PartitionedRebalancerContext.from(idealState);
       Resource resource =
-          new Resource(resourceId, idealState, null, null, new UserConfig(
-              Scope.resource(resourceId)), partitionConfigMap);
+          new Resource(resourceId, idealState, null, null, context, new UserConfig(
+              Scope.resource(resourceId)), idealState.getBucketSize(),
+              idealState.getBatchMessageMode());
       resourceMap.put(resourceId, resource.getConfig());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 45507a1..ec6c525 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -33,6 +33,7 @@ import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
@@ -78,9 +79,10 @@ public class TestResourceComputationStage extends BaseStageTest {
     AssertJUnit.assertEquals(resource.values().iterator().next().getId(),
         ResourceId.from(resourceName));
     AssertJUnit.assertEquals(resource.values().iterator().next().getRebalancerConfig()
-        .getStateModelDefId(), idealState.getStateModelDefId());
-    AssertJUnit.assertEquals(resource.values().iterator().next().getPartitionSet().size(),
-        partitions);
+        .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
+        idealState.getStateModelDefId());
+    AssertJUnit
+        .assertEquals(resource.values().iterator().next().getSubUnitSet().size(), partitions);
   }
 
   @Test
@@ -105,8 +107,9 @@ public class TestResourceComputationStage extends BaseStageTest {
       AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
-          .getStateModelDefId(), idealState.getStateModelDefId());
-      AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
+          .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
+          idealState.getStateModelDefId());
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
           idealState.getNumPartitions());
     }
   }
@@ -179,8 +182,9 @@ public class TestResourceComputationStage extends BaseStageTest {
       AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
-          .getStateModelDefId(), idealState.getStateModelDefId());
-      AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
+          .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
+          idealState.getStateModelDefId());
+      AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
           idealState.getNumPartitions());
     }
     // Test the data derived from CurrentState
@@ -188,14 +192,15 @@ public class TestResourceComputationStage extends BaseStageTest {
     AssertJUnit.assertTrue(resourceMap.containsKey(oldResourceId));
     AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getId(), oldResourceId);
     AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getRebalancerConfig()
-        .getStateModelDefId(), currentState.getStateModelDefId());
-    AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getPartitionSet().size(), currentState
+        .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
+        currentState.getStateModelDefId());
+    AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getSubUnitSet().size(), currentState
         .getPartitionStateMap().size());
-    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit(
         PartitionId.from("testResourceOld_0")));
-    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit(
         PartitionId.from("testResourceOld_1")));
-    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+    AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit(
         PartitionId.from("testResourceOld_2")));
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 4f7b814..90c53d6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -25,14 +25,16 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.State;
-import org.apache.helix.api.UserDefinedRebalancerConfig;
-import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -52,24 +54,27 @@ import org.testng.annotations.Test;
 public class TestCustomizedIdealStateRebalancer extends
     ZkStandAloneCMTestBaseWithPropertyServerCheck {
   String db2 = TEST_DB + "2";
+  static boolean testRebalancerCreated = false;
   static boolean testRebalancerInvoked = false;
 
-  public static class TestRebalancer implements NewUserDefinedRebalancer {
+  public static class TestRebalancer implements Rebalancer {
 
     /**
      * Very basic mapping that evenly assigns one replica of each partition to live nodes, each of
      * which is in the highest-priority state.
      */
     @Override
-    public ResourceAssignment computeResourceMapping(UserDefinedRebalancerConfig config,
-        Cluster cluster, ResourceCurrentState currentState) {
+    public ResourceAssignment computeResourceMapping(RebalancerConfig config, Cluster cluster,
+        ResourceCurrentState currentState) {
+      PartitionedRebalancerContext context =
+          config.getRebalancerContext(PartitionedRebalancerContext.class);
       StateModelDefinition stateModelDef =
-          cluster.getStateModelMap().get(config.getStateModelDefId());
+          cluster.getStateModelMap().get(context.getStateModelDefId());
       List<ParticipantId> liveParticipants =
           new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
-      ResourceAssignment resourceMapping = new ResourceAssignment(config.getResourceId());
+      ResourceAssignment resourceMapping = new ResourceAssignment(context.getResourceId());
       int i = 0;
-      for (PartitionId partitionId : config.getPartitionSet()) {
+      for (PartitionId partitionId : context.getPartitionSet()) {
         int nodeIndex = i % liveParticipants.size();
         Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
         replicaMap.put(liveParticipants.get(nodeIndex), stateModelDef.getStatesPriorityList()
@@ -80,6 +85,11 @@ public class TestCustomizedIdealStateRebalancer extends
       testRebalancerInvoked = true;
       return resourceMapping;
     }
+
+    @Override
+    public void init(HelixManager helixManager) {
+      testRebalancerCreated = true;
+    }
   }
 
   @Test
@@ -111,6 +121,7 @@ public class TestCustomizedIdealStateRebalancer extends
       Assert.assertEquals(is.getPreferenceList(partition).size(), 0);
       Assert.assertEquals(is.getParticipantStateMap(partition).size(), 0);
     }
+    Assert.assertTrue(testRebalancerCreated);
     Assert.assertTrue(testRebalancerInvoked);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index 7be5ab1..ed66ae2 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -9,7 +9,6 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.ClusterAccessor;
 import org.apache.helix.api.ClusterConfig;
 import org.apache.helix.api.ClusterId;
-import org.apache.helix.api.FullAutoRebalancerConfig;
 import org.apache.helix.api.ParticipantConfig;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.Partition;
@@ -20,6 +19,7 @@ import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.api.UserConfig;
+import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -118,21 +118,18 @@ public class NewModelExample {
     // identify the resource
     ResourceId resourceId = ResourceId.from("exampleResource");
 
-    // create a partition with no user-defined configuration
+    // create a partition
     Partition partition1 = new Partition(PartitionId.from("partition1"));
 
-    // create a partition with (optional) user-defined configuration
-    PartitionId partition2Id = PartitionId.from("partition2");
-    UserConfig partition2Config = new UserConfig(Scope.partition(partition2Id));
-    partition2Config.setSimpleField("sampleString", "partition config");
-    Partition partition2 = new Partition(partition2Id, partition2Config);
+    // create a second partition
+    Partition partition2 = new Partition(PartitionId.from("partition2"));
 
     // specify the rebalancer configuration
     // this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerConfig
     // builder
-    FullAutoRebalancerConfig.Builder rebalanceConfigBuilder =
-        new FullAutoRebalancerConfig.Builder(resourceId).replicaCount(3).addPartition(partition1)
-            .addPartition(partition2).stateModelDef(stateModelDef.getStateModelDefId());
+    FullAutoRebalancerContext.Builder rebalanceContextBuilder =
+        new FullAutoRebalancerContext.Builder(resourceId).replicaCount(1).addPartition(partition1)
+            .addPartition(partition2).stateModelDefId(stateModelDef.getStateModelDefId());
 
     // create (optional) user-defined configuration properties for the resource
     UserConfig userConfig = new UserConfig(Scope.resource(resourceId));
@@ -140,7 +137,7 @@ public class NewModelExample {
 
     // create the configuration for a new resource
     ResourceConfig.Builder resourceBuilder =
-        new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalanceConfigBuilder.build())
+        new ResourceConfig.Builder(resourceId).rebalancerContext(rebalanceContextBuilder.build())
             .userConfig(userConfig);
     return resourceBuilder.build();
   }