You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/12/04 02:21:04 UTC

[1/4] [HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981

Updated Branches:
  refs/heads/master 7521c0cb5 -> 015e7dda2


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
index ff98cd2..3693c2b 100644
--- a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
@@ -30,9 +30,10 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.context.ControllerContextProvider;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
@@ -42,7 +43,7 @@ public class LockManagerRebalancer implements HelixRebalancer {
   private static final Logger LOG = Logger.getLogger(LockManagerRebalancer.class);
 
   @Override
-  public void init(HelixManager manager) {
+  public void init(HelixManager manager, ControllerContextProvider contextProvider) {
     // do nothing; this rebalancer is independent of the manager
   }
 
@@ -54,20 +55,19 @@ public class LockManagerRebalancer implements HelixRebalancer {
    */
   @Override
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
     // get a typed context
-    PartitionedRebalancerContext context =
-        rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
+    PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
 
     // Initialize an empty mapping of locks to participants
-    ResourceAssignment assignment = new ResourceAssignment(context.getResourceId());
+    ResourceAssignment assignment = new ResourceAssignment(config.getResourceId());
 
     // Get the list of live participants in the cluster
     List<ParticipantId> liveParticipants =
         new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
 
     // Get the state model (should be a simple lock/unlock model) and the highest-priority state
-    StateModelDefId stateModelDefId = context.getStateModelDefId();
+    StateModelDefId stateModelDefId = config.getStateModelDefId();
     StateModelDefinition stateModelDef = cluster.getStateModelMap().get(stateModelDefId);
     if (stateModelDef.getStatesPriorityList().size() < 1) {
       LOG.error("Invalid state model definition. There should be at least one state.");
@@ -93,7 +93,7 @@ public class LockManagerRebalancer implements HelixRebalancer {
     // This assumes a simple lock-unlock model where the only state of interest is which nodes have
     // acquired each lock.
     int i = 0;
-    for (PartitionId partition : context.getPartitionSet()) {
+    for (PartitionId partition : config.getPartitionSet()) {
       Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
       for (int j = i; j < i + lockHolders; j++) {
         int participantIndex = j % liveParticipants.size();


[4/4] git commit: [HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981

Posted by ka...@apache.org.
[HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/015e7dda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/015e7dda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/015e7dda

Branch: refs/heads/master
Commit: 015e7dda2c1f96beb6d387ba71c118d246840f1b
Parents: 7521c0c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Dec 3 16:52:23 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Dec 3 17:20:31 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/PropertyKey.java |  18 +
 .../org/apache/helix/PropertyPathConfig.java    |   5 +
 .../java/org/apache/helix/PropertyType.java     |   3 +-
 .../main/java/org/apache/helix/api/Cluster.java |  20 +-
 .../java/org/apache/helix/api/Resource.java     |   9 +-
 .../api/accessor/AtomicResourceAccessor.java    |   6 +-
 .../helix/api/accessor/ClusterAccessor.java     |  52 ++-
 .../helix/api/accessor/ParticipantAccessor.java |  23 +-
 .../helix/api/accessor/ResourceAccessor.java    | 138 ++++---
 .../apache/helix/api/config/ResourceConfig.java |  32 +-
 .../java/org/apache/helix/api/id/ContextId.java |  57 +++
 .../controller/GenericHelixController.java      |   8 +-
 .../context/BasicControllerContext.java         |  60 +++
 .../controller/context/ControllerContext.java   |  40 ++
 .../context/ControllerContextHolder.java        | 156 +++++++
 .../context/ControllerContextProvider.java      | 120 ++++++
 .../controller/rebalancer/CustomRebalancer.java |  14 +-
 .../rebalancer/FallbackRebalancer.java          |  20 +-
 .../rebalancer/FullAutoRebalancer.java          |  16 +-
 .../controller/rebalancer/HelixRebalancer.java  |  20 +-
 .../rebalancer/SemiAutoRebalancer.java          |  18 +-
 .../config/BasicRebalancerConfig.java           | 256 ++++++++++++
 .../config/CustomRebalancerConfig.java          | 164 ++++++++
 .../config/FullAutoRebalancerConfig.java        |  64 +++
 .../config/PartitionedRebalancerConfig.java     | 405 +++++++++++++++++++
 .../rebalancer/config/RebalancerConfig.java     |  95 +++++
 .../config/RebalancerConfigHolder.java          | 185 +++++++++
 .../config/ReplicatedRebalancerConfig.java      |  40 ++
 .../config/SemiAutoRebalancerConfig.java        | 178 ++++++++
 .../context/BasicRebalancerContext.java         | 240 -----------
 .../rebalancer/context/ContextSerializer.java   |  37 --
 .../context/CustomRebalancerContext.java        | 164 --------
 .../context/DefaultContextSerializer.java       |  83 ----
 .../context/FullAutoRebalancerContext.java      |  64 ---
 .../context/PartitionedRebalancerContext.java   | 393 ------------------
 .../rebalancer/context/RebalancerConfig.java    | 182 ---------
 .../rebalancer/context/RebalancerContext.java   |  94 -----
 .../context/ReplicatedRebalancerContext.java    |  40 --
 .../context/SemiAutoRebalancerContext.java      | 178 --------
 .../serializer/DefaultStringSerializer.java     |  82 ++++
 .../controller/serializer/StringSerializer.java |  37 ++
 .../helix/controller/stages/AttributeName.java  |   3 +-
 .../stages/BestPossibleStateCalcStage.java      |  26 +-
 .../stages/ExternalViewComputeStage.java        |  10 +-
 .../stages/MessageGenerationStage.java          |  16 +-
 .../stages/MessageSelectionStage.java           |  21 +-
 .../stages/PersistAssignmentStage.java          |  14 +-
 .../controller/stages/PersistContextStage.java  |  59 +++
 .../controller/stages/ReadClusterDataStage.java |  17 +
 .../stages/ResourceComputationStage.java        |  25 +-
 .../helix/model/ResourceConfiguration.java      |  14 +-
 .../org/apache/helix/tools/NewClusterSetup.java |  66 +--
 .../org/apache/helix/api/TestNewStages.java     |  22 +-
 .../org/apache/helix/api/TestUpdateConfig.java  |  27 +-
 .../context/TestSerializeRebalancerContext.java |  20 +-
 .../helix/controller/stages/BaseStageTest.java  |   6 +-
 .../stages/TestResourceComputationStage.java    |  13 +-
 .../TestCustomizedIdealStateRebalancer.java     |  35 +-
 .../helix/integration/TestHelixConnection.java  |  10 +-
 .../helix/examples/LogicalModelExample.java     |  10 +-
 .../LockManagerRebalancer.java                  |  18 +-
 61 files changed, 2450 insertions(+), 1798 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index ebd7a35..2af63a5 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -24,6 +24,7 @@ import static org.apache.helix.PropertyType.ALERT_HISTORY;
 import static org.apache.helix.PropertyType.ALERT_STATUS;
 import static org.apache.helix.PropertyType.CLUSTER;
 import static org.apache.helix.PropertyType.CONFIGS;
+import static org.apache.helix.PropertyType.CONTEXT;
 import static org.apache.helix.PropertyType.CONTROLLER;
 import static org.apache.helix.PropertyType.CURRENTSTATES;
 import static org.apache.helix.PropertyType.ERRORS;
@@ -46,6 +47,7 @@ import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
 
 import java.util.Arrays;
 
+import org.apache.helix.controller.context.ControllerContextHolder;
 import org.apache.helix.model.AlertHistory;
 import org.apache.helix.model.AlertStatus;
 import org.apache.helix.model.Alerts;
@@ -721,6 +723,22 @@ public class PropertyKey {
     public PropertyKey propertyStore() {
       return new PropertyKey(PROPERTYSTORE, null, _clusterName);
     }
+
+    /**
+     * Get a propertykey associated with the root of the Helix contexts
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey controllerContexts() {
+      return new PropertyKey(CONTEXT, ControllerContextHolder.class, _clusterName);
+    }
+
+    /**
+     * Get a propertykey associated with the single Helix context identified by contextId
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey controllerContext(String contextId) {
+      return new PropertyKey(CONTEXT, ControllerContextHolder.class, _clusterName, contextId);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
index 60a92f4..d66e7d9 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
@@ -22,6 +22,7 @@ package org.apache.helix;
 import static org.apache.helix.PropertyType.ALERTS;
 import static org.apache.helix.PropertyType.ALERT_STATUS;
 import static org.apache.helix.PropertyType.CONFIGS;
+import static org.apache.helix.PropertyType.CONTEXT;
 import static org.apache.helix.PropertyType.CURRENTSTATES;
 import static org.apache.helix.PropertyType.EXTERNALVIEW;
 import static org.apache.helix.PropertyType.HEALTHREPORT;
@@ -40,6 +41,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.helix.controller.context.ControllerContextHolder;
 import org.apache.helix.model.AlertStatus;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.CurrentState;
@@ -81,6 +83,7 @@ public class PropertyPathConfig {
     typeToClassMapping.put(ALERT_STATUS, AlertStatus.class);
     typeToClassMapping.put(PAUSE, PauseSignal.class);
     typeToClassMapping.put(RESOURCEASSIGNMENTS, ResourceAssignment.class);
+    typeToClassMapping.put(CONTEXT, ControllerContextHolder.class);
 
     // @formatter:off
     addEntry(PropertyType.CLUSTER, 1, "/{clusterName}");
@@ -148,6 +151,8 @@ public class PropertyPathConfig {
     addEntry(PropertyType.ALERTS, 1, "/{clusterName}/CONTROLLER/ALERTS");
     addEntry(PropertyType.ALERT_STATUS, 1, "/{clusterName}/CONTROLLER/ALERT_STATUS");
     addEntry(PropertyType.ALERT_HISTORY, 1, "/{clusterName}/CONTROLLER/ALERT_HISTORY");
+    addEntry(PropertyType.CONTEXT, 1, "/{clusterName}/CONTROLLER/CONTEXT");
+    addEntry(PropertyType.CONTEXT, 2, "/{clusterName}/CONTROLLER/CONTEXT/{contextId}");
     // @formatter:on
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 680dc06..75adb20 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -64,7 +64,8 @@ public enum PropertyType {
   PERSISTENTSTATS(Type.CONTROLLER, true, false, false, false),
   ALERTS(Type.CONTROLLER, true, false, false, false),
   ALERT_STATUS(Type.CONTROLLER, true, false, false, false),
-  ALERT_HISTORY(Type.CONTROLLER, true, false, false, false);
+  ALERT_HISTORY(Type.CONTROLLER, true, false, false, false),
+  CONTEXT(Type.CONTROLLER, true, false);
 
   // @formatter:on
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index fdeb879..98072d1 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -27,11 +27,13 @@ import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ContextId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SpectatorId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.context.ControllerContext;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -73,6 +75,8 @@ public class Cluster {
    */
   private final Map<SpectatorId, Spectator> _spectatorMap;
 
+  private final Map<ContextId, ControllerContext> _contextMap;
+
   private final ControllerId _leaderId;
 
   private final ClusterConfig _config;
@@ -86,6 +90,7 @@ public class Cluster {
    * @param leaderId
    * @param constraintMap
    * @param stateModelMap
+   * @param contextMap
    * @param stats
    * @param alerts
    * @param userConfig
@@ -95,8 +100,9 @@ public class Cluster {
   public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
       ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
-      Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
-      Alerts alerts, UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+      Map<StateModelDefId, StateModelDefinition> stateModelMap,
+      Map<ContextId, ControllerContext> contextMap, PersistentStats stats, Alerts alerts,
+      UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
 
     // build the config
     // Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -137,6 +143,8 @@ public class Cluster {
 
     _leaderId = leaderId;
 
+    _contextMap = ImmutableMap.copyOf(contextMap);
+
     // TODO impl this when we persist controllers and spectators on zookeeper
     _controllerMap = ImmutableMap.copyOf(controllerMap);
     _spectatorMap = Collections.emptyMap();
@@ -224,6 +232,14 @@ public class Cluster {
   }
 
   /**
+   * Get all the controller contexts currently on the cluster
+   * @return map of context id to controller context objects
+   */
+  public Map<ContextId, ControllerContext> getContextMap() {
+    return _contextMap;
+  }
+
+  /**
    * Get all the persisted stats for the cluster
    * @return PersistentStats instance
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 79a1e09..0726510 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -30,8 +30,7 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -52,18 +51,16 @@ public class Resource {
    * @param idealState ideal state of the resource
    * @param externalView external view of the resource
    * @param resourceAssignment current resource assignment of the cluster
-   * @param rebalancerContext contextual parameters that the rebalancer should be aware of
+   * @param rebalancerConfig parameters that the rebalancer should be aware of
    * @param userConfig any resource user-defined configuration
    * @param bucketSize the bucket size to use for physically saved state
    * @param batchMessageMode true if batch messaging allowed, false otherwise
    */
   public Resource(ResourceId id, ResourceType type, IdealState idealState,
       ResourceAssignment resourceAssignment, ExternalView externalView,
-      RebalancerContext rebalancerContext, UserConfig userConfig, int bucketSize,
+      RebalancerConfig rebalancerConfig, UserConfig userConfig, int bucketSize,
       boolean batchMessageMode) {
     SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
-    RebalancerConfig rebalancerConfig = new RebalancerConfig(rebalancerContext);
-
     _config =
         new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, userConfig, bucketSize,
             batchMessageMode);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 6d69981..cda83d8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -6,7 +6,7 @@ import org.apache.helix.api.Scope;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.lock.HelixLock;
 import org.apache.helix.lock.HelixLockable;
 import org.apache.log4j.Logger;
@@ -96,12 +96,12 @@ public class AtomicResourceAccessor extends ResourceAccessor {
   }
 
   @Override
-  public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
+  public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
     HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return _resourceAccessor.setRebalancerContext(resourceId, context);
+        return _resourceAccessor.setRebalancerConfig(resourceId, config);
       } finally {
         lock.unlock();
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index b01a3ec..36c7aaa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -45,14 +45,18 @@ import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ConstraintId;
+import org.apache.helix.api.id.ContextId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;
@@ -60,6 +64,7 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -266,9 +271,13 @@ public class ClusterAccessor {
     // read the alerts
     Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
 
+    // read controller context
+    Map<ContextId, ControllerContext> contextMap = readControllerContext();
+
     // create the cluster snapshot object
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap, stateModelMap, stats, alerts, userConfig, isPaused, autoJoinAllowed);
+        clusterConstraintMap, stateModelMap, contextMap, stats, alerts, userConfig, isPaused,
+        autoJoinAllowed);
   }
 
   /**
@@ -459,6 +468,20 @@ public class ClusterAccessor {
   }
 
   /**
+   * Read the persisted controller contexts
+   * @return map of context id to controller context
+   */
+  public Map<ContextId, ControllerContext> readControllerContext() {
+    Map<String, ControllerContextHolder> contextHolders =
+        _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+    Map<ContextId, ControllerContext> contexts = Maps.newHashMap();
+    for (String contextName : contextHolders.keySet()) {
+      contexts.put(ContextId.from(contextName), contextHolders.get(contextName).getContext());
+    }
+    return contexts;
+  }
+
+  /**
    * Add a statistic specification to the cluster. Existing stat specifications will not be
    * overwritten
    * @param statName string representing a stat specification
@@ -623,7 +646,7 @@ public class ClusterAccessor {
    */
   public boolean addResourceToCluster(ResourceConfig resource) {
     if (resource == null || resource.getRebalancerConfig() == null) {
-      LOG.error("Resource not fully defined with a rebalancer context");
+      LOG.error("Resource not fully defined with a rebalancer config");
       return false;
     }
 
@@ -631,9 +654,8 @@ public class ClusterAccessor {
       LOG.error("Cluster: " + _clusterId + " structure is not valid");
       return false;
     }
-    RebalancerContext context =
-        resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
-    StateModelDefId stateModelDefId = context.getStateModelDefId();
+    RebalancerConfig config = resource.getRebalancerConfig();
+    StateModelDefId stateModelDefId = config.getStateModelDefId();
     if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
       LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
       return false;
@@ -656,17 +678,21 @@ public class ClusterAccessor {
       ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
       configuration.setType(resource.getType());
       configuration.addNamespacedConfig(resource.getUserConfig());
-      configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
-      configuration.setBucketSize(resource.getBucketSize());
-      configuration.setBatchMessageMode(resource.getBatchMessageMode());
+      PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+      if (partitionedConfig == null
+          || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+        // only persist if this is not easily convertible to an ideal state
+        configuration
+            .addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
+                .toNamespacedConfig());
+      }
       _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     }
 
     // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
-    RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
     IdealState idealState =
-        ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
-            resource.getBatchMessageMode());
+        ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
+            resource.getBucketSize(), resource.getBatchMessageMode());
     if (idealState != null) {
       _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 2721d91..c3deafe 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -51,8 +51,8 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -315,16 +315,15 @@ public class ParticipantAccessor {
       return false;
     }
 
-    // need the rebalancer context for the resource
-    RebalancerContext context =
-        resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
-    if (context == null) {
-      LOG.error("Rebalancer context for resource does not exist");
+    // need the rebalancer config for the resource
+    RebalancerConfig config = resource.getRebalancerConfig();
+    if (config == null) {
+      LOG.error("Rebalancer config for resource does not exist");
       return false;
     }
 
     // ensure that all partitions to reset exist
-    Set<PartitionId> partitionSet = ImmutableSet.copyOf(context.getSubUnitIdSet());
+    Set<PartitionId> partitionSet = ImmutableSet.copyOf(config.getSubUnitIdSet());
     if (!partitionSet.containsAll(resetPartitionIdSet)) {
       LOG.error("Not all of the specified partitions to reset exist for the resource");
       return false;
@@ -368,7 +367,7 @@ public class ParticipantAccessor {
     }
 
     // build messages to signal the transition
-    StateModelDefId stateModelDefId = context.getStateModelDefId();
+    StateModelDefId stateModelDefId = config.getStateModelDefId();
     StateModelDefinition stateModelDef =
         _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
     Map<MessageId, Message> messageMap = Maps.newHashMap();
@@ -385,7 +384,7 @@ public class ParticipantAccessor {
       message.setStateModelDef(stateModelDefId);
       message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
       message.setToState(stateModelDef.getTypedInitialState());
-      message.setStateModelFactoryId(context.getStateModelFactoryId());
+      message.setStateModelFactoryId(config.getStateModelFactoryId());
 
       messageMap.put(message.getMessageId(), message);
     }
@@ -666,8 +665,8 @@ public class ParticipantAccessor {
     for (String resourceName : idealStateMap.keySet()) {
       IdealState idealState = idealStateMap.get(resourceName);
       swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
-      PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
-      resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
+      PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(idealState);
+      resourceAccessor.setRebalancerConfig(ResourceId.from(resourceName), config);
       _accessor.setProperty(_keyBuilder.idealStates(resourceName), idealState);
     }
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index e5ac57c..b308b98 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -38,11 +38,12 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -131,11 +132,11 @@ public class ResourceAccessor {
    * @param configuration
    * @return true if set, false otherwise
    */
-  private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+  private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration,
+      RebalancerConfig rebalancerConfig) {
     boolean status =
         _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
-    // also set an ideal state if the resource supports it
-    RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+    // set an ideal state if the resource supports it
     IdealState idealState =
         rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
             configuration.getBatchMessageMode());
@@ -146,16 +147,20 @@ public class ResourceAccessor {
   }
 
   /**
-   * Set the context of the rebalancer. This includes all properties required for rebalancing this
+   * Set the config of the rebalancer. This includes all properties required for rebalancing this
    * resource
    * @param resourceId the resource to update
-   * @param context the new rebalancer context
-   * @return true if the context was set, false otherwise
+   * @param config the new rebalancer config
+   * @return true if the config was set, false otherwise
    */
-  public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
-    RebalancerConfig config = new RebalancerConfig(context);
+  public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
     ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
-    resourceConfig.addNamespacedConfig(config.toNamespacedConfig());
+    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+    if (partitionedConfig == null
+        || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+      // only persist if this is not easily convertible to an ideal state
+      resourceConfig.addNamespacedConfig(new RebalancerConfigHolder(config).toNamespacedConfig());
+    }
 
     // update the ideal state if applicable
     IdealState oldIdealState =
@@ -190,9 +195,13 @@ public class ResourceAccessor {
    * @return RebalancerConfig, or null
    */
   public RebalancerConfig readRebalancerConfig(ResourceId resourceId) {
+    IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+    if (idealState != null && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
+      return PartitionedRebalancerConfig.from(idealState);
+    }
     ResourceConfiguration resourceConfig =
         _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
-    return resourceConfig != null ? RebalancerConfig.from(resourceConfig) : null;
+    return resourceConfig != null ? resourceConfig.getRebalancerConfig(RebalancerConfig.class) : null;
   }
 
   /**
@@ -242,10 +251,17 @@ public class ResourceAccessor {
     ResourceId resourceId = resourceConfig.getId();
     ResourceConfiguration config = new ResourceConfiguration(resourceId);
     config.addNamespacedConfig(resourceConfig.getUserConfig());
-    config.addNamespacedConfig(resourceConfig.getRebalancerConfig().toNamespacedConfig());
+    PartitionedRebalancerConfig partitionedConfig =
+        PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
+    if (partitionedConfig == null
+        || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+      // only persist if this is not easily convertible to an ideal state
+      config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
+          .toNamespacedConfig());
+    }
     config.setBucketSize(resourceConfig.getBucketSize());
     config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
-    setConfiguration(resourceId, config);
+    setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
     return true;
   }
 
@@ -332,28 +348,28 @@ public class ResourceAccessor {
       String participantGroupTag) {
     Resource resource = readResource(resourceId);
     RebalancerConfig config = resource.getRebalancerConfig();
-    PartitionedRebalancerContext context =
-        config.getRebalancerContext(PartitionedRebalancerContext.class);
-    if (context == null) {
+    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+    if (partitionedConfig == null) {
       LOG.error("Only partitioned resource types are supported");
       return false;
     }
     if (replicaCount != -1) {
-      context.setReplicaCount(replicaCount);
+      partitionedConfig.setReplicaCount(replicaCount);
     }
     if (participantGroupTag != null) {
-      context.setParticipantGroupTag(participantGroupTag);
+      partitionedConfig.setParticipantGroupTag(participantGroupTag);
     }
     StateModelDefinition stateModelDef =
-        _accessor.getProperty(_keyBuilder.stateModelDef(context.getStateModelDefId().stringify()));
+        _accessor.getProperty(_keyBuilder.stateModelDef(partitionedConfig.getStateModelDefId()
+            .stringify()));
     List<InstanceConfig> participantConfigs =
         _accessor.getChildValues(_keyBuilder.instanceConfigs());
     Set<ParticipantId> participantSet = Sets.newHashSet();
     for (InstanceConfig participantConfig : participantConfigs) {
       participantSet.add(participantConfig.getParticipantId());
     }
-    context.generateDefaultConfiguration(stateModelDef, participantSet);
-    setRebalancerContext(resourceId, context);
+    partitionedConfig.generateDefaultConfiguration(stateModelDef, participantSet);
+    setRebalancerConfig(resourceId, partitionedConfig);
     return true;
   }
 
@@ -366,41 +382,40 @@ public class ResourceAccessor {
    */
   static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
       boolean batchMessageMode) {
-    PartitionedRebalancerContext partitionedContext =
-        config.getRebalancerContext(PartitionedRebalancerContext.class);
-    if (partitionedContext != null) {
-      IdealState idealState = new IdealState(partitionedContext.getResourceId());
-      idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
-      idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+    if (partitionedConfig != null) {
+      IdealState idealState = new IdealState(partitionedConfig.getResourceId());
+      idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
+      idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
       String replicas = null;
-      if (partitionedContext.anyLiveParticipant()) {
+      if (partitionedConfig.anyLiveParticipant()) {
         replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
       } else {
-        replicas = Integer.toString(partitionedContext.getReplicaCount());
+        replicas = Integer.toString(partitionedConfig.getReplicaCount());
       }
       idealState.setReplicas(replicas);
-      idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
-      idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
-      idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
-      idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
-      idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+      idealState.setNumPartitions(partitionedConfig.getPartitionSet().size());
+      idealState.setInstanceGroupTag(partitionedConfig.getParticipantGroupTag());
+      idealState.setMaxPartitionsPerInstance(partitionedConfig.getMaxPartitionsPerParticipant());
+      idealState.setStateModelDefId(partitionedConfig.getStateModelDefId());
+      idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
       idealState.setBucketSize(bucketSize);
       idealState.setBatchMessageMode(batchMessageMode);
-      if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-        SemiAutoRebalancerContext semiAutoContext =
-            config.getRebalancerContext(SemiAutoRebalancerContext.class);
-        for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
-          idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+      if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+        SemiAutoRebalancerConfig semiAutoConfig =
+            BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
+        for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
+          idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
         }
-      } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
-        CustomRebalancerContext customContext =
-            config.getRebalancerContext(CustomRebalancerContext.class);
-        for (PartitionId partitionId : customContext.getPartitionSet()) {
-          idealState.setParticipantStateMap(partitionId,
-              customContext.getPreferenceMap(partitionId));
+      } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+        CustomRebalancerConfig customConfig =
+            BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
+        for (PartitionId partitionId : customConfig.getPartitionSet()) {
+          idealState
+              .setParticipantStateMap(partitionId, customConfig.getPreferenceMap(partitionId));
         }
       } else {
-        for (PartitionId partitionId : partitionedContext.getPartitionSet()) {
+        for (PartitionId partitionId : partitionedConfig.getPartitionSet()) {
           List<ParticipantId> preferenceList = Collections.emptyList();
           idealState.setPreferenceList(partitionId, preferenceList);
           Map<ParticipantId, State> participantStateMap = Collections.emptyMap();
@@ -425,7 +440,7 @@ public class ResourceAccessor {
       ResourceConfiguration resourceConfiguration, IdealState idealState,
       ExternalView externalView, ResourceAssignment resourceAssignment) {
     UserConfig userConfig;
-    RebalancerContext rebalancerContext = null;
+    RebalancerConfig rebalancerConfig = null;
     ResourceType type = ResourceType.DATA;
     if (resourceConfiguration != null) {
       userConfig = resourceConfiguration.getUserConfig();
@@ -437,14 +452,14 @@ public class ResourceAccessor {
     boolean batchMessageMode = false;
     if (idealState != null) {
       if (resourceConfiguration != null
-          && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
-        // prefer rebalancer context for non-user_defined data rebalancing
-        rebalancerContext =
-            resourceConfiguration.getRebalancerContext(PartitionedRebalancerContext.class);
+          && idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+        // prefer rebalancer config for user_defined data rebalancing
+        rebalancerConfig =
+            resourceConfiguration.getRebalancerConfig(PartitionedRebalancerConfig.class);
       }
-      if (rebalancerContext == null) {
+      if (rebalancerConfig == null) {
         // prefer ideal state for non-user_defined data rebalancing
-        rebalancerContext = PartitionedRebalancerContext.from(idealState);
+        rebalancerConfig = PartitionedRebalancerConfig.from(idealState);
       }
       bucketSize = idealState.getBucketSize();
       batchMessageMode = idealState.getBatchMessageMode();
@@ -452,14 +467,13 @@ public class ResourceAccessor {
     } else if (resourceConfiguration != null) {
       bucketSize = resourceConfiguration.getBucketSize();
       batchMessageMode = resourceConfiguration.getBatchMessageMode();
-      RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
-      rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+      rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class);
     }
-    if (rebalancerContext == null) {
-      rebalancerContext = new PartitionedRebalancerContext();
+    if (rebalancerConfig == null) {
+      rebalancerConfig = new PartitionedRebalancerConfig();
     }
     return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
-        rebalancerContext, userConfig, bucketSize, batchMessageMode);
+        rebalancerConfig, userConfig, bucketSize, batchMessageMode);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index 38d48ab..b148a49 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -7,8 +7,7 @@ import org.apache.helix.api.Partition;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 
 import com.google.common.collect.Sets;
 
@@ -80,7 +79,7 @@ public class ResourceConfig {
    * @return map of subunit id to subunit or empty map if none
    */
   public Map<? extends PartitionId, ? extends Partition> getSubUnitMap() {
-    return _rebalancerConfig.getRebalancerContext(RebalancerContext.class).getSubUnitMap();
+    return _rebalancerConfig.getSubUnitMap();
   }
 
   /**
@@ -167,7 +166,7 @@ public class ResourceConfig {
   public static class Delta {
     private enum Fields {
       TYPE,
-      REBALANCER_CONTEXT,
+      REBALANCER_CONFIG,
       USER_CONFIG,
       BUCKET_SIZE,
       BATCH_MESSAGE_MODE
@@ -198,12 +197,12 @@ public class ResourceConfig {
 
     /**
      * Set the rebalancer configuration
-     * @param context properties of interest for rebalancing
+     * @param config properties of interest for rebalancing
      * @return Delta
      */
-    public Delta setRebalancerContext(RebalancerContext context) {
-      _builder.rebalancerContext(context);
-      _updateFields.add(Fields.REBALANCER_CONTEXT);
+    public Delta setRebalancerConfig(RebalancerConfig config) {
+      _builder.rebalancerConfig(config);
+      _updateFields.add(Fields.REBALANCER_CONFIG);
       return this;
     }
 
@@ -248,10 +247,8 @@ public class ResourceConfig {
     public ResourceConfig mergeInto(ResourceConfig orig) {
       ResourceConfig deltaConfig = _builder.build();
       Builder builder =
-          new Builder(orig.getId())
-              .type(orig.getType())
-              .rebalancerContext(
-                  orig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class))
+          new Builder(orig.getId()).type(orig.getType())
+              .rebalancerConfig(orig.getRebalancerConfig())
               .schedulerTaskConfig(orig.getSchedulerTaskConfig()).userConfig(orig.getUserConfig())
               .bucketSize(orig.getBucketSize()).batchMessageMode(orig.getBatchMessageMode());
       for (Fields field : _updateFields) {
@@ -259,9 +256,8 @@ public class ResourceConfig {
         case TYPE:
           builder.type(deltaConfig.getType());
           break;
-        case REBALANCER_CONTEXT:
-          builder.rebalancerContext(deltaConfig.getRebalancerConfig().getRebalancerContext(
-              RebalancerContext.class));
+        case REBALANCER_CONFIG:
+          builder.rebalancerConfig(deltaConfig.getRebalancerConfig());
           break;
         case USER_CONFIG:
           builder.userConfig(deltaConfig.getUserConfig());
@@ -314,11 +310,11 @@ public class ResourceConfig {
 
     /**
      * Set the rebalancer configuration
-     * @param rebalancerContext properties of interest for rebalancing
+     * @param rebalancerConfig properties of interest for rebalancing
      * @return Builder
      */
-    public Builder rebalancerContext(RebalancerContext rebalancerContext) {
-      _rebalancerConfig = new RebalancerConfig(rebalancerContext);
+    public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+      _rebalancerConfig = rebalancerConfig;
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java b/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
new file mode 100644
index 0000000..29a72b7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
@@ -0,0 +1,57 @@
+package org.apache.helix.api.id;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Identifies a context
+ */
+public class ContextId extends Id {
+  @JsonProperty("id")
+  final private String _id;
+
+  /**
+   * Create a context id
+   * @param id string representation of the id
+   */
+  @JsonCreator
+  public ContextId(@JsonProperty("id") String id) {
+    _id = id;
+  }
+
+  @Override
+  public String stringify() {
+    return _id;
+  }
+
+  /**
+   * Get a concrete context id for a string name
+   * @param contextId string context identifier
+   * @return ContextId
+   */
+  public static ContextId from(String contextId) {
+    if (contextId == null) {
+      return null;
+    }
+    return new ContextId(contextId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index eec745e..96be0fa 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -44,18 +44,19 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.MessageGenerationStage;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.PersistAssignmentStage;
+import org.apache.helix.controller.stages.PersistContextStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.controller.stages.TaskAssignmentStage;
-import org.apache.helix.controller.stages.PersistAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -183,11 +184,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       rebalancePipeline.addStage(new ResourceComputationStage());
       rebalancePipeline.addStage(new CurrentStateComputationStage());
       rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-      rebalancePipeline.addStage(new PersistAssignmentStage());
       rebalancePipeline.addStage(new MessageGenerationStage());
       rebalancePipeline.addStage(new MessageSelectionStage());
       rebalancePipeline.addStage(new MessageThrottleStage());
       rebalancePipeline.addStage(new TaskAssignmentStage());
+      rebalancePipeline.addStage(new PersistAssignmentStage());
+      rebalancePipeline.addStage(new PersistContextStage());
 
       // external view generation
       Pipeline externalViewPipeline = new Pipeline();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java b/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
new file mode 100644
index 0000000..2d98347
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
@@ -0,0 +1,60 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.serializer.DefaultStringSerializer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A simple context that can be serialized by a {@link DefaultStringSerializer}
+ */
+public class BasicControllerContext implements ControllerContext {
+  private final ContextId _id;
+  private Class<? extends StringSerializer> _serializer;
+
+  /**
+   * Instantiate with an id
+   * @param id ContextId, unique among all contexts in this cluster
+   */
+  public BasicControllerContext(@JsonProperty("id") ContextId id) {
+    _id = id;
+    _serializer = DefaultStringSerializer.class;
+  }
+
+  /**
+   * Set the class that can serialize this object into a String, and back
+   * @param serializer StringSerializer implementation class
+   */
+  public void setSerializerClass(Class<? extends StringSerializer> serializer) {
+    _serializer = serializer;
+  }
+
+  @Override
+  public Class<? extends StringSerializer> getSerializerClass() {
+    return _serializer;
+  }
+
+  @Override
+  public ContextId getId() {
+    return _id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
new file mode 100644
index 0000000..b1be5c2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.serializer.StringSerializer;
+
+/**
+ * Controller stages can implement this interface to set and get persistent state
+ */
+public interface ControllerContext {
+  /**
+   * Get the identifier of this context
+   * @return {@link ContextId}
+   */
+  ContextId getId();
+
+  /**
+   * Get the class that can be used to convert this object to and from a String
+   * @return {@link StringSerializer} implementation class
+   */
+  Class<? extends StringSerializer> getSerializerClass();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
new file mode 100644
index 0000000..8c1d03a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
@@ -0,0 +1,156 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Wrap a {@link ControllerContext} so that it can be persisted in the backing store
+ */
+public class ControllerContextHolder extends HelixProperty {
+  private enum Fields {
+    SERIALIZER_CLASS,
+    CONTEXT,
+    CONTEXT_CLASS
+  }
+
+  private static final Logger LOG = Logger.getLogger(ControllerContextHolder.class);
+
+  private final ControllerContext _context;
+
+  /**
+   * Instantiate from a populated ControllerContext
+   * @param context instantiated context; must not be null, since its id is read immediately
+   */
+  public ControllerContextHolder(ControllerContext context) {
+    super(context.getId().toString());
+    _context = context;
+    StringSerializer serializer = instantiateSerializerFromContext(_context);
+    if (_context != null && serializer != null) {
+      _record.setSimpleField(Fields.SERIALIZER_CLASS.toString(), _context.getSerializerClass()
+          .getName());
+      _record.setSimpleField(Fields.CONTEXT_CLASS.toString(), _context.getClass().getName());
+      _record.setSimpleField(Fields.CONTEXT.toString(), serializer.serialize(_context));
+    }
+  }
+
+  /**
+   * Instantiate from a record
+   * @param record populated ZNRecord
+   */
+  public ControllerContextHolder(ZNRecord record) {
+    super(record);
+    StringSerializer serializer =
+        instantiateSerializerFromName(_record.getSimpleField(Fields.SERIALIZER_CLASS.toString()));
+    _context =
+        loadContext(serializer, _record.getSimpleField(Fields.CONTEXT_CLASS.toString()),
+            _record.getSimpleField(Fields.CONTEXT.toString()));
+  }
+
+  /**
+   * Get a ControllerContext
+   * @return instantiated {@link ControllerContext}
+   */
+  public ControllerContext getContext() {
+    return _context;
+  }
+
+  /**
+   * Get a ControllerContext as a specific subtype
+   * @return instantiated typed {@link ControllerContext}, or null
+   */
+  public <T extends ControllerContext> T getContext(Class<T> contextClass) {
+    if (_context != null) {
+      try {
+        return contextClass.cast(_context);
+      } catch (ClassCastException e) {
+        LOG.warn(contextClass + " is incompatible with context class: " + _context.getClass());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Instantiate a StringSerializer that can serialize the context
+   * @param context instantiated ControllerContext
+   * @return StringSerializer object, or null if it could not be instantiated
+   */
+  private StringSerializer instantiateSerializerFromContext(ControllerContext context) {
+    if (context == null) {
+      return null;
+    }
+    try {
+      return context.getSerializerClass().newInstance();
+    } catch (InstantiationException e) {
+      LOG.error("Serializer could not be instatiated", e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Serializer default constructor not accessible", e);
+    }
+    return null;
+  }
+
+  /**
+   * Instantiate a StringSerializer from its class name
+   * @param serializerClassName name of a StringSerializer implementation class
+   * @return instantiated StringSerializer, or null if it could not be instantiated
+   */
+  private StringSerializer instantiateSerializerFromName(String serializerClassName) {
+    if (serializerClassName != null) {
+      try {
+        return (StringSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+            .newInstance();
+      } catch (InstantiationException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (IllegalAccessException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Error getting the serializer", e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Deserialize a context
+   * @param serializer StringSerializer that can deserialize the context
+   * @param className the name of the context class
+   * @param contextData the raw context data as a String
+   * @return ControllerContext, or null if there was a conversion issue
+   */
+  private ControllerContext loadContext(StringSerializer serializer, String className,
+      String contextData) {
+    if (serializer != null && className != null && contextData != null) {
+      try {
+        Class<? extends ControllerContext> contextClass =
+            HelixUtil.loadClass(getClass(), className).asSubclass(ControllerContext.class);
+        return serializer.deserialize(contextClass, contextData);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Could not convert the persisted data into a " + className, e);
+      } catch (ClassCastException e) {
+        LOG.error("Could not convert the persisted data into a " + className, e);
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
new file mode 100644
index 0000000..1541585
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
@@ -0,0 +1,120 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A provider for getting and setting {@link ControllerContext} objects, which will eventually
+ * be persisted and accessible across runs of the controller pipeline.
+ */
+public class ControllerContextProvider {
+  private static final Logger LOG = Logger.getLogger(ControllerContextProvider.class);
+
+  private Map<ContextId, ControllerContext> _persistedContexts;
+  private Map<ContextId, ControllerContext> _pendingContexts;
+
+  /**
+   * Instantiate with already-persisted controller contexts
+   * @param contexts persisted contexts keyed by context id; null is treated as an empty map
+   */
+  public ControllerContextProvider(Map<ContextId, ControllerContext> contexts) {
+    _persistedContexts = contexts != null ? contexts : new HashMap<ContextId, ControllerContext>();
+    _pendingContexts = Maps.newHashMap();
+  }
+
+  /**
+   * Get a base ControllerContext
+   * @param contextId the context id to look up
+   * @return a ControllerContext, or null if not found
+   */
+  public ControllerContext getControllerContext(ContextId contextId) {
+    return getControllerContext(contextId, ControllerContext.class);
+  }
+
+  /**
+   * Get a typed ControllerContext; a pending (not yet persisted) context shadows a persisted one
+   * @param contextId the context id to look up
+   * @param contextClass the class which the context should be returned as
+   * @return a typed ControllerContext, or null if no context with given id is available for this
+   *         type
+   */
+  public <T extends ControllerContext> T getControllerContext(ContextId contextId,
+      Class<T> contextClass) {
+    try {
+      if (_pendingContexts.containsKey(contextId)) {
+        return contextClass.cast(_pendingContexts.get(contextId));
+      } else if (_persistedContexts.containsKey(contextId)) {
+        return contextClass.cast(_persistedContexts.get(contextId));
+      }
+    } catch (ClassCastException e) {
+      LOG.error("Could not convert context " + contextId + " into " + contextClass.getName());
+    }
+    return null;
+  }
+
+  /**
+   * Put a controller context, overwriting any existing ones
+   * @param contextId the id to set
+   * @param context the context object
+   */
+  public void putControllerContext(ContextId contextId, ControllerContext context) {
+    putControllerContext(contextId, context, true);
+  }
+
+  /**
+   * Put a controller context into the pending set, specifying overwrite behavior
+   * @param contextId the id to set
+   * @param context the context object
+   * @param overwriteAllowed true if existing objects can be overwritten, false otherwise
+   * @return true if saved, false if an object with that id exists and overwrite is not allowed
+   */
+  public boolean putControllerContext(ContextId contextId, ControllerContext context,
+      boolean overwriteAllowed) {
+    if (overwriteAllowed || !exists(contextId)) {
+      _pendingContexts.put(contextId, context);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Check if a context exists, either persisted or pending
+   * @param contextId the id to look up
+   * @return true if a context exists with that id, false otherwise
+   */
+  public boolean exists(ContextId contextId) {
+    return _persistedContexts.containsKey(contextId) || _pendingContexts.containsKey(contextId);
+  }
+
+  /**
+   * Get all contexts that have been put, but not yet persisted
+   * @return the live internal map of context id to context (not a copy)
+   */
+  public Map<ContextId, ControllerContext> getPendingContexts() {
+    return _pendingContexts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 5209e2c..6629bec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -8,8 +8,10 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
@@ -40,15 +42,15 @@ public class CustomRebalancer implements HelixRebalancer {
   private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
 
   @Override
-  public void init(HelixManager helixManager) {
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
     // do nothing
   }
 
   @Override
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    CustomRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+    CustomRebalancerConfig config =
+        BasicRebalancerConfig.convert(rebalancerConfig, CustomRebalancerConfig.class);
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
index fc4bfa0..3aa41d7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -30,8 +30,9 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -54,31 +55,30 @@ public class FallbackRebalancer implements HelixRebalancer {
   private HelixManager _helixManager;
 
   @Override
-  public void init(HelixManager helixManager) {
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
     _helixManager = helixManager;
   }
 
   @Override
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
     // make sure the manager is not null
     if (_helixManager == null) {
       LOG.info("HelixManager is null!");
       return null;
     }
 
-    // get the context
-    PartitionedRebalancerContext context =
-        rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
-    if (context == null) {
+    // get the config
+    PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
+    if (config == null) {
       LOG.info("Resource is not partitioned");
       return null;
     }
 
     // get the ideal state and rebalancer class
-    ResourceId resourceId = context.getResourceId();
+    ResourceId resourceId = config.getResourceId();
     StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(context.getStateModelDefId());
+        cluster.getStateModelMap().get(config.getStateModelDefId());
     if (stateModelDef == null) {
       LOG.info("StateModelDefinition unavailable for " + resourceId);
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 6d7b0ef..0c55d45 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -16,8 +16,10 @@ import org.apache.helix.api.Participant;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
@@ -56,15 +58,15 @@ public class FullAutoRebalancer implements HelixRebalancer {
   private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
 
   @Override
-  public void init(HelixManager helixManager) {
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
     // do nothing
   }
 
   @Override
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    FullAutoRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+    FullAutoRebalancerConfig config =
+        BasicRebalancerConfig.convert(rebalancerConfig, FullAutoRebalancerConfig.class);
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     // Compute a preference list based on the current ideal state
@@ -181,7 +183,7 @@ public class FullAutoRebalancer implements HelixRebalancer {
   }
 
   private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
-      FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+      FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
       Map<State, Integer> stateCountMap) {
     Map<PartitionId, Map<ParticipantId, State>> map =
         new HashMap<PartitionId, Map<ParticipantId, State>>();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
index 7fcbba5..1fbb02f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
@@ -2,7 +2,8 @@ package org.apache.helix.controller.rebalancer;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
 
@@ -32,10 +33,11 @@ import org.apache.helix.model.ResourceAssignment;
  */
 public interface HelixRebalancer {
   /**
-   * Initialize the rebalancer with a HelixManager if necessary
-   * @param manager
+   * Initialize the rebalancer with a HelixManager and ControllerContextProvider if necessary
+   * @param helixManager HelixManager instance
+   * @param contextProvider An object that supports getting and setting context across pipeline runs
    */
-  public void init(HelixManager helixManager);
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider);
 
   /**
    * Given an ideal state for a resource and liveness of participants, compute a assignment of
@@ -47,18 +49,20 @@ public interface HelixRebalancer {
    * Say that you have:<br/>
    * 
    * <pre>
-   * class MyRebalancerContext implements RebalancerContext
+   * class MyRebalancerConfig implements RebalancerConfig
    * </pre>
    * 
-   * as your rebalancer context. To extract it from a RebalancerConfig, do the following:<br/>
+   * as your rebalancer config. To get a typed version, you can do the following:<br/>
    * 
    * <pre>
-   * MyRebalancerContext context = rebalancerConfig.getRebalancerContext(MyRebalancerContext.class);
+   * MyRebalancerConfig config = BasicRebalancerConfig.convert(rebalancerConfig,
+   *     MyRebalancerConfig.class);
    * </pre>
    * @param rebalancerConfig the properties of the resource for which a mapping will be computed
+   * @param prevAssignment the previous ResourceAssignment of this cluster, or null if none
    * @param cluster complete snapshot of the cluster
    * @param currentState the current states of all partitions
    */
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState);
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState);
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index a0ad6f3..51ca463 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -9,9 +9,10 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
@@ -45,15 +46,15 @@ public class SemiAutoRebalancer implements HelixRebalancer {
   private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
 
   @Override
-  public void init(HelixManager helixManager) {
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
     // do nothing
   }
 
   @Override
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    SemiAutoRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+    SemiAutoRebalancerConfig config =
+        BasicRebalancerConfig.convert(rebalancerConfig, SemiAutoRebalancerConfig.class);
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
@@ -65,8 +66,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
       Map<ParticipantId, State> currentStateMap =
           currentState.getCurrentStateMap(config.getResourceId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
-          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), partition);
       List<ParticipantId> preferenceList =
           ConstraintBasedAssignment.getPreferenceList(cluster, partition,
               config.getPreferenceList(partition));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
new file mode 100644
index 0000000..d6aa10e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
@@ -0,0 +1,256 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.serializer.DefaultStringSerializer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Abstract RebalancerConfig that functions for generic subunits. Use a subclass that more
+ * concretely defines the subunits.
+ */
+public abstract class BasicRebalancerConfig implements RebalancerConfig {
+  private ResourceId _resourceId;
+  private StateModelDefId _stateModelDefId;
+  private StateModelFactoryId _stateModelFactoryId;
+  private String _participantGroupTag;
+  private Class<? extends StringSerializer> _serializer;
+  private RebalancerRef _rebalancerRef;
+
+  /**
+   * Instantiate a basic rebalancer config, defaulting to {@link DefaultStringSerializer}
+   */
+  public BasicRebalancerConfig() {
+    _serializer = DefaultStringSerializer.class;
+  }
+
+  @Override
+  public ResourceId getResourceId() {
+    return _resourceId;
+  }
+
+  /**
+   * Set the resource to rebalance
+   * @param resourceId resource id
+   */
+  public void setResourceId(ResourceId resourceId) {
+    _resourceId = resourceId;
+  }
+
+  @Override
+  public StateModelDefId getStateModelDefId() {
+    return _stateModelDefId;
+  }
+
+  /**
+   * Set the state model definition that the resource follows
+   * @param stateModelDefId state model definition id
+   */
+  public void setStateModelDefId(StateModelDefId stateModelDefId) {
+    _stateModelDefId = stateModelDefId;
+  }
+
+  @Override
+  public StateModelFactoryId getStateModelFactoryId() {
+    return _stateModelFactoryId;
+  }
+
+  /**
+   * Set the state model factory that the resource uses
+   * @param stateModelFactoryId state model factory id
+   */
+  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+    _stateModelFactoryId = stateModelFactoryId;
+  }
+
+  @Override
+  public String getParticipantGroupTag() {
+    return _participantGroupTag;
+  }
+
+  /**
+   * Set a tag that participants must have in order to serve this resource
+   * @param participantGroupTag string group tag
+   */
+  public void setParticipantGroupTag(String participantGroupTag) {
+    _participantGroupTag = participantGroupTag;
+  }
+
+  /**
+   * Get the serializer. If none is provided, {@link DefaultStringSerializer} is used
+   */
+  @Override
+  public Class<? extends StringSerializer> getSerializerClass() {
+    return _serializer;
+  }
+
+  /**
+   * Set the class that can serialize this config
+   * @param serializer serializer class that implements StringSerializer
+   */
+  public void setSerializerClass(Class<? extends StringSerializer> serializer) {
+    _serializer = serializer;
+  }
+
+  @Override
+  @JsonIgnore
+  public Set<? extends PartitionId> getSubUnitIdSet() {
+    return getSubUnitMap().keySet();
+  }
+
+  @Override
+  @JsonIgnore
+  public Partition getSubUnit(PartitionId subUnitId) {
+    return getSubUnitMap().get(subUnitId);
+  }
+
+  @Override
+  public RebalancerRef getRebalancerRef() {
+    return _rebalancerRef;
+  }
+
+  /**
+   * Set the reference to the class used to rebalance this resource
+   * @param rebalancerRef RebalancerRef instance
+   */
+  public void setRebalancerRef(RebalancerRef rebalancerRef) {
+    _rebalancerRef = rebalancerRef;
+  }
+
+  /**
+   * Safely cast a RebalancerConfig into a subtype
+   * @param config RebalancerConfig object
+   * @param clazz the target class
+   * @return An instance of clazz, or null if config is null or the conversion is not possible
+   */
+  public static <T extends RebalancerConfig> T convert(RebalancerConfig config, Class<T> clazz) {
+    try {
+      return clazz.cast(config);
+    } catch (ClassCastException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Abstract builder for the base rebalancer config
+   */
+  public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
+    private final ResourceId _resourceId;
+    private StateModelDefId _stateModelDefId;
+    private StateModelFactoryId _stateModelFactoryId;
+    private String _participantGroupTag;
+    private Class<? extends StringSerializer> _serializerClass;
+    private RebalancerRef _rebalancerRef;
+
+    /**
+     * Instantiate with a resource id
+     * @param resourceId resource id
+     */
+    public AbstractBuilder(ResourceId resourceId) {
+      _resourceId = resourceId;
+      _serializerClass = DefaultStringSerializer.class;
+    }
+
+    /**
+     * Set the state model definition that the resource should follow
+     * @param stateModelDefId state model definition id
+     * @return Builder
+     */
+    public T stateModelDefId(StateModelDefId stateModelDefId) {
+      _stateModelDefId = stateModelDefId;
+      return self();
+    }
+
+    /**
+     * Set the state model factory that the resource should use
+     * @param stateModelFactoryId state model factory id
+     * @return Builder
+     */
+    public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+      _stateModelFactoryId = stateModelFactoryId;
+      return self();
+    }
+
+    /**
+     * Set the tag that all participants require in order to serve this resource
+     * @param participantGroupTag the tag
+     * @return Builder
+     */
+    public T participantGroupTag(String participantGroupTag) {
+      _participantGroupTag = participantGroupTag;
+      return self();
+    }
+
+    /**
+     * Set the serializer class for this rebalancer config
+     * @param serializerClass class that implements StringSerializer
+     * @return Builder
+     */
+    public T serializerClass(Class<? extends StringSerializer> serializerClass) {
+      _serializerClass = serializerClass;
+      return self();
+    }
+
+    /**
+     * Specify a custom class to use for rebalancing
+     * @param rebalancerRef RebalancerRef instance
+     * @return Builder
+     */
+    public T rebalancerRef(RebalancerRef rebalancerRef) {
+      _rebalancerRef = rebalancerRef;
+      return self();
+    }
+
+    /**
+     * Update an existing config with the base fields collected by this builder
+     * @param config derived config
+     */
+    protected final void update(BasicRebalancerConfig config) {
+      config.setResourceId(_resourceId);
+      config.setStateModelDefId(_stateModelDefId);
+      config.setStateModelFactoryId(_stateModelFactoryId);
+      config.setParticipantGroupTag(_participantGroupTag);
+      config.setSerializerClass(_serializerClass);
+      config.setRebalancerRef(_rebalancerRef);
+    }
+
+    /**
+     * Get a typed reference to "this" class. Final derived classes should simply return the this
+     * reference.
+     * @return this for the most specific type
+     */
+    protected abstract T self();
+
+    /**
+     * Get the rebalancer config from the built fields
+     * @return RebalancerConfig
+     */
+    public abstract RebalancerConfig build();
+  }
+}


[2/4] [HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
deleted file mode 100644
index aa872c4..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ /dev/null
@@ -1,182 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.config.NamespacedConfig;
-import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.model.ResourceConfiguration;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Configuration for a resource rebalancer. This contains a RebalancerContext, which contains
- * information specific to each rebalancer.
- */
-public final class RebalancerConfig {
-  private enum Fields {
-    SERIALIZER_CLASS,
-    REBALANCER_CONTEXT,
-    REBALANCER_CONTEXT_CLASS
-  }
-
-  private static final Logger LOG = Logger.getLogger(RebalancerConfig.class);
-  private ContextSerializer _serializer;
-  private HelixRebalancer _rebalancer;
-  private final RebalancerContext _context;
-  private final NamespacedConfig _config;
-
-  /**
-   * Instantiate a RebalancerConfig
-   * @param context rebalancer context
-   * @param rebalancerRef reference to the rebalancer class that will be used
-   */
-  public RebalancerConfig(RebalancerContext context) {
-    _config =
-        new NamespacedConfig(Scope.resource(context.getResourceId()),
-            RebalancerConfig.class.getSimpleName());
-    _config.setSimpleField(Fields.SERIALIZER_CLASS.toString(), context.getSerializerClass()
-        .getName());
-    _config
-        .setSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString(), context.getClass().getName());
-    _context = context;
-    try {
-      _serializer = context.getSerializerClass().newInstance();
-      _config.setSimpleField(Fields.REBALANCER_CONTEXT.toString(), _serializer.serialize(context));
-    } catch (InstantiationException e) {
-      LOG.error("Error initializing the configuration", e);
-    } catch (IllegalAccessException e) {
-      LOG.error("Error initializing the configuration", e);
-    }
-  }
-
-  /**
-   * Instantiate from a physical ResourceConfiguration
-   * @param resourceConfiguration populated ResourceConfiguration
-   */
-  public RebalancerConfig(ResourceConfiguration resourceConfiguration) {
-    _config = new NamespacedConfig(resourceConfiguration, RebalancerConfig.class.getSimpleName());
-    _serializer = getSerializer();
-    _context = getContext();
-  }
-
-  /**
-   * Get the class that can serialize and deserialize the rebalancer context
-   * @return ContextSerializer
-   */
-  private ContextSerializer getSerializer() {
-    String serializerClassName = _config.getSimpleField(Fields.SERIALIZER_CLASS.toString());
-    if (serializerClassName != null) {
-      try {
-        return (ContextSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
-            .newInstance();
-      } catch (InstantiationException e) {
-        LOG.error("Error getting the serializer", e);
-      } catch (IllegalAccessException e) {
-        LOG.error("Error getting the serializer", e);
-      } catch (ClassNotFoundException e) {
-        LOG.error("Error getting the serializer", e);
-      }
-    }
-    return null;
-  }
-
-  private RebalancerContext getContext() {
-    String className = _config.getSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString());
-    if (className != null) {
-      try {
-        Class<? extends RebalancerContext> contextClass =
-            HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerContext.class);
-        String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
-        return _serializer.deserialize(contextClass, serialized);
-      } catch (ClassNotFoundException e) {
-        LOG.error(className + " is not a valid class");
-      } catch (ClassCastException e) {
-        LOG.error(className + " does not implement RebalancerContext");
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get a rebalancer class instance
-   * @return Rebalancer
-   */
-  public HelixRebalancer getRebalancer() {
-    // cache the rebalancer to avoid loading and instantiating it excessively
-    if (_rebalancer == null) {
-      if (_context == null || _context.getRebalancerRef() == null) {
-        return null;
-      }
-      _rebalancer = _context.getRebalancerRef().getRebalancer();
-    }
-    return _rebalancer;
-  }
-
-  /**
-   * Get the instantiated RebalancerContext
-   * @param contextClass specific class of the RebalancerContext
-   * @return RebalancerContext subclass instance, or null if conversion is not possible
-   */
-  public <T extends RebalancerContext> T getRebalancerContext(Class<T> contextClass) {
-    if (_context != null) {
-      try {
-        return contextClass.cast(_context);
-      } catch (ClassCastException e) {
-        LOG.warn(contextClass + " is incompatible with context class: " + _context.getClass());
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get the rebalancer context serialized as a string
-   * @return string representing the context
-   */
-  public String getSerializedContext() {
-    return _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
-  }
-
-  /**
-   * Convert this to a namespaced config
-   * @return NamespacedConfig
-   */
-  public NamespacedConfig toNamespacedConfig() {
-    return _config;
-  }
-
-  /**
-   * Get a RebalancerConfig from a physical resource config
-   * @param resourceConfiguration physical resource config
-   * @return RebalancerConfig
-   */
-  public static RebalancerConfig from(ResourceConfiguration resourceConfiguration) {
-    return new RebalancerConfig(resourceConfiguration);
-  }
-
-  /**
-   * Get a RebalancerConfig from a RebalancerContext
-   * @param context instantiated RebalancerContext
-   * @return RebalancerConfig
-   */
-  public static RebalancerConfig from(RebalancerContext context) {
-    return new RebalancerConfig(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
deleted file mode 100644
index 981891b..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Defines the state available to a rebalancer. The most common use case is to use a
- * {@link PartitionedRebalancerContext} or a subclass and set up a resource with it. A rebalancer
- * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
- * how the configuration should be serialized.
- */
-public interface RebalancerContext {
-  /**
-   * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
-   * resource, e.g. a subtask of a task
-   * @return map of (subunit id, subunit) pairs
-   */
-  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
-
-  /**
-   * Get the subunits of the resource (e.g. partitions)
-   * @return set of subunit ids
-   */
-  public Set<? extends PartitionId> getSubUnitIdSet();
-
-  /**
-   * Get a specific subunit
-   * @param subUnitId the id of the subunit
-   * @return SubUnit
-   */
-  public Partition getSubUnit(PartitionId subUnitId);
-
-  /**
-   * Get the resource to rebalance
-   * @return resource id
-   */
-  public ResourceId getResourceId();
-
-  /**
-   * Get the state model definition that the resource follows
-   * @return state model definition id
-   */
-  public StateModelDefId getStateModelDefId();
-
-  /**
-   * Get the state model factory of this resource
-   * @return state model factory id
-   */
-  public StateModelFactoryId getStateModelFactoryId();
-
-  /**
-   * Get the tag, if any, that participants must have in order to serve this resource
-   * @return participant group tag, or null
-   */
-  public String getParticipantGroupTag();
-
-  /**
-   * Get the serializer for this context
-   * @return ContextSerializer class object
-   */
-  public Class<? extends ContextSerializer> getSerializerClass();
-
-  /**
-   * Get a reference to the class used to rebalance this resource
-   * @return RebalancerRef
-   */
-  public RebalancerRef getRebalancerRef();
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
deleted file mode 100644
index 525931d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
- * with partitions may accept state model definitions that support multiple replicas per partition,
- * and it's possible that the policy is that each live participant in the system should have a
- * replica.
- */
-public interface ReplicatedRebalancerContext extends RebalancerContext {
-  /**
-   * Check if this resource should be assigned to any live participant
-   * @return true if any live participant expected, false otherwise
-   */
-  public boolean anyLiveParticipant();
-
-  /**
-   * Get the number of replicas that each resource subunit should have
-   * @return replica count
-   */
-  public int getReplicaCount();
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
deleted file mode 100644
index afa81e2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
- * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
- */
-public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
-  @JsonProperty("preferenceLists")
-  private Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-  /**
-   * Instantiate a SemiAutoRebalancerContext
-   */
-  public SemiAutoRebalancerContext() {
-    setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-    _preferenceLists = Maps.newHashMap();
-  }
-
-  /**
-   * Get the preference lists of all partitions of the resource
-   * @return map of partition id to list of participant ids
-   */
-  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
-    return _preferenceLists;
-  }
-
-  /**
-   * Set the preference lists of all partitions of the resource
-   * @param preferenceLists
-   */
-  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
-    _preferenceLists = preferenceLists;
-  }
-
-  /**
-   * Get the preference list of a partition
-   * @param partitionId the partition to look up
-   * @return list of participant ids
-   */
-  @JsonIgnore
-  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
-    return _preferenceLists.get(partitionId);
-  }
-
-  /**
-   * Generate preference lists based on a default cluster setup
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @Override
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // compute default upper bounds
-    Map<State, String> upperBounds = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
-    }
-
-    // determine the current mapping
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
-    for (PartitionId partitionId : getPartitionSet()) {
-      List<ParticipantId> preferenceList = getPreferenceList(partitionId);
-      if (preferenceList != null && !preferenceList.isEmpty()) {
-        Set<ParticipantId> disabledParticipants = Collections.emptySet();
-        Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
-        Map<ParticipantId, State> initialMap =
-            ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
-                stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
-        currentMapping.put(partitionId, initialMap);
-      }
-    }
-
-    // determine the preference
-    LinkedHashMap<State, Integer> stateCounts =
-        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
-            getReplicaCount());
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
-    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
-            getMaxPartitionsPerParticipant(), placementScheme);
-    Map<String, List<String>> rawPreferenceLists =
-        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
-            .getListFields();
-    Map<PartitionId, List<ParticipantId>> preferenceLists =
-        Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
-    setPreferenceLists(preferenceLists);
-  }
-
-  /**
-   * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-    /**
-     * Instantiate for a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-      super.rebalanceMode(RebalanceMode.SEMI_AUTO);
-      _preferenceLists = Maps.newHashMap();
-    }
-
-    /**
-     * Add a preference list for a partition
-     * @param partitionId partition to set
-     * @param preferenceList ordered list of participants who can serve the partition
-     * @return Builder
-     */
-    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
-      _preferenceLists.put(partitionId, preferenceList);
-      return self();
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public SemiAutoRebalancerContext build() {
-      SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
-      super.update(context);
-      context.setPreferenceLists(_preferenceLists);
-      return context;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java
new file mode 100644
index 0000000..d612451
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java
@@ -0,0 +1,82 @@
+package org.apache.helix.controller.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Default serializer implementation for converting to/from strings. Uses the Jackson JSON library
+ * to do the conversion
+ */
+public class DefaultStringSerializer implements StringSerializer {
+
+  private static final Logger logger = Logger.getLogger(DefaultStringSerializer.class);
+
+  /** Serialize to indented JSON; null input yields null, a failure throws HelixException. */
+  @Override
+  public <T> String serialize(final T data) {
+    if (data == null) {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    StringWriter sw = new StringWriter();
+    try {
+      mapper.writeValue(sw, data);
+    } catch (Exception e) {
+      logger.error("Exception during payload data serialization.", e);
+      throw new HelixException(e);
+    }
+    return sw.toString();
+  }
+
+  /** Parse JSON into clazz; null/empty input or a (logged) parse failure yields null. */
+  @Override
+  public <T> T deserialize(final Class<T> clazz, final String string) {
+    if (string == null || string.length() == 0) {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    try {
+      // Parse the characters directly; getBytes() would re-encode via the platform charset
+      return mapper.readValue(string, clazz);
+    } catch (Exception e) {
+      logger.error("Exception during deserialization of payload bytes: " + string, e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java
new file mode 100644
index 0000000..9311191
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java
@@ -0,0 +1,37 @@
+package org.apache.helix.controller.serializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface StringSerializer {
+  /**
+   * Convert an object instance to a String
+   * @param data instance of an arbitrary type
+   * @return String representing the object
+   */
+  public <T> String serialize(final T data);
+
+  /**
+   * Convert a String to an instance of the requested type
+   * @param clazz The class of the object represented by the string
+   * @param string String representing the object
+   * @return instance of the generic type or null if the conversion failed
+   */
+  public <T> T deserialize(final Class<T> clazz, final String string);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index ae0278b..5cedd7c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -26,5 +26,6 @@ public enum AttributeName {
   MESSAGES_ALL,
   MESSAGES_SELECTED,
   MESSAGES_THROTTLE,
-  LOCAL_STATE
+  LOCAL_STATE,
+  CONTEXT_PROVIDER,
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 7b143bd..ec812b2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -25,18 +25,19 @@ import java.util.Set;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.context.ControllerContextProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.FallbackRebalancer;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
@@ -173,18 +174,29 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       }
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
-      RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
-      StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
+      StateModelDefinition stateModelDef =
+          stateModelDefs.get(rebalancerConfig.getStateModelDefId());
       ResourceAssignment resourceAssignment = null;
       if (rebalancerConfig != null) {
-        HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
+        HelixRebalancer rebalancer = null;
+        if (rebalancerConfig != null && rebalancerConfig.getRebalancerRef() != null) {
+          rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
+        }
         HelixManager manager = event.getAttribute("helixmanager");
+        ControllerContextProvider provider =
+            event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
         if (rebalancer == null) {
           rebalancer = new FallbackRebalancer();
         }
-        rebalancer.init(manager);
+        rebalancer.init(manager, provider);
+        ResourceAssignment currentAssignment = null;
+        Resource resourceSnapshot = cluster.getResource(resourceId);
+        if (resourceSnapshot != null) {
+          currentAssignment = resourceSnapshot.getResourceAssignment();
+        }
         resourceAssignment =
-            rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
+            rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
+                currentStateOutput);
       }
       if (resourceAssignment == null) {
         resourceAssignment =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index edceed6..7704378 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -44,8 +44,7 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -139,11 +138,8 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         // partitions are finished (COMPLETED or ERROR), update the status update of the original
         // scheduler
         // message, and then remove the partitions from the ideal state
-        RebalancerContext rebalancerContext =
-            (rebalancerConfig != null) ? rebalancerConfig
-                .getRebalancerContext(RebalancerContext.class) : null;
-        if (rebalancerContext != null
-            && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+        if (rebalancerConfig != null
+            && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
                 StateModelDefId.SchedulerTaskQueue)) {
           updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index 08e6799..ef5a5fd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -39,7 +39,7 @@ import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
@@ -76,9 +76,8 @@ public class MessageGenerationStage extends AbstractBaseStage {
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       int bucketSize = resourceConfig.getBucketSize();
 
-      RebalancerContext rebalancerCtx =
-          resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
-      StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
+      RebalancerConfig rebalancerCfg = resourceConfig.getRebalancerConfig();
+      StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCfg.getStateModelDefId());
 
       ResourceAssignment resourceAssignment =
           bestPossibleStateOutput.getResourceAssignment(resourceId);
@@ -132,16 +131,15 @@ public class MessageGenerationStage extends AbstractBaseStage {
             SessionId sessionId =
                 cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
                     .getSessionId();
-            RebalancerContext rebalancerContext =
-                resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+            RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
             Message message =
                 createMessage(manager, resourceId, subUnitId, participantId, currentState,
                     nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
-                    rebalancerContext.getStateModelFactoryId(), bucketSize);
+                    rebalancerConfig.getStateModelFactoryId(), bucketSize);
 
             // TODO refactor get/set timeout/inner-message
-            if (rebalancerContext != null
-                && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+            if (rebalancerConfig != null
+                && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
                     StateModelDefId.SchedulerTaskQueue)) {
               if (resourceConfig.getSubUnitMap().size() > 0) {
                 // TODO refactor it -- we need a way to read in scheduler tasks a priori

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index bbbf5c6..9adc833 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -38,9 +38,9 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.ReplicatedRebalancerConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -109,8 +109,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
     for (ResourceId resourceId : resourceMap.keySet()) {
       ResourceConfig resource = resourceMap.get(resourceId);
       StateModelDefinition stateModelDef =
-          stateModelDefMap.get(resource.getRebalancerConfig()
-              .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
+          stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
 
       // TODO have a logical model for transition
       Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
@@ -266,9 +265,9 @@ public class MessageSelectionStage extends AbstractBaseStage {
    */
   private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
       RebalancerConfig rebalancerConfig, Cluster cluster) {
-    ReplicatedRebalancerContext context =
-        (rebalancerConfig != null) ? rebalancerConfig
-            .getRebalancerContext(ReplicatedRebalancerContext.class) : null;
+    ReplicatedRebalancerConfig config =
+        (rebalancerConfig != null) ? BasicRebalancerConfig.convert(rebalancerConfig,
+            ReplicatedRebalancerConfig.class) : null;
     Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
 
     List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
@@ -282,11 +281,11 @@ public class MessageSelectionStage extends AbstractBaseStage {
       } else if ("R".equals(numInstancesPerState)) {
         // idealState is null when resource has been dropped,
         // R can't be evaluated and ignore state constraints
-        if (context != null) {
-          if (context.anyLiveParticipant()) {
+        if (config != null) {
+          if (config.anyLiveParticipant()) {
             max = cluster.getLiveParticipantMap().size();
           } else {
-            max = context.getReplicaCount();
+            max = config.getReplicaCount();
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 31dbb08..45fc355 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -1,12 +1,5 @@
 package org.apache.helix.controller.stages;
 
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.accessor.ResourceAccessor;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.model.ResourceAssignment;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -26,6 +19,13 @@ import org.apache.helix.model.ResourceAssignment;
  * under the License.
  */
 
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.model.ResourceAssignment;
+
 /**
  * Persist the ResourceAssignment of each resource that went through rebalancing
  */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
new file mode 100644
index 0000000..e63041a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
@@ -0,0 +1,71 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Persist all dirty contexts set in the controller pipeline.
+ * <p>
+ * Reads the {@link ControllerContextProvider} stored on the event under
+ * {@link AttributeName#CONTEXT_PROVIDER} and writes each of its pending contexts, wrapped in a
+ * {@link ControllerContextHolder}, through the manager's {@link HelixDataAccessor}.
+ */
+public class PersistContextStage extends AbstractBaseStage {
+  /**
+   * Write every non-null pending controller context with a single setChildren call.
+   * @param event pipeline event carrying the "helixmanager" and context provider attributes
+   * @throws Exception declared by the stage contract; nothing is caught in this method
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager helixManager = event.getAttribute("helixmanager");
+    HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ControllerContextProvider contextProvider =
+        event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+    Map<ContextId, ControllerContext> pendingContexts = contextProvider.getPendingContexts();
+    List<PropertyKey> keys = Lists.newArrayList();
+    List<ControllerContextHolder> properties = Lists.newArrayList();
+    for (Map.Entry<ContextId, ControllerContext> pending : pendingContexts.entrySet()) {
+      ControllerContext context = pending.getValue();
+      // Check the context rather than the holder: a freshly constructed holder is never null,
+      // so the guard has to run before wrapping to be able to skip anything.
+      if (context != null) {
+        keys.add(keyBuilder.controllerContext(pending.getKey().stringify()));
+        properties.add(new ControllerContextHolder(context));
+      }
+    }
+    // keys and properties are index-aligned; persist them together
+    accessor.setChildren(keys, properties);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 44fddb6..fb016f1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -19,16 +19,23 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Map;
+
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.accessor.ClusterAccessor;
 import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Maps;
+
 public class ReadClusterDataStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
 
@@ -67,6 +74,16 @@ public class ReadClusterDataStage extends AbstractBaseStage {
 
     event.addAttribute("ClusterDataCache", cluster);
 
+    // read contexts (if any)
+    Map<ContextId, ControllerContext> persistedContexts = null;
+    if (cluster != null) {
+      persistedContexts = cluster.getContextMap();
+    } else {
+      persistedContexts = Maps.newHashMap();
+    }
+    ControllerContextProvider contextProvider = new ControllerContextProvider(persistedContexts);
+    event.addAttribute(AttributeName.CONTEXT_PROVIDER.toString(), contextProvider);
+
     long endTime = System.currentTimeMillis();
     LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 1fdd892..5b75535 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -32,9 +32,8 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.log4j.Logger;
 
@@ -72,7 +71,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
       resCfgBuilder.bucketSize(resource.getBucketSize());
       resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
       resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
-      resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
+      resCfgBuilder.rebalancerConfig(rebalancerCfg);
       resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 
@@ -89,8 +88,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
         new HashMap<ResourceId, ResourceConfig.Builder>();
 
-    Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
-        new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
+    Map<ResourceId, PartitionedRebalancerConfig.Builder> rebCtxBuilderMap =
+        new HashMap<ResourceId, PartitionedRebalancerConfig.Builder>();
 
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
       for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
@@ -98,15 +97,15 @@ public class ResourceComputationStage extends AbstractBaseStage {
 
         if (currentState.getStateModelDefRef() == null) {
           LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
-              + ", partitions: " + currentState.getPartitionStateMap().keySet()
-              + ", states: " + currentState.getPartitionStateMap().values());
+              + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
+              + currentState.getPartitionStateMap().values());
           throw new StageException("State model def is null for resource:"
               + currentState.getResourceId());
         }
 
         if (!resCfgBuilderMap.containsKey(resourceId)) {
-          PartitionedRebalancerContext.Builder rebCtxBuilder =
-              new PartitionedRebalancerContext.Builder(resourceId);
+          PartitionedRebalancerConfig.Builder rebCtxBuilder =
+              new PartitionedRebalancerConfig.Builder(resourceId);
           rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
           rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
               .getStateModelFactoryName()));
@@ -118,7 +117,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
           resCfgBuilderMap.put(resourceId, resCfgBuilder);
         }
 
-        PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+        PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
         for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
           rebCtxBuilder.addPartition(new Partition(partitionId));
         }
@@ -128,8 +127,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
     for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
       ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
-      PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
-      resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
+      PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+      resCfgBuilder.rebalancerConfig(rebCtxBuilder.build());
       resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index f32649f..8c5b863 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -6,8 +6,8 @@ import org.apache.helix.api.config.NamespacedConfig;
 import org.apache.helix.api.config.ResourceConfig.ResourceType;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
@@ -105,11 +105,11 @@ public class ResourceConfiguration extends HelixProperty {
   }
 
   /**
-   * Get a RebalancerContext if available
-   * @return RebalancerContext, or null
+   * Get a RebalancerConfig if available
+   * @return RebalancerConfig, or null
    */
-  public RebalancerContext getRebalancerContext(Class<? extends RebalancerContext> clazz) {
-    RebalancerConfig config = new RebalancerConfig(this);
-    return config.getRebalancerContext(clazz);
+  public RebalancerConfig getRebalancerConfig(Class<? extends RebalancerConfig> clazz) {
+    RebalancerConfigHolder config = new RebalancerConfigHolder(this);
+    return config.getRebalancerConfig(clazz);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
index 98f7cac..ba8958d 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -64,10 +64,12 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -300,9 +302,9 @@ public class NewClusterSetup {
     idealState.setMaxPartitionsPerInstance(maxPartitionsPerNode);
     idealState.setStateModelDefId(stateModelDefId);
 
-    RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+    RebalancerConfig rebalancerCtx = PartitionedRebalancerConfig.from(idealState);
     ResourceConfig.Builder builder =
-        new ResourceConfig.Builder(resourceId).rebalancerContext(rebalancerCtx).bucketSize(
+        new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerCtx).bucketSize(
             bucketSize);
 
     ClusterAccessor accessor = clusterAccessor(clusterName);
@@ -407,10 +409,10 @@ public class NewClusterSetup {
           new IdealState(
               (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateJsonFile))));
 
-      RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+      RebalancerConfig rebalancerCtx = PartitionedRebalancerConfig.from(idealState);
       ResourceConfig.Builder builder =
-          new ResourceConfig.Builder(ResourceId.from(resourceName))
-              .rebalancerContext(rebalancerCtx).bucketSize(idealState.getBucketSize());
+          new ResourceConfig.Builder(ResourceId.from(resourceName)).rebalancerConfig(rebalancerCtx)
+              .bucketSize(idealState.getBucketSize());
 
       ClusterAccessor accessor = clusterAccessor(clusterName);
       accessor.addResourceToCluster(builder.build());
@@ -459,23 +461,25 @@ public class NewClusterSetup {
     StringBuilder sb = new StringBuilder();
     Map<ParticipantId, State> stateMap = resource.getExternalView().getStateMap(partitionId);
     sb.append(resourceName + "/" + partitionName + ", externalView: " + stateMap);
-    PartitionedRebalancerContext partitionedContext =
-        resource.getRebalancerConfig().getRebalancerContext(PartitionedRebalancerContext.class);
-    if (partitionedContext != null) {
+    PartitionedRebalancerConfig partitionedConfig =
+        PartitionedRebalancerConfig.from(resource.getRebalancerConfig());
+    if (partitionedConfig != null) {
       // for partitioned contexts, check the mode and apply mode-specific information if possible
-      if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-        SemiAutoRebalancerContext semiAutoContext =
-            resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
-        sb.append(", preferenceList: " + semiAutoContext.getPreferenceList(partitionId));
-      } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
-        CustomRebalancerContext customContext =
-            resource.getRebalancerConfig().getRebalancerContext(CustomRebalancerContext.class);
-        sb.append(", preferenceMap: " + customContext.getPreferenceMap(partitionId));
+      if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+        SemiAutoRebalancerConfig semiAutoConfig =
+            BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+                SemiAutoRebalancerConfig.class);
+        sb.append(", preferenceList: " + semiAutoConfig.getPreferenceList(partitionId));
+      } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+        CustomRebalancerConfig customConfig =
+            BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+                CustomRebalancerConfig.class);
+        sb.append(", preferenceMap: " + customConfig.getPreferenceMap(partitionId));
       }
-      if (partitionedContext.anyLiveParticipant()) {
-        sb.append(", anyLiveParticipant: " + partitionedContext.anyLiveParticipant());
+      if (partitionedConfig.anyLiveParticipant()) {
+        sb.append(", anyLiveParticipant: " + partitionedConfig.anyLiveParticipant());
       } else {
-        sb.append(", replicaCount: " + partitionedContext.getReplicaCount());
+        sb.append(", replicaCount: " + partitionedConfig.getReplicaCount());
       }
     }
 
@@ -750,12 +754,13 @@ public class NewClusterSetup {
     ResourceAccessor accessor = resourceAccessor(clusterName);
     ResourceId resourceId = ResourceId.from(resourceName);
     Resource resource = accessor.readResource(resourceId);
+    RebalancerConfigHolder holder = new RebalancerConfigHolder(resource.getRebalancerConfig());
     StringBuilder sb =
         new StringBuilder("Resource ").append(resourceName).append(" in cluster ")
             .append(clusterName).append(":\n").append("externalView: ")
             .append(resource.getExternalView()).append(", userConfig: ")
-            .append(resource.getUserConfig()).append(", rebalancerContext: ")
-            .append(resource.getRebalancerConfig().getSerializedContext());
+            .append(resource.getUserConfig()).append(", rebalancerConfig: ")
+            .append(holder.getSerializedConfig());
     System.out.println(sb.toString());
   }
 
@@ -956,17 +961,18 @@ public class NewClusterSetup {
   private void expandResource(ClusterId clusterId, ResourceId resourceId) {
     ResourceAccessor accessor = resourceAccessor(clusterId.stringify());
     Resource resource = accessor.readResource(resourceId);
-    SemiAutoRebalancerContext context =
-        resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
-    if (context == null) {
+    SemiAutoRebalancerConfig config =
+        BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+            SemiAutoRebalancerConfig.class);
+    if (config == null) {
       LOG.info("Only SEMI_AUTO mode supported for resource expansion");
       return;
     }
-    if (context.anyLiveParticipant()) {
+    if (config.anyLiveParticipant()) {
       LOG.info("Resource uses ANY_LIVE_PARTICIPANT, skipping default assignment");
       return;
     }
-    if (context.getPreferenceLists().size() == 0) {
+    if (config.getPreferenceLists().size() == 0) {
       LOG.info("No preference lists have been set yet, skipping default assignment");
       return;
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 0d597f6..ab4ead0 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -33,11 +33,12 @@ import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -99,8 +100,8 @@ public class TestNewStages extends ZkUnitTestBase {
     ResourceId resourceId = ResourceId.from("TestDB0");
     Assert.assertTrue(resourceMap.containsKey(resourceId));
     Resource resource = resourceMap.get(resourceId);
-    Assert.assertNotNull(resource.getRebalancerConfig().getRebalancerContext(
-        SemiAutoRebalancerContext.class));
+    Assert.assertNotNull(BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+        SemiAutoRebalancerConfig.class));
 
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -163,8 +164,12 @@ public class TestNewStages extends ZkUnitTestBase {
     Resource resource = cluster.getResource(resourceId);
     ResourceCurrentState currentStateOutput = new ResourceCurrentState();
     ResourceAssignment semiAutoResult =
-        resource.getRebalancerConfig().getRebalancer()
-            .computeResourceMapping(resource.getRebalancerConfig(), cluster, currentStateOutput);
+        resource
+            .getRebalancerConfig()
+            .getRebalancerRef()
+            .getRebalancer()
+            .computeResourceMapping(resource.getRebalancerConfig(), null, cluster,
+                currentStateOutput);
     verifySemiAutoRebalance(resource, semiAutoResult);
 
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
@@ -177,8 +182,9 @@ public class TestNewStages extends ZkUnitTestBase {
    */
   private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
     Assert.assertEquals(assignment.getMappedPartitionIds().size(), resource.getSubUnitSet().size());
-    SemiAutoRebalancerContext context =
-        resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
+    SemiAutoRebalancerConfig context =
+        BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+            SemiAutoRebalancerConfig.class);
     for (PartitionId partitionId : assignment.getMappedPartitionIds()) {
       List<ParticipantId> preferenceList = context.getPreferenceList(partitionId);
       Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
index 74781cd..3c8fb2c 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
@@ -9,8 +9,9 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -91,29 +92,29 @@ public class TestUpdateConfig {
     // message mode
     UserConfig userConfig = new UserConfig(Scope.resource(resourceId));
     userConfig.setSimpleField("key1", "value1");
-    SemiAutoRebalancerContext rebalancerContext =
-        new SemiAutoRebalancerContext.Builder(resourceId).build();
+    SemiAutoRebalancerConfig rebalancerContext =
+        new SemiAutoRebalancerConfig.Builder(resourceId).build();
     ResourceConfig config =
         new ResourceConfig.Builder(resourceId).userConfig(userConfig)
-            .rebalancerContext(rebalancerContext).bucketSize(OLD_BUCKET_SIZE)
-            .batchMessageMode(true).build();
+            .rebalancerConfig(rebalancerContext).bucketSize(OLD_BUCKET_SIZE).batchMessageMode(true)
+            .build();
 
     // update: overwrite user config, change to full auto rebalancer context, and change the bucket
     // size
     UserConfig newUserConfig = new UserConfig(Scope.resource(resourceId));
     newUserConfig.setSimpleField("key2", "value2");
-    FullAutoRebalancerContext newRebalancerContext =
-        new FullAutoRebalancerContext.Builder(resourceId).build();
+    FullAutoRebalancerConfig newRebalancerContext =
+        new FullAutoRebalancerConfig.Builder(resourceId).build();
     ResourceConfig updated =
         new ResourceConfig.Delta(resourceId).setBucketSize(NEW_BUCKET_SIZE)
-            .setUserConfig(newUserConfig).setRebalancerContext(newRebalancerContext)
+            .setUserConfig(newUserConfig).setRebalancerConfig(newRebalancerContext)
             .mergeInto(config);
     Assert.assertEquals(updated.getBucketSize(), NEW_BUCKET_SIZE);
     Assert.assertTrue(updated.getBatchMessageMode());
-    Assert.assertNull(updated.getRebalancerConfig().getRebalancerContext(
-        SemiAutoRebalancerContext.class));
-    Assert.assertNotNull(updated.getRebalancerConfig().getRebalancerContext(
-        FullAutoRebalancerContext.class));
+    Assert.assertNull(BasicRebalancerConfig.convert(updated.getRebalancerConfig(),
+        SemiAutoRebalancerConfig.class));
+    Assert.assertNotNull(BasicRebalancerConfig.convert(updated.getRebalancerConfig(),
+        FullAutoRebalancerConfig.class));
     Assert.assertNull(updated.getUserConfig().getSimpleField("key1"));
     Assert.assertEquals(updated.getUserConfig().getSimpleField("key2"), "value2");
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
index 5bbe54f..94deaac 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
@@ -8,6 +8,9 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 import org.apache.helix.model.ResourceConfiguration;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -41,7 +44,7 @@ public class TestSerializeRebalancerContext {
   @Test
   public void basicTest() {
     // populate a context
-    CustomRebalancerContext context = new CustomRebalancerContext();
+    CustomRebalancerConfig context = new CustomRebalancerConfig();
     context.setAnyLiveParticipant(false);
     context.setMaxPartitionsPerParticipant(Integer.MAX_VALUE);
     Map<PartitionId, Partition> partitionMap = Maps.newHashMap();
@@ -61,9 +64,8 @@ public class TestSerializeRebalancerContext {
     context.setResourceId(resourceId);
 
     // serialize and deserialize by wrapping in a config
-    RebalancerConfig config = new RebalancerConfig(context);
-    CustomRebalancerContext deserialized =
-        config.getRebalancerContext(CustomRebalancerContext.class);
+    RebalancerConfigHolder config = new RebalancerConfigHolder(context);
+    CustomRebalancerConfig deserialized = config.getRebalancerConfig(CustomRebalancerConfig.class);
 
     // check to make sure that the two objects contain the same data
     Assert.assertNotNull(deserialized);
@@ -79,9 +81,9 @@ public class TestSerializeRebalancerContext {
     // wrap in a physical config and then unwrap it
     ResourceConfiguration physicalConfig = new ResourceConfiguration(resourceId);
     physicalConfig.addNamespacedConfig(config.toNamespacedConfig());
-    RebalancerConfig extractedConfig = new RebalancerConfig(physicalConfig);
-    CustomRebalancerContext extractedContext =
-        extractedConfig.getRebalancerContext(CustomRebalancerContext.class);
+    RebalancerConfigHolder extractedConfig = new RebalancerConfigHolder(physicalConfig);
+    CustomRebalancerConfig extractedContext =
+        extractedConfig.getRebalancerConfig(CustomRebalancerConfig.class);
 
     // make sure the unwrapped data hasn't changed
     Assert.assertNotNull(extractedContext);
@@ -95,8 +97,8 @@ public class TestSerializeRebalancerContext {
     Assert.assertEquals(extractedContext.getResourceId(), context.getResourceId());
 
     // make sure that it's legal to use a base rebalancer context
-    RebalancerContext rebalancerContext =
-        extractedConfig.getRebalancerContext(RebalancerContext.class);
+    RebalancerConfig rebalancerContext =
+        extractedConfig.getRebalancerConfig(RebalancerConfig.class);
     Assert.assertNotNull(rebalancerContext);
     Assert.assertEquals(rebalancerContext.getResourceId(), context.getResourceId());
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index e53530c..22c4168 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -41,8 +41,8 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
@@ -169,7 +169,7 @@ public class BaseStageTest {
     Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
     for (IdealState idealState : idealStates) {
       ResourceId resourceId = idealState.getResourceId();
-      RebalancerContext context = PartitionedRebalancerContext.from(idealState);
+      RebalancerConfig context = PartitionedRebalancerConfig.from(idealState);
       Resource resource =
           new Resource(resourceId, ResourceType.DATA, idealState, null, null, context,
               new UserConfig(Scope.resource(resourceId)), idealState.getBucketSize(),

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index e320011..90ea393 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -33,7 +33,6 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
@@ -79,8 +78,7 @@ public class TestResourceComputationStage extends BaseStageTest {
     AssertJUnit.assertEquals(resource.values().iterator().next().getId(),
         ResourceId.from(resourceName));
     AssertJUnit.assertEquals(resource.values().iterator().next().getRebalancerConfig()
-        .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
-        idealState.getStateModelDefId());
+        .getStateModelDefId(), idealState.getStateModelDefId());
     AssertJUnit
         .assertEquals(resource.values().iterator().next().getSubUnitSet().size(), partitions);
   }
@@ -107,8 +105,7 @@ public class TestResourceComputationStage extends BaseStageTest {
       AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
-          .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
-          idealState.getStateModelDefId());
+          .getStateModelDefId(), idealState.getStateModelDefId());
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
           idealState.getNumPartitions());
     }
@@ -182,8 +179,7 @@ public class TestResourceComputationStage extends BaseStageTest {
       AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
-          .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
-          idealState.getStateModelDefId());
+          .getStateModelDefId(), idealState.getStateModelDefId());
       AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
           idealState.getNumPartitions());
     }
@@ -192,8 +188,7 @@ public class TestResourceComputationStage extends BaseStageTest {
     AssertJUnit.assertTrue(resourceMap.containsKey(oldResourceId));
     AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getId(), oldResourceId);
     AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getRebalancerConfig()
-        .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
-        currentState.getStateModelDefId());
+        .getStateModelDefId(), currentState.getStateModelDefId());
     AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getSubUnitSet().size(), currentState
         .getTypedPartitionStateMap().size());
     AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit(

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index ce26e2e..11805fe 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -30,11 +30,15 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
+import org.apache.helix.api.id.ContextId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.context.BasicControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.context.ControllerContextProvider;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -60,22 +64,23 @@ public class TestCustomizedIdealStateRebalancer extends
 
   public static class TestRebalancer implements HelixRebalancer {
 
+    private ControllerContextProvider _contextProvider;
+
     /**
      * Very basic mapping that evenly assigns one replica of each partition to live nodes, each of
      * which is in the highest-priority state.
      */
     @Override
-    public ResourceAssignment computeResourceMapping(RebalancerConfig config, Cluster cluster,
-        ResourceCurrentState currentState) {
-      PartitionedRebalancerContext context =
-          config.getRebalancerContext(PartitionedRebalancerContext.class);
+    public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+        ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+      PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
       StateModelDefinition stateModelDef =
-          cluster.getStateModelMap().get(context.getStateModelDefId());
+          cluster.getStateModelMap().get(config.getStateModelDefId());
       List<ParticipantId> liveParticipants =
           new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
-      ResourceAssignment resourceMapping = new ResourceAssignment(context.getResourceId());
+      ResourceAssignment resourceMapping = new ResourceAssignment(config.getResourceId());
       int i = 0;
-      for (PartitionId partitionId : context.getPartitionSet()) {
+      for (PartitionId partitionId : config.getPartitionSet()) {
         int nodeIndex = i % liveParticipants.size();
         Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
         replicaMap.put(liveParticipants.get(nodeIndex), stateModelDef.getTypedStatesPriorityList()
@@ -84,12 +89,17 @@ public class TestCustomizedIdealStateRebalancer extends
         i++;
       }
       testRebalancerInvoked = true;
+
+      // set some basic context
+      ContextId contextId = ContextId.from(config.getResourceId().stringify());
+      _contextProvider.putControllerContext(contextId, new BasicControllerContext(contextId));
       return resourceMapping;
     }
 
     @Override
-    public void init(HelixManager helixManager) {
+    public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
       testRebalancerCreated = true;
+      _contextProvider = contextProvider;
     }
   }
 
@@ -124,6 +134,11 @@ public class TestCustomizedIdealStateRebalancer extends
     }
     Assert.assertTrue(testRebalancerCreated);
     Assert.assertTrue(testRebalancerInvoked);
+
+    // check that context can be extracted
+    ControllerContextHolder holder = accessor.getProperty(keyBuilder.controllerContext(db2));
+    Assert.assertNotNull(holder);
+    Assert.assertNotNull(holder.getContext());
   }
 
   public static class ExternalViewBalancedVerifier implements ZkVerifier {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 29e0a44..b415393 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -42,8 +42,8 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.manager.zk.ZkHelixConnection;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.Message;
@@ -122,13 +122,13 @@ public class TestHelixConnection extends ZkUnitTestBase {
             .addTransition(slave, offline, 4).addTransition(slave, master, 2)
             .addTransition(master, slave, 1).addTransition(offline, dropped).initialState(offline)
             .upperBound(master, 1).dynamicUpperBound(slave, "R").build();
-    RebalancerContext rebalancerCtx =
-        new SemiAutoRebalancerContext.Builder(resourceId).addPartitions(1).replicaCount(1)
+    RebalancerConfig rebalancerCtx =
+        new SemiAutoRebalancerConfig.Builder(resourceId).addPartitions(1).replicaCount(1)
             .stateModelDefId(stateModelDefId)
             .preferenceList(PartitionId.from("testDB_0"), Arrays.asList(participantId)).build();
     clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
         stateModelDef).build());
-    clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(resourceId).rebalancerContext(
+    clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(resourceId).rebalancerConfig(
         rebalancerCtx).build());
     clusterAccessor.addParticipantToCluster(new ParticipantConfig.Builder(participantId).build());
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
index c233417..880d31c 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
@@ -23,7 +23,7 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
 import org.apache.helix.manager.zk.ZkHelixConnection;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.Message;
@@ -219,10 +219,10 @@ public class LogicalModelExample {
     Partition partition2 = new Partition(PartitionId.from(resourceId, "2"));
 
     // specify the rebalancer configuration
-    // this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerContext
+    // this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerConfig
     // builder
-    FullAutoRebalancerContext.Builder rebalanceContextBuilder =
-        new FullAutoRebalancerContext.Builder(resourceId).replicaCount(1).addPartition(partition1)
+    FullAutoRebalancerConfig.Builder rebalanceConfigBuilder =
+        new FullAutoRebalancerConfig.Builder(resourceId).replicaCount(1).addPartition(partition1)
             .addPartition(partition2).stateModelDefId(stateModelDef.getStateModelDefId());
 
     // create (optional) user-defined configuration properties for the resource
@@ -231,7 +231,7 @@ public class LogicalModelExample {
 
     // create the configuration for a new resource
     ResourceConfig.Builder resourceBuilder =
-        new ResourceConfig.Builder(resourceId).rebalancerContext(rebalanceContextBuilder.build())
+        new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalanceConfigBuilder.build())
             .userConfig(userConfig);
     return resourceBuilder.build();
   }


[3/4] [HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
new file mode 100644
index 0000000..73c3ccc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
@@ -0,0 +1,164 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
+ * corresponds to {@link CustomRebalancer}
+ */
+public class CustomRebalancerConfig extends PartitionedRebalancerConfig {
+  private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+  /**
+   * Instantiate a CustomRebalancerConfig in CUSTOMIZED mode with no preference maps
+   */
+  public CustomRebalancerConfig() {
+    setRebalanceMode(RebalanceMode.CUSTOMIZED);
+    setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+    _preferenceMaps = Maps.newHashMap();
+  }
+
+  /**
+   * Get the preference maps of the partitions and replicas of the resource
+   * @return map of partition to participant and state
+   */
+  public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
+    return _preferenceMaps;
+  }
+
+  /**
+   * Set the preference maps of the partitions and replicas of the resource
+   * @param preferenceMaps map of partition to participant and state
+   */
+  public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
+    _preferenceMaps = preferenceMaps;
+  }
+
+  /**
+   * Get the preference map of a partition
+   * @param partitionId the partition to look up
+   * @return map of participant to state, or null if the partition has no preference map
+   */
+  @JsonIgnore
+  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+    return _preferenceMaps.get(partitionId);
+  }
+
+  /**
+   * Generate preference maps based on a default cluster setup, replacing the stored ones
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @Override
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // compute default upper bounds
+    Map<State, String> upperBounds = Maps.newHashMap();
+    for (State state : stateModelDef.getTypedStatesPriorityList()) {
+      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+    }
+
+    // determine the current mapping
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
+
+    // determine the preference maps
+    LinkedHashMap<State, Integer> stateCounts =
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+            getReplicaCount());
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+            getMaxPartitionsPerParticipant(), placementScheme);
+    Map<String, Map<String, String>> rawPreferenceMaps =
+        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+            .getMapFields();
+    Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
+        Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
+    setPreferenceMaps(preferenceMaps);
+  }
+
+  /**
+   * Build a CustomRebalancerConfig. By default, it corresponds to {@link CustomRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
+    private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+    /**
+     * Instantiate for a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+      super.rebalanceMode(RebalanceMode.CUSTOMIZED);
+      _preferenceMaps = Maps.newHashMap();
+    }
+
+    /**
+     * Add a preference map for a partition
+     * @param partitionId partition to set
+     * @param preferenceMap map of participant id to state indicating where replicas are served
+     * @return Builder
+     */
+    public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+      _preferenceMaps.put(partitionId, preferenceMap);
+      return self();
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public CustomRebalancerConfig build() {
+      CustomRebalancerConfig config = new CustomRebalancerConfig();
+      super.update(config);
+      config.setPreferenceMaps(_preferenceMaps); // the builder's map is passed as-is, not copied
+      return config;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
new file mode 100644
index 0000000..828d509
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
@@ -0,0 +1,64 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for FULL_AUTO rebalancing mode. By default, it corresponds to
+ * {@link FullAutoRebalancer}
+ */
+public class FullAutoRebalancerConfig extends PartitionedRebalancerConfig {
+  public FullAutoRebalancerConfig() {
+    setRebalanceMode(RebalanceMode.FULL_AUTO);
+    setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+  }
+
+  /**
+   * Builder for a full auto rebalancer config. By default, it corresponds to
+   * {@link FullAutoRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+      super.rebalanceMode(RebalanceMode.FULL_AUTO);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public FullAutoRebalancerConfig build() {
+      FullAutoRebalancerConfig config = new FullAutoRebalancerConfig();
+      super.update(config);
+      return config;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
new file mode 100644
index 0000000..2c9769d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
@@ -0,0 +1,405 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for a resource whose subunits are partitions. In addition, these partitions can
+ * be replicated.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionedRebalancerConfig extends BasicRebalancerConfig implements
+    ReplicatedRebalancerConfig {
+  private Map<PartitionId, Partition> _partitionMap;
+  private boolean _anyLiveParticipant;
+  private int _replicaCount;
+  private int _maxPartitionsPerParticipant;
+  private RebalanceMode _rebalanceMode;
+
+  /**
+   * Instantiate a PartitionedRebalancerConfig
+   */
+  public PartitionedRebalancerConfig() {
+    _partitionMap = Collections.emptyMap();
+    _replicaCount = 1;
+    _anyLiveParticipant = false;
+    _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+    _rebalanceMode = RebalanceMode.USER_DEFINED;
+  }
+
+  /**
+   * Get a map from partition id to partition
+   * @return partition map (mutable once set; the default is an immutable empty map)
+   */
+  public Map<PartitionId, Partition> getPartitionMap() {
+    return _partitionMap;
+  }
+
+  /**
+   * Set a map of partition id to partition
+   * @param partitionMap partition map
+   */
+  public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
+    _partitionMap = Maps.newHashMap(partitionMap);
+  }
+
+  /**
+   * Get the set of partitions for this resource
+   * @return set of partition ids
+   */
+  @JsonIgnore
+  public Set<PartitionId> getPartitionSet() {
+    return _partitionMap.keySet();
+  }
+
+  /**
+   * Get a partition
+   * @param partitionId id of the partition to get
+   * @return Partition object, or null if not present
+   */
+  @JsonIgnore
+  public Partition getPartition(PartitionId partitionId) {
+    return _partitionMap.get(partitionId);
+  }
+
+  @Override
+  public boolean anyLiveParticipant() {
+    return _anyLiveParticipant;
+  }
+
+  /**
+   * Indicate if this resource should be assigned to any live participant
+   * @param anyLiveParticipant true if any live participant expected, false otherwise
+   */
+  public void setAnyLiveParticipant(boolean anyLiveParticipant) {
+    _anyLiveParticipant = anyLiveParticipant;
+  }
+
+  @Override
+  public int getReplicaCount() {
+    return _replicaCount;
+  }
+
+  /**
+   * Set the number of replicas that each partition should have
+   * @param replicaCount number of replicas per partition
+   */
+  public void setReplicaCount(int replicaCount) {
+    _replicaCount = replicaCount;
+  }
+
+  /**
+   * Get the maximum number of partitions that a participant can serve
+   * @return maximum number of partitions per participant
+   */
+  public int getMaxPartitionsPerParticipant() {
+    return _maxPartitionsPerParticipant;
+  }
+
+  /**
+   * Set the maximum number of partitions that a participant can serve
+   * @param maxPartitionsPerParticipant maximum number of partitions per participant
+   */
+  public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+    _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+  }
+
+  /**
+   * Set the rebalancer mode of the partitioned resource
+   * @param rebalanceMode {@link RebalanceMode} enum value
+   */
+  public void setRebalanceMode(RebalanceMode rebalanceMode) {
+    _rebalanceMode = rebalanceMode;
+  }
+
+  /**
+   * Get the rebalancer mode of the resource
+   * @return RebalanceMode
+   */
+  public RebalanceMode getRebalanceMode() {
+    return _rebalanceMode;
+  }
+
+  @Override
+  @JsonIgnore
+  public Map<PartitionId, Partition> getSubUnitMap() {
+    return getPartitionMap();
+  }
+
+  /**
+   * Generate a default configuration given the state model and a set of participants.
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // the base config does not know enough to generate anything, so this is a no-op
+  }
+
+  /**
+   * Safely get a {@link PartitionedRebalancerConfig} from a {@link RebalancerConfig}
+   * @param config the base config (may be null)
+   * @return the config as a {@link PartitionedRebalancerConfig}, or null if it is null or of
+   *         another type
+   */
+  public static PartitionedRebalancerConfig from(RebalancerConfig config) {
+    if (config instanceof PartitionedRebalancerConfig) {
+      return (PartitionedRebalancerConfig) config;
+    }
+    return null;
+  }
+
+  /**
+   * Convert a physically-stored IdealState into a rebalancer config for a partitioned resource
+   * @param idealState populated IdealState
+   * @return PartitionedRebalancerConfig
+   */
+  public static PartitionedRebalancerConfig from(IdealState idealState) {
+    PartitionedRebalancerConfig config;
+    switch (idealState.getRebalanceMode()) {
+    case FULL_AUTO:
+      FullAutoRebalancerConfig.Builder fullAutoBuilder =
+          new FullAutoRebalancerConfig.Builder(idealState.getResourceId());
+      populateConfig(fullAutoBuilder, idealState);
+      config = fullAutoBuilder.build();
+      break;
+    case SEMI_AUTO:
+      SemiAutoRebalancerConfig.Builder semiAutoBuilder =
+          new SemiAutoRebalancerConfig.Builder(idealState.getResourceId());
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
+      }
+      populateConfig(semiAutoBuilder, idealState);
+      config = semiAutoBuilder.build();
+      break;
+    case CUSTOMIZED:
+      CustomRebalancerConfig.Builder customBuilder =
+          new CustomRebalancerConfig.Builder(idealState.getResourceId());
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
+      }
+      populateConfig(customBuilder, idealState);
+      config = customBuilder.build();
+      break;
+    default:
+      Builder baseBuilder = new Builder(idealState.getResourceId());
+      populateConfig(baseBuilder, idealState);
+      config = baseBuilder.build();
+      break;
+    }
+    return config;
+  }
+
+  /**
+   * Update a builder subclass with all the fields of the ideal state
+   * @param builder builder that extends AbstractBuilder
+   * @param idealState populated IdealState
+   */
+  private static <T extends AbstractBuilder<T>> void populateConfig(T builder, IdealState idealState) {
+    String replicas = idealState.getReplicas();
+    int replicaCount = 0;
+    boolean anyLiveParticipant = false;
+    if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
+      anyLiveParticipant = true;
+    } else {
+      replicaCount = Integer.parseInt(replicas);
+    }
+    if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
+      // backwards compatibility: partition sets were based on pref lists/maps previously
+      builder.addPartitions(idealState.getNumPartitions());
+    } else {
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        builder.addPartition(new Partition(partitionId));
+      }
+    }
+    builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+        .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+        .participantGroupTag(idealState.getInstanceGroupTag())
+        .stateModelDefId(idealState.getStateModelDefId())
+        .stateModelFactoryId(idealState.getStateModelFactoryId());
+    RebalancerRef rebalancerRef = idealState.getRebalancerRef();
+    if (rebalancerRef != null) {
+      builder.rebalancerRef(rebalancerRef);
+    }
+  }
+
+  /**
+   * Builder for a basic partitioned rebalancer config
+   */
+  public static final class Builder extends AbstractBuilder<Builder> {
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public PartitionedRebalancerConfig build() {
+      PartitionedRebalancerConfig config = new PartitionedRebalancerConfig();
+      super.update(config);
+      return config;
+    }
+  }
+
+  /**
+   * Abstract builder for a generic partitioned resource rebalancer config
+   */
+  public static abstract class AbstractBuilder<T extends BasicRebalancerConfig.AbstractBuilder<T>>
+      extends BasicRebalancerConfig.AbstractBuilder<T> {
+    private final ResourceId _resourceId;
+    private final Map<PartitionId, Partition> _partitionMap;
+    private RebalanceMode _rebalanceMode;
+    private boolean _anyLiveParticipant;
+    private int _replicaCount;
+    private int _maxPartitionsPerParticipant;
+
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public AbstractBuilder(ResourceId resourceId) {
+      super(resourceId);
+      _resourceId = resourceId;
+      _partitionMap = Maps.newHashMap();
+      _rebalanceMode = RebalanceMode.USER_DEFINED;
+      _anyLiveParticipant = false;
+      _replicaCount = 1;
+      _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+    }
+
+    /**
+     * Set the rebalance mode for a partitioned rebalancer config
+     * @param rebalanceMode {@link RebalanceMode} enum value
+     * @return Builder
+     */
+    public T rebalanceMode(RebalanceMode rebalanceMode) {
+      _rebalanceMode = rebalanceMode;
+      return self();
+    }
+
+    /**
+     * Add a partition that the resource serves
+     * @param partition fully-qualified partition
+     * @return Builder
+     */
+    public T addPartition(Partition partition) {
+      _partitionMap.put(partition.getId(), partition);
+      return self();
+    }
+
+    /**
+     * Add a collection of partitions
+     * @param partitions any collection of Partition objects
+     * @return Builder
+     */
+    public T addPartitions(Collection<Partition> partitions) {
+      for (Partition partition : partitions) {
+        addPartition(partition);
+      }
+      return self();
+    }
+
+    /**
+     * Add a specified number of partitions with a default naming scheme, namely
+     * resourceId_partitionNumber where partitionNumber starts at 0
+     * @param partitionCount number of partitions to add
+     * @return Builder
+     */
+    public T addPartitions(int partitionCount) {
+      for (int i = 0; i < partitionCount; i++) {
+        addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
+      }
+      return self();
+    }
+
+    /**
+     * Set whether any live participant should be used in rebalancing
+     * @param anyLiveParticipant true if any live participant can be used, false otherwise
+     * @return Builder
+     */
+    public T anyLiveParticipant(boolean anyLiveParticipant) {
+      _anyLiveParticipant = anyLiveParticipant;
+      return self();
+    }
+
+    /**
+     * Set the number of replicas
+     * @param replicaCount number of replicas
+     * @return Builder
+     */
+    public T replicaCount(int replicaCount) {
+      _replicaCount = replicaCount;
+      return self();
+    }
+
+    /**
+     * Set the maximum number of partitions to assign to any participant
+     * @param maxPartitionsPerParticipant the maximum
+     * @return Builder
+     */
+    public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+      _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+      return self();
+    }
+
+    /**
+     * Update a PartitionedRebalancerConfig with fields from this builder level
+     * @param config PartitionedRebalancerConfig
+     */
+    protected final void update(PartitionedRebalancerConfig config) {
+      super.update(config);
+      // enforce at least one partition
+      if (_partitionMap.isEmpty()) {
+        addPartitions(1);
+      }
+      config.setRebalanceMode(_rebalanceMode);
+      config.setPartitionMap(_partitionMap);
+      config.setAnyLiveParticipant(_anyLiveParticipant);
+      config.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
+      config.setReplicaCount(_replicaCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
new file mode 100644
index 0000000..3f8c9d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
@@ -0,0 +1,95 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.serializer.StringSerializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Defines the state available to a rebalancer. The most common use case is to use a
+ * {@link PartitionedRebalancerConfig} or a subclass and set up a resource with it. A rebalancer
+ * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
+ * how the configuration should be serialized.
+ */
+public interface RebalancerConfig {
+  /**
+   * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
+   * resource, e.g. a subtask of a task
+   * @return map of (subunit id, subunit) pairs
+   */
+  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
+
+  /**
+   * Get the subunits of the resource (e.g. partitions)
+   * @return set of subunit ids
+   */
+  public Set<? extends PartitionId> getSubUnitIdSet();
+
+  /**
+   * Get a specific subunit
+   * @param subUnitId the id of the subunit
+   * @return the Partition with the given id
+   */
+  public Partition getSubUnit(PartitionId subUnitId);
+
+  /**
+   * Get the resource to rebalance
+   * @return resource id
+   */
+  public ResourceId getResourceId();
+
+  /**
+   * Get the state model definition that the resource follows
+   * @return state model definition id
+   */
+  public StateModelDefId getStateModelDefId();
+
+  /**
+   * Get the state model factory of this resource
+   * @return state model factory id
+   */
+  public StateModelFactoryId getStateModelFactoryId();
+
+  /**
+   * Get the tag, if any, that participants must have in order to serve this resource
+   * @return participant group tag, or null
+   */
+  public String getParticipantGroupTag();
+
+  /**
+   * Get the serializer for this config
+   * @return StringSerializer class object
+   */
+  public Class<? extends StringSerializer> getSerializerClass();
+
+  /**
+   * Get a reference to the class used to rebalance this resource
+   * @return RebalancerRef
+   */
+  public RebalancerRef getRebalancerRef();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
new file mode 100644
index 0000000..8581732
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
@@ -0,0 +1,185 @@
+package org.apache.helix.controller.rebalancer.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration for a resource rebalancer. This contains a RebalancerConfig, which contains
+ * information specific to each rebalancer.
+ */
+public final class RebalancerConfigHolder {
+  private enum Fields {
+    SERIALIZER_CLASS,
+    REBALANCER_CONFIG,
+    REBALANCER_CONFIG_CLASS
+  }
+
+  private static final Logger LOG = Logger.getLogger(RebalancerConfigHolder.class);
+  private StringSerializer _serializer;
+  private HelixRebalancer _rebalancer;
+  private final RebalancerConfig _config;
+  private final NamespacedConfig _backingConfig;
+
+  /**
+   * Instantiate a RebalancerConfigHolder that wraps a RebalancerConfig and stores its
+   * serialized form
+   * @param config rebalancer config
+   */
+  public RebalancerConfigHolder(RebalancerConfig config) {
+    _backingConfig =
+        new NamespacedConfig(Scope.resource(config.getResourceId()),
+            RebalancerConfigHolder.class.getSimpleName());
+    _backingConfig.setSimpleField(Fields.SERIALIZER_CLASS.toString(), config.getSerializerClass()
+        .getName());
+    _backingConfig.setSimpleField(Fields.REBALANCER_CONFIG_CLASS.toString(), config.getClass()
+        .getName());
+    _config = config;
+    try {
+      _serializer = config.getSerializerClass().newInstance();
+      _backingConfig.setSimpleField(Fields.REBALANCER_CONFIG.toString(),
+          _serializer.serialize(config));
+    } catch (InstantiationException e) {
+      LOG.error("Error initializing the configuration", e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Error initializing the configuration", e);
+    }
+  }
+
+  /**
+   * Instantiate from a physical ResourceConfiguration
+   * @param resourceConfiguration populated ResourceConfiguration
+   */
+  public RebalancerConfigHolder(ResourceConfiguration resourceConfiguration) {
+    _backingConfig =
+        new NamespacedConfig(resourceConfiguration, RebalancerConfigHolder.class.getSimpleName());
+    _serializer = getSerializer();
+    _config = getConfig();
+  }
+
+  /**
+   * Get the class that can serialize and deserialize the rebalancer config
+   * @return StringSerializer, or null if none is configured or it cannot be instantiated
+   */
+  private StringSerializer getSerializer() {
+    String serializerClassName = _backingConfig.getSimpleField(Fields.SERIALIZER_CLASS.toString());
+    if (serializerClassName != null) {
+      try {
+        return (StringSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+            .newInstance();
+      } catch (InstantiationException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (IllegalAccessException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Error getting the serializer", e);
+      }
+    }
+    return null;
+  }
+
+  /** Deserialize the stored config; null if its class or the serializer is unavailable */
+  private RebalancerConfig getConfig() {
+    String className = _backingConfig.getSimpleField(Fields.REBALANCER_CONFIG_CLASS.toString());
+    if (className != null && _serializer != null) {
+      try {
+        Class<? extends RebalancerConfig> configClass =
+            HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerConfig.class);
+        return _serializer.deserialize(configClass, getSerializedConfig());
+      } catch (ClassNotFoundException e) {
+        LOG.error(className + " is not a valid class", e);
+      } catch (ClassCastException e) {
+        LOG.error("Could not convert the persisted data into a " + className, e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a rebalancer class instance
+   * @return HelixRebalancer, or null if there is no config or no rebalancer reference
+   */
+  public HelixRebalancer getRebalancer() {
+    // cache the rebalancer to avoid loading and instantiating it excessively
+    if (_rebalancer == null) {
+      if (_config == null || _config.getRebalancerRef() == null) {
+        return null;
+      }
+      _rebalancer = _config.getRebalancerRef().getRebalancer();
+    }
+    return _rebalancer;
+  }
+
+  /**
+   * Get the instantiated RebalancerConfig, viewed as the requested type
+   * @param configClass specific class of the RebalancerConfig
+   * @return RebalancerConfig subclass instance, or null if conversion is not possible
+   */
+  public <T extends RebalancerConfig> T getRebalancerConfig(Class<T> configClass) {
+    if (_config == null) {
+      return null;
+    }
+    if (configClass.isInstance(_config)) {
+      return configClass.cast(_config);
+    }
+    LOG.warn(configClass + " is incompatible with config class: " + _config.getClass());
+    return null;
+  }
+
+  /**
+   * Get the rebalancer config serialized as a string
+   * @return string representing the config
+   */
+  public String getSerializedConfig() {
+    return _backingConfig.getSimpleField(Fields.REBALANCER_CONFIG.toString());
+  }
+
+  /**
+   * Convert this to a namespaced config
+   * @return NamespacedConfig
+   */
+  public NamespacedConfig toNamespacedConfig() {
+    return _backingConfig;
+  }
+
+  /**
+   * Get a RebalancerConfigHolder from a physical resource config
+   * @param resourceConfiguration physical resource config
+   * @return RebalancerConfigHolder
+   */
+  public static RebalancerConfigHolder from(ResourceConfiguration resourceConfiguration) {
+    return new RebalancerConfigHolder(resourceConfiguration);
+  }
+
+  /**
+   * Get a RebalancerConfigHolder from a RebalancerConfig
+   * @param config instantiated RebalancerConfig
+   * @return RebalancerConfigHolder
+   */
+  public static RebalancerConfigHolder from(RebalancerConfig config) {
+    return new RebalancerConfigHolder(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
new file mode 100644
index 0000000..3118b2a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.rebalancer.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Methods specifying a rebalancer config that allows replicas. For instance, a rebalancer config
+ * with partitions may accept state model definitions that support multiple replicas per partition,
+ * and it's possible that the policy is that each live participant in the system should have a
+ * replica.
+ */
+public interface ReplicatedRebalancerConfig extends RebalancerConfig {
+  /**
+   * Check if this resource should be assigned to any live participant
+   * @return true if any live participant expected, false otherwise
+   */
+  public boolean anyLiveParticipant();
+
+  /**
+   * Get the number of replicas that each resource subunit should have
+   * @return replica count
+   */
+  public int getReplicaCount();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
new file mode 100644
index 0000000..bfc3309
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -0,0 +1,178 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
+ * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig {
+  @JsonProperty("preferenceLists")
+  private Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+  /**
+   * Instantiate a SemiAutoRebalancerConfig with SEMI_AUTO mode, the {@link SemiAutoRebalancer}
+   * reference, and an empty map of preference lists
+   */
+  public SemiAutoRebalancerConfig() {
+    setRebalanceMode(RebalanceMode.SEMI_AUTO);
+    setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+    _preferenceLists = Maps.newHashMap();
+  }
+
+  /**
+   * Get the preference lists of all partitions of the resource
+   * @return map of partition id to list of participant ids (the stored map, not a copy)
+   */
+  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
+    return _preferenceLists;
+  }
+
+  /**
+   * Set the preference lists of all partitions of the resource
+   * @param preferenceLists map of partition id to list of participant ids; stored by reference
+   */
+  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
+    _preferenceLists = preferenceLists;
+  }
+
+  /**
+   * Get the preference list of a partition
+   * @param partitionId the partition to look up
+   * @return list of participant ids, or null if no preference list is stored for the partition
+   */
+  @JsonIgnore
+  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+    // the setter accepts null, so guard here rather than fail with a NullPointerException
+    return (_preferenceLists != null) ? _preferenceLists.get(partitionId) : null;
+  }
+
+  /**
+   * Generate preference lists based on a default cluster setup. Every non-empty preference list
+   * that is already stored is turned into an initial replica map; those maps are handed to
+   * {@link AutoRebalanceStrategy} and the lists it computes replace the stored preference lists.
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @Override
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // compute default upper bounds
+    Map<State, String> upperBounds = Maps.newHashMap();
+    for (State state : stateModelDef.getTypedStatesPriorityList()) {
+      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+    }
+
+    // the same empty disabled set and empty current state are passed for every partition, so
+    // they are created once instead of once per loop iteration
+    Set<ParticipantId> disabledParticipants = Collections.emptySet();
+    Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
+
+    // determine the current mapping
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+    for (PartitionId partitionId : getPartitionSet()) {
+      List<ParticipantId> preferenceList = getPreferenceList(partitionId);
+      if (preferenceList != null && !preferenceList.isEmpty()) {
+        Map<ParticipantId, State> initialMap =
+            ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
+                stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
+        currentMapping.put(partitionId, initialMap);
+      }
+    }
+
+    // determine the preference
+    LinkedHashMap<State, Integer> stateCounts =
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+            getReplicaCount());
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+            getMaxPartitionsPerParticipant(), placementScheme);
+    Map<String, List<String>> rawPreferenceLists =
+        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+            .getListFields();
+    Map<PartitionId, List<ParticipantId>> preferenceLists =
+        Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
+    setPreferenceLists(preferenceLists);
+  }
+
+  /**
+   * Build a SemiAutoRebalancerConfig. By default, it corresponds to {@link SemiAutoRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
+    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+    /**
+     * Instantiate for a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+      super.rebalanceMode(RebalanceMode.SEMI_AUTO);
+      _preferenceLists = Maps.newHashMap();
+    }
+
+    /**
+     * Add a preference list for a partition
+     * @param partitionId partition to set
+     * @param preferenceList ordered list of participants who can serve the partition
+     * @return Builder
+     */
+    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+      _preferenceLists.put(partitionId, preferenceList);
+      return self();
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    /**
+     * Create a SemiAutoRebalancerConfig from the fields collected so far
+     * @return SemiAutoRebalancerConfig holding its own copy of the preference list map
+     */
+    @Override
+    public SemiAutoRebalancerConfig build() {
+      SemiAutoRebalancerConfig config = new SemiAutoRebalancerConfig();
+      super.update(config);
+      // shallow copy: later preferenceList() calls on this builder must not alter a built config
+      config.setPreferenceLists(Maps.newHashMap(_preferenceLists));
+      return config;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
deleted file mode 100644
index ec765d7..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Abstract RebalancerContext that functions for generic subunits. Use a subclass that more
- * concretely defines the subunits.
- */
-public abstract class BasicRebalancerContext implements RebalancerContext {
-  // resource identity and state model settings; each stays null until its setter is called
-  private ResourceId _resourceId;
-  private StateModelDefId _stateModelDefId;
-  private StateModelFactoryId _stateModelFactoryId;
-  private String _participantGroupTag;
-  // serializer class for this context; the constructor initializes it to DefaultContextSerializer
-  private Class<? extends ContextSerializer> _serializer;
-  private RebalancerRef _rebalancerRef;
-
-  /**
-   * Instantiate a basic rebalancer context
-   */
-  public BasicRebalancerContext() {
-    _serializer = DefaultContextSerializer.class;
-  }
-
-  /**
-   * Get the resource to rebalance
-   * @return resource id, or null if none has been set
-   */
-  @Override
-  public ResourceId getResourceId() {
-    return _resourceId;
-  }
-
-  /**
-   * Set the resource to rebalance
-   * @param resourceId resource id
-   */
-  public void setResourceId(ResourceId resourceId) {
-    _resourceId = resourceId;
-  }
-
-  /**
-   * Get the state model definition that the resource follows
-   * @return state model definition id, or null if none has been set
-   */
-  @Override
-  public StateModelDefId getStateModelDefId() {
-    return _stateModelDefId;
-  }
-
-  /**
-   * Set the state model definition that the resource follows
-   * @param stateModelDefId state model definition id
-   */
-  public void setStateModelDefId(StateModelDefId stateModelDefId) {
-    _stateModelDefId = stateModelDefId;
-  }
-
-  /**
-   * Get the state model factory that the resource uses
-   * @return state model factory id, or null if none has been set
-   */
-  @Override
-  public StateModelFactoryId getStateModelFactoryId() {
-    return _stateModelFactoryId;
-  }
-
-  /**
-   * Set the state model factory that the resource uses
-   * @param stateModelFactoryId state model factory id
-   */
-  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-    _stateModelFactoryId = stateModelFactoryId;
-  }
-
-  /**
-   * Get the tag that participants must have in order to serve this resource
-   * @return string group tag, or null if none has been set
-   */
-  @Override
-  public String getParticipantGroupTag() {
-    return _participantGroupTag;
-  }
-
-  /**
-   * Set a tag that participants must have in order to serve this resource
-   * @param participantGroupTag string group tag
-   */
-  public void setParticipantGroupTag(String participantGroupTag) {
-    _participantGroupTag = participantGroupTag;
-  }
-
-  /**
-   * Get the serializer. If none is provided, {@link DefaultContextSerializer} is used
-   */
-  @Override
-  public Class<? extends ContextSerializer> getSerializerClass() {
-    return _serializer;
-  }
-
-  /**
-   * Set the class that can serialize this context
-   * @param serializer serializer class that implements ContextSerializer
-   */
-  public void setSerializerClass(Class<? extends ContextSerializer> serializer) {
-    _serializer = serializer;
-  }
-
-  /**
-   * Get the ids of all subunits, i.e. the key set of getSubUnitMap()
-   * @return set of subunit ids
-   */
-  @Override
-  @JsonIgnore
-  public Set<? extends PartitionId> getSubUnitIdSet() {
-    return getSubUnitMap().keySet();
-  }
-
-  /**
-   * Look up a single subunit in getSubUnitMap()
-   * @param subUnitId the subunit to look up
-   * @return the value that getSubUnitMap() holds for the given id
-   */
-  @Override
-  @JsonIgnore
-  public Partition getSubUnit(PartitionId subUnitId) {
-    return getSubUnitMap().get(subUnitId);
-  }
-
-  /**
-   * Get the reference to the class used to rebalance this resource
-   * @return RebalancerRef instance, or null if none has been set
-   */
-  @Override
-  public RebalancerRef getRebalancerRef() {
-    return _rebalancerRef;
-  }
-
-  /**
-   * Set the reference to the class used to rebalance this resource
-   * @param rebalancerRef RebalancerRef instance
-   */
-  public void setRebalancerRef(RebalancerRef rebalancerRef) {
-    _rebalancerRef = rebalancerRef;
-  }
-
-  /**
-   * Abstract builder for the base rebalancer context
-   */
-  public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
-    // the resource id is fixed at construction; every other field is optional and starts as null,
-    // except the serializer class, which starts as DefaultContextSerializer
-    private final ResourceId _resourceId;
-    private StateModelDefId _stateModelDefId;
-    private StateModelFactoryId _stateModelFactoryId;
-    private String _participantGroupTag;
-    private Class<? extends ContextSerializer> _serializerClass;
-    private RebalancerRef _rebalancerRef;
-
-    /**
-     * Instantiate with a resource id
-     * @param resourceId resource id
-     */
-    public AbstractBuilder(ResourceId resourceId) {
-      _resourceId = resourceId;
-      _serializerClass = DefaultContextSerializer.class;
-    }
-
-    /**
-     * Set the state model definition that the resource should follow
-     * @param stateModelDefId state model definition id
-     * @return Builder
-     */
-    public T stateModelDefId(StateModelDefId stateModelDefId) {
-      _stateModelDefId = stateModelDefId;
-      return self();
-    }
-
-    /**
-     * Set the state model factory that the resource should use
-     * @param stateModelFactoryId state model factory id
-     * @return Builder
-     */
-    public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-      _stateModelFactoryId = stateModelFactoryId;
-      return self();
-    }
-
-    /**
-     * Set the tag that all participants require in order to serve this resource
-     * @param participantGroupTag the tag
-     * @return Builder
-     */
-    public T participantGroupTag(String participantGroupTag) {
-      _participantGroupTag = participantGroupTag;
-      return self();
-    }
-
-    /**
-     * Set the serializer class for this rebalancer context
-     * @param serializerClass class that implements ContextSerializer
-     * @return Builder
-     */
-    public T serializerClass(Class<? extends ContextSerializer> serializerClass) {
-      _serializerClass = serializerClass;
-      return self();
-    }
-
-    /**
-     * Specify a custom class to use for rebalancing
-     * @param rebalancerRef RebalancerRef instance
-     * @return Builder
-     */
-    public T rebalancerRef(RebalancerRef rebalancerRef) {
-      _rebalancerRef = rebalancerRef;
-      return self();
-    }
-
-    /**
-     * Update an existing context with base fields
-     * @param context derived context
-     */
-    protected final void update(BasicRebalancerContext context) {
-      // every builder field is copied, including those never set (null), so values that the
-      // context's constructor assigned to these fields are overwritten
-      context.setResourceId(_resourceId);
-      context.setStateModelDefId(_stateModelDefId);
-      context.setStateModelFactoryId(_stateModelFactoryId);
-      context.setParticipantGroupTag(_participantGroupTag);
-      context.setSerializerClass(_serializerClass);
-      context.setRebalancerRef(_rebalancerRef);
-    }
-
-    /**
-     * Get a typed reference to "this" class. Final derived classes should simply return the this
-     * reference.
-     * @return this for the most specific type
-     */
-    protected abstract T self();
-
-    /**
-     * Get the rebalancer context from the built fields
-     * @return RebalancerContext
-     */
-    public abstract RebalancerContext build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
deleted file mode 100644
index ef12a09..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public interface ContextSerializer {
-  /**
-   * Turn a RebalancerContext object into its String form
-   * @param data instance of the rebalancer context type
-   * @return String representing the object
-   */
-  <T> String serialize(T data);
-
-  /**
-   * Turn the String form of an object back into an instance of the requested class
-   * @param clazz The class represented by the deserialized string
-   * @param string String representing the object
-   * @return instance of the generic type or null if the conversion failed
-   */
-  <T> T deserialize(Class<T> clazz, String string);
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
deleted file mode 100644
index 0d2c1f2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
- * corresponds to {@link CustomRebalancer}
- */
-public class CustomRebalancerContext extends PartitionedRebalancerContext {
-  private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
-  /**
-   * Instantiate a CustomRebalancerContext with CUSTOMIZED mode, the {@link CustomRebalancer}
-   * reference, and an empty map of preference maps
-   */
-  public CustomRebalancerContext() {
-    setRebalanceMode(RebalanceMode.CUSTOMIZED);
-    setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
-    _preferenceMaps = Maps.newHashMap();
-  }
-
-  /**
-   * Get the preference maps of the partitions and replicas of the resource
-   * @return map of partition to participant and state (the stored map, not a copy)
-   */
-  public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
-    return _preferenceMaps;
-  }
-
-  /**
-   * Set the preference maps of the partitions and replicas of the resource
-   * @param preferenceMaps map of partition to participant and state; stored by reference
-   */
-  public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
-    _preferenceMaps = preferenceMaps;
-  }
-
-  /**
-   * Get the preference map of a partition
-   * @param partitionId the partition to look up
-   * @return map of participant to state, or null if no preference map is stored for the partition
-   */
-  @JsonIgnore
-  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
-    // the setter accepts null, so guard here rather than fail with a NullPointerException
-    return (_preferenceMaps != null) ? _preferenceMaps.get(partitionId) : null;
-  }
-
-  /**
-   * Generate preference maps based on a default cluster setup. The stored preference maps are
-   * handed to {@link AutoRebalanceStrategy} as the current mapping, and the maps it computes
-   * replace them.
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @Override
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // compute default upper bounds
-    Map<State, String> upperBounds = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
-    }
-
-    // determine the current mapping
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
-
-    // determine the preference maps
-    LinkedHashMap<State, Integer> stateCounts =
-        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
-            getReplicaCount());
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
-    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
-            getMaxPartitionsPerParticipant(), placementScheme);
-    Map<String, Map<String, String>> rawPreferenceMaps =
-        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
-            .getMapFields();
-    Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
-        Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
-    setPreferenceMaps(preferenceMaps);
-  }
-
-  /**
-   * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
-    /**
-     * Instantiate for a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
-      super.rebalanceMode(RebalanceMode.CUSTOMIZED);
-      _preferenceMaps = Maps.newHashMap();
-    }
-
-    /**
-     * Add a preference map for a partition
-     * @param partitionId partition to set
-     * @param preferenceMap map of participant id to state indicating where replicas are served
-     * @return Builder
-     */
-    public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
-      _preferenceMaps.put(partitionId, preferenceMap);
-      return self();
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    /**
-     * Create a CustomRebalancerContext from the fields collected so far
-     * @return CustomRebalancerContext holding its own copy of the preference map collection
-     */
-    @Override
-    public CustomRebalancerContext build() {
-      CustomRebalancerContext context = new CustomRebalancerContext();
-      super.update(context);
-      // shallow copy: later preferenceMap() calls on this builder must not alter a built context
-      context.setPreferenceMaps(Maps.newHashMap(_preferenceMaps));
-      return context;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
deleted file mode 100644
index ecc93fb..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-
-import org.apache.helix.HelixException;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Default serializer implementation for RebalancerContexts. Uses the Jackson JSON library to
- * convert to and from strings
- */
-public class DefaultContextSerializer implements ContextSerializer {
-
-  private static final Logger logger = Logger.getLogger(DefaultContextSerializer.class);
-
-  /**
-   * Serialize an object to an indented JSON string
-   * @param data object to serialize
-   * @return JSON string, or null if data is null
-   * @throws HelixException if the object cannot be written as JSON
-   */
-  @Override
-  public <T> String serialize(final T data) {
-    if (data == null) {
-      return null;
-    }
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-    StringWriter sw = new StringWriter();
-    try {
-      mapper.writeValue(sw, data);
-    } catch (Exception e) {
-      logger.error("Exception during payload data serialization.", e);
-      throw new HelixException(e);
-    }
-    return sw.toString();
-  }
-
-  /**
-   * Deserialize a JSON string into an instance of the given class. Unlike serialize, a failure is
-   * logged and reported as null instead of being thrown.
-   * @param clazz class of the object represented by the string
-   * @param string JSON representation of the object
-   * @return the deserialized instance, or null if the string is null, empty, or cannot be read
-   */
-  @Override
-  public <T> T deserialize(final Class<T> clazz, final String string) {
-    if (string == null || string.length() == 0) {
-      return null;
-    }
-
-    ObjectMapper mapper = new ObjectMapper();
-
-    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
-    deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-    try {
-      // read the String directly: converting it with getBytes() used the platform default
-      // charset, which could corrupt non-ASCII content before Jackson decoded it
-      return mapper.readValue(string, clazz);
-    } catch (Exception e) {
-      logger.error("Exception during deserialization of payload bytes: " + string, e);
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
deleted file mode 100644
index 2400707..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.model.IdealState.RebalanceMode;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for FULL_AUTO rebalancing mode. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
-public class FullAutoRebalancerContext extends PartitionedRebalancerContext {
-  /**
-   * Instantiate a FullAutoRebalancerContext with FULL_AUTO mode and the
-   * {@link FullAutoRebalancer} reference
-   */
-  public FullAutoRebalancerContext() {
-    setRebalanceMode(RebalanceMode.FULL_AUTO);
-    setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
-  }
-
-  /**
-   * Builder for a full auto rebalancer context. By default, it corresponds to
-   * {@link FullAutoRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
-      super.rebalanceMode(RebalanceMode.FULL_AUTO);
-    }
-
-    /**
-     * Get this builder as the most specific builder type
-     * @return this
-     */
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    /**
-     * Create a new FullAutoRebalancerContext and apply this builder's fields to it via update()
-     * @return FullAutoRebalancerContext
-     */
-    @Override
-    public FullAutoRebalancerContext build() {
-      FullAutoRebalancerContext context = new FullAutoRebalancerContext();
-      super.update(context);
-      return context;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
deleted file mode 100644
index 15fcf9c..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
+++ /dev/null
@@ -1,393 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource whose subunits are partitions. In addition, these partitions can
- * be replicated.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PartitionedRebalancerContext extends BasicRebalancerContext implements
-    ReplicatedRebalancerContext {
-  private Map<PartitionId, Partition> _partitionMap;
-  private boolean _anyLiveParticipant;
-  private int _replicaCount;
-  private int _maxPartitionsPerParticipant;
-  private RebalanceMode _rebalanceMode;
-
-  /**
-   * Instantiate a PartitionedRebalancerContext
-   */
-  public PartitionedRebalancerContext() {
-    _partitionMap = Collections.emptyMap();
-    _replicaCount = 1;
-    _anyLiveParticipant = false;
-    _maxPartitionsPerParticipant = Integer.MAX_VALUE;
-    _rebalanceMode = RebalanceMode.USER_DEFINED;
-  }
-
-  /**
-   * Get a map from partition id to partition
-   * @return partition map (mutable)
-   */
-  public Map<PartitionId, Partition> getPartitionMap() {
-    return _partitionMap;
-  }
-
-  /**
-   * Set a map of partition id to partition
-   * @param partitionMap partition map
-   */
-  public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
-    _partitionMap = Maps.newHashMap(partitionMap);
-  }
-
-  /**
-   * Get the set of partitions for this resource
-   * @return set of partition ids
-   */
-  @JsonIgnore
-  public Set<PartitionId> getPartitionSet() {
-    return _partitionMap.keySet();
-  }
-
-  /**
-   * Get a partition
-   * @param partitionId id of the partition to get
-   * @return Partition object, or null if not present
-   */
-  @JsonIgnore
-  public Partition getPartition(PartitionId partitionId) {
-    return _partitionMap.get(partitionId);
-  }
-
-  @Override
-  public boolean anyLiveParticipant() {
-    return _anyLiveParticipant;
-  }
-
-  /**
-   * Indicate if this resource should be assigned to any live participant
-   * @param anyLiveParticipant true if any live participant expected, false otherwise
-   */
-  public void setAnyLiveParticipant(boolean anyLiveParticipant) {
-    _anyLiveParticipant = anyLiveParticipant;
-  }
-
-  @Override
-  public int getReplicaCount() {
-    return _replicaCount;
-  }
-
-  /**
-   * Set the number of replicas that each partition should have
-   * @param replicaCount
-   */
-  public void setReplicaCount(int replicaCount) {
-    _replicaCount = replicaCount;
-  }
-
-  /**
-   * Get the maximum number of partitions that a participant can serve
-   * @return maximum number of partitions per participant
-   */
-  public int getMaxPartitionsPerParticipant() {
-    return _maxPartitionsPerParticipant;
-  }
-
-  /**
-   * Set the maximum number of partitions that a participant can serve
-   * @param maxPartitionsPerParticipant maximum number of partitions per participant
-   */
-  public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
-    _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
-  }
-
-  /**
-   * Set the rebalancer mode of the partitioned resource
-   * @param rebalanceMode {@link RebalanceMode} enum value
-   */
-  public void setRebalanceMode(RebalanceMode rebalanceMode) {
-    _rebalanceMode = rebalanceMode;
-  }
-
-  /**
-   * Get the rebalancer mode of the resource
-   * @return RebalanceMode
-   */
-  public RebalanceMode getRebalanceMode() {
-    return _rebalanceMode;
-  }
-
-  @Override
-  @JsonIgnore
-  public Map<PartitionId, Partition> getSubUnitMap() {
-    return getPartitionMap();
-  }
-
-  /**
-   * Generate a default configuration given the state model and a set of participants.
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // the base context does not understand enough to do anything here
-  }
-
-  /**
-   * Convert a physically-stored IdealState into a rebalancer context for a partitioned resource
-   * @param idealState populated IdealState
-   * @return PartitionedRebalancerContext
-   */
-  public static PartitionedRebalancerContext from(IdealState idealState) {
-    PartitionedRebalancerContext context;
-    switch (idealState.getRebalanceMode()) {
-    case FULL_AUTO:
-      FullAutoRebalancerContext.Builder fullAutoBuilder =
-          new FullAutoRebalancerContext.Builder(idealState.getResourceId());
-      populateContext(fullAutoBuilder, idealState);
-      context = fullAutoBuilder.build();
-      break;
-    case SEMI_AUTO:
-      SemiAutoRebalancerContext.Builder semiAutoBuilder =
-          new SemiAutoRebalancerContext.Builder(idealState.getResourceId());
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
-      }
-      populateContext(semiAutoBuilder, idealState);
-      context = semiAutoBuilder.build();
-      break;
-    case CUSTOMIZED:
-      CustomRebalancerContext.Builder customBuilder =
-          new CustomRebalancerContext.Builder(idealState.getResourceId());
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
-      }
-      populateContext(customBuilder, idealState);
-      context = customBuilder.build();
-      break;
-    default:
-      Builder baseBuilder = new Builder(idealState.getResourceId());
-      populateContext(baseBuilder, idealState);
-      context = baseBuilder.build();
-      break;
-    }
-    return context;
-  }
-
-  /**
-   * Update a builder subclass with all the fields of the ideal state
-   * @param builder builder that extends AbstractBuilder
-   * @param idealState populated IdealState
-   */
-  private static <T extends AbstractBuilder<T>> void populateContext(T builder,
-      IdealState idealState) {
-    String replicas = idealState.getReplicas();
-    int replicaCount = 0;
-    boolean anyLiveParticipant = false;
-    if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
-      anyLiveParticipant = true;
-    } else {
-      replicaCount = Integer.parseInt(replicas);
-    }
-    if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
-      // backwards compatibility: partition sets were based on pref lists/maps previously
-      builder.addPartitions(idealState.getNumPartitions());
-    } else {
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        builder.addPartition(new Partition(partitionId));
-      }
-    }
-    builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
-        .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
-        .participantGroupTag(idealState.getInstanceGroupTag())
-        .stateModelDefId(idealState.getStateModelDefId())
-        .stateModelFactoryId(idealState.getStateModelFactoryId());
-    RebalancerRef rebalancerRef = idealState.getRebalancerRef();
-    if (rebalancerRef != null) {
-      builder.rebalancerRef(rebalancerRef);
-    }
-  }
-
-  /**
-   * Builder for a basic partitioned rebalancer context
-   */
-  public static final class Builder extends AbstractBuilder<Builder> {
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public PartitionedRebalancerContext build() {
-      PartitionedRebalancerContext context = new PartitionedRebalancerContext();
-      super.update(context);
-      return context;
-    }
-  }
-
-  /**
-   * Abstract builder for a generic partitioned resource rebalancer context
-   */
-  public static abstract class AbstractBuilder<T extends BasicRebalancerContext.AbstractBuilder<T>>
-      extends BasicRebalancerContext.AbstractBuilder<T> {
-    private final ResourceId _resourceId;
-    private final Map<PartitionId, Partition> _partitionMap;
-    private RebalanceMode _rebalanceMode;
-    private boolean _anyLiveParticipant;
-    private int _replicaCount;
-    private int _maxPartitionsPerParticipant;
-
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public AbstractBuilder(ResourceId resourceId) {
-      super(resourceId);
-      _resourceId = resourceId;
-      _partitionMap = Maps.newHashMap();
-      _rebalanceMode = RebalanceMode.USER_DEFINED;
-      _anyLiveParticipant = false;
-      _replicaCount = 1;
-      _maxPartitionsPerParticipant = Integer.MAX_VALUE;
-    }
-
-    /**
-     * Set the rebalance mode for a partitioned rebalancer context
-     * @param rebalanceMode {@link RebalanceMode} enum value
-     * @return Builder
-     */
-    public T rebalanceMode(RebalanceMode rebalanceMode) {
-      _rebalanceMode = rebalanceMode;
-      return self();
-    }
-
-    /**
-     * Add a partition that the resource serves
-     * @param partition fully-qualified partition
-     * @return Builder
-     */
-    public T addPartition(Partition partition) {
-      _partitionMap.put(partition.getId(), partition);
-      return self();
-    }
-
-    /**
-     * Add a collection of partitions
-     * @param partitions any collection of Partition objects
-     * @return Builder
-     */
-    public T addPartitions(Collection<Partition> partitions) {
-      for (Partition partition : partitions) {
-        addPartition(partition);
-      }
-      return self();
-    }
-
-    /**
-     * Add a specified number of partitions with a default naming scheme, namely
-     * resourceId_partitionNumber where partitionNumber starts at 0
-     * @param partitionCount number of partitions to add
-     * @return Builder
-     */
-    public T addPartitions(int partitionCount) {
-      for (int i = 0; i < partitionCount; i++) {
-        addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
-      }
-      return self();
-    }
-
-    /**
-     * Set whether any live participant should be used in rebalancing
-     * @param anyLiveParticipant true if any live participant can be used, false otherwise
-     * @return Builder
-     */
-    public T anyLiveParticipant(boolean anyLiveParticipant) {
-      _anyLiveParticipant = anyLiveParticipant;
-      return self();
-    }
-
-    /**
-     * Set the number of replicas
-     * @param replicaCount number of replicas
-     * @return Builder
-     */
-    public T replicaCount(int replicaCount) {
-      _replicaCount = replicaCount;
-      return self();
-    }
-
-    /**
-     * Set the maximum number of partitions to assign to any participant
-     * @param maxPartitionsPerParticipant the maximum
-     * @return Builder
-     */
-    public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
-      _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
-      return self();
-    }
-
-    /**
-     * Update a PartitionedRebalancerContext with fields from this builder level
-     * @param context PartitionedRebalancerContext
-     */
-    protected final void update(PartitionedRebalancerContext context) {
-      super.update(context);
-      // enforce at least one partition
-      if (_partitionMap.isEmpty()) {
-        addPartitions(1);
-      }
-      context.setRebalanceMode(_rebalanceMode);
-      context.setPartitionMap(_partitionMap);
-      context.setAnyLiveParticipant(_anyLiveParticipant);
-      context.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
-      context.setReplicaCount(_replicaCount);
-    }
-  }
-}