You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/12/04 02:21:07 UTC

[4/4] git commit: [HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981

[HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/015e7dda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/015e7dda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/015e7dda

Branch: refs/heads/master
Commit: 015e7dda2c1f96beb6d387ba71c118d246840f1b
Parents: 7521c0c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Dec 3 16:52:23 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Dec 3 17:20:31 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/PropertyKey.java |  18 +
 .../org/apache/helix/PropertyPathConfig.java    |   5 +
 .../java/org/apache/helix/PropertyType.java     |   3 +-
 .../main/java/org/apache/helix/api/Cluster.java |  20 +-
 .../java/org/apache/helix/api/Resource.java     |   9 +-
 .../api/accessor/AtomicResourceAccessor.java    |   6 +-
 .../helix/api/accessor/ClusterAccessor.java     |  52 ++-
 .../helix/api/accessor/ParticipantAccessor.java |  23 +-
 .../helix/api/accessor/ResourceAccessor.java    | 138 ++++---
 .../apache/helix/api/config/ResourceConfig.java |  32 +-
 .../java/org/apache/helix/api/id/ContextId.java |  57 +++
 .../controller/GenericHelixController.java      |   8 +-
 .../context/BasicControllerContext.java         |  60 +++
 .../controller/context/ControllerContext.java   |  40 ++
 .../context/ControllerContextHolder.java        | 156 +++++++
 .../context/ControllerContextProvider.java      | 120 ++++++
 .../controller/rebalancer/CustomRebalancer.java |  14 +-
 .../rebalancer/FallbackRebalancer.java          |  20 +-
 .../rebalancer/FullAutoRebalancer.java          |  16 +-
 .../controller/rebalancer/HelixRebalancer.java  |  20 +-
 .../rebalancer/SemiAutoRebalancer.java          |  18 +-
 .../config/BasicRebalancerConfig.java           | 256 ++++++++++++
 .../config/CustomRebalancerConfig.java          | 164 ++++++++
 .../config/FullAutoRebalancerConfig.java        |  64 +++
 .../config/PartitionedRebalancerConfig.java     | 405 +++++++++++++++++++
 .../rebalancer/config/RebalancerConfig.java     |  95 +++++
 .../config/RebalancerConfigHolder.java          | 185 +++++++++
 .../config/ReplicatedRebalancerConfig.java      |  40 ++
 .../config/SemiAutoRebalancerConfig.java        | 178 ++++++++
 .../context/BasicRebalancerContext.java         | 240 -----------
 .../rebalancer/context/ContextSerializer.java   |  37 --
 .../context/CustomRebalancerContext.java        | 164 --------
 .../context/DefaultContextSerializer.java       |  83 ----
 .../context/FullAutoRebalancerContext.java      |  64 ---
 .../context/PartitionedRebalancerContext.java   | 393 ------------------
 .../rebalancer/context/RebalancerConfig.java    | 182 ---------
 .../rebalancer/context/RebalancerContext.java   |  94 -----
 .../context/ReplicatedRebalancerContext.java    |  40 --
 .../context/SemiAutoRebalancerContext.java      | 178 --------
 .../serializer/DefaultStringSerializer.java     |  82 ++++
 .../controller/serializer/StringSerializer.java |  37 ++
 .../helix/controller/stages/AttributeName.java  |   3 +-
 .../stages/BestPossibleStateCalcStage.java      |  26 +-
 .../stages/ExternalViewComputeStage.java        |  10 +-
 .../stages/MessageGenerationStage.java          |  16 +-
 .../stages/MessageSelectionStage.java           |  21 +-
 .../stages/PersistAssignmentStage.java          |  14 +-
 .../controller/stages/PersistContextStage.java  |  59 +++
 .../controller/stages/ReadClusterDataStage.java |  17 +
 .../stages/ResourceComputationStage.java        |  25 +-
 .../helix/model/ResourceConfiguration.java      |  14 +-
 .../org/apache/helix/tools/NewClusterSetup.java |  66 +--
 .../org/apache/helix/api/TestNewStages.java     |  22 +-
 .../org/apache/helix/api/TestUpdateConfig.java  |  27 +-
 .../context/TestSerializeRebalancerContext.java |  20 +-
 .../helix/controller/stages/BaseStageTest.java  |   6 +-
 .../stages/TestResourceComputationStage.java    |  13 +-
 .../TestCustomizedIdealStateRebalancer.java     |  35 +-
 .../helix/integration/TestHelixConnection.java  |  10 +-
 .../helix/examples/LogicalModelExample.java     |  10 +-
 .../LockManagerRebalancer.java                  |  18 +-
 61 files changed, 2450 insertions(+), 1798 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index ebd7a35..2af63a5 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -24,6 +24,7 @@ import static org.apache.helix.PropertyType.ALERT_HISTORY;
 import static org.apache.helix.PropertyType.ALERT_STATUS;
 import static org.apache.helix.PropertyType.CLUSTER;
 import static org.apache.helix.PropertyType.CONFIGS;
+import static org.apache.helix.PropertyType.CONTEXT;
 import static org.apache.helix.PropertyType.CONTROLLER;
 import static org.apache.helix.PropertyType.CURRENTSTATES;
 import static org.apache.helix.PropertyType.ERRORS;
@@ -46,6 +47,7 @@ import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
 
 import java.util.Arrays;
 
+import org.apache.helix.controller.context.ControllerContextHolder;
 import org.apache.helix.model.AlertHistory;
 import org.apache.helix.model.AlertStatus;
 import org.apache.helix.model.Alerts;
@@ -721,6 +723,22 @@ public class PropertyKey {
     public PropertyKey propertyStore() {
       return new PropertyKey(PROPERTYSTORE, null, _clusterName);
     }
+
+    /**
+     * Get a propertykey associated with the root of the Helix contexts
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey controllerContexts() {
+      return new PropertyKey(CONTEXT, ControllerContextHolder.class, _clusterName);
+    }
+
+    /**
+     * Get a propertykey associated with a single Helix controller context
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey controllerContext(String contextId) {
+      return new PropertyKey(CONTEXT, ControllerContextHolder.class, _clusterName, contextId);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
index 60a92f4..d66e7d9 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
@@ -22,6 +22,7 @@ package org.apache.helix;
 import static org.apache.helix.PropertyType.ALERTS;
 import static org.apache.helix.PropertyType.ALERT_STATUS;
 import static org.apache.helix.PropertyType.CONFIGS;
+import static org.apache.helix.PropertyType.CONTEXT;
 import static org.apache.helix.PropertyType.CURRENTSTATES;
 import static org.apache.helix.PropertyType.EXTERNALVIEW;
 import static org.apache.helix.PropertyType.HEALTHREPORT;
@@ -40,6 +41,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.helix.controller.context.ControllerContextHolder;
 import org.apache.helix.model.AlertStatus;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.CurrentState;
@@ -81,6 +83,7 @@ public class PropertyPathConfig {
     typeToClassMapping.put(ALERT_STATUS, AlertStatus.class);
     typeToClassMapping.put(PAUSE, PauseSignal.class);
     typeToClassMapping.put(RESOURCEASSIGNMENTS, ResourceAssignment.class);
+    typeToClassMapping.put(CONTEXT, ControllerContextHolder.class);
 
     // @formatter:off
     addEntry(PropertyType.CLUSTER, 1, "/{clusterName}");
@@ -148,6 +151,8 @@ public class PropertyPathConfig {
     addEntry(PropertyType.ALERTS, 1, "/{clusterName}/CONTROLLER/ALERTS");
     addEntry(PropertyType.ALERT_STATUS, 1, "/{clusterName}/CONTROLLER/ALERT_STATUS");
     addEntry(PropertyType.ALERT_HISTORY, 1, "/{clusterName}/CONTROLLER/ALERT_HISTORY");
+    addEntry(PropertyType.CONTEXT, 1, "/{clusterName}/CONTROLLER/CONTEXT");
+    addEntry(PropertyType.CONTEXT, 2, "/{clusterName}/CONTROLLER/CONTEXT/{contextId}");
     // @formatter:on
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 680dc06..75adb20 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -64,7 +64,8 @@ public enum PropertyType {
   PERSISTENTSTATS(Type.CONTROLLER, true, false, false, false),
   ALERTS(Type.CONTROLLER, true, false, false, false),
   ALERT_STATUS(Type.CONTROLLER, true, false, false, false),
-  ALERT_HISTORY(Type.CONTROLLER, true, false, false, false);
+  ALERT_HISTORY(Type.CONTROLLER, true, false, false, false),
+  CONTEXT(Type.CONTROLLER, true, false);
 
   // @formatter:on
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index fdeb879..98072d1 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -27,11 +27,13 @@ import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ContextId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SpectatorId;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.context.ControllerContext;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -73,6 +75,8 @@ public class Cluster {
    */
   private final Map<SpectatorId, Spectator> _spectatorMap;
 
+  private final Map<ContextId, ControllerContext> _contextMap;
+
   private final ControllerId _leaderId;
 
   private final ClusterConfig _config;
@@ -86,6 +90,7 @@ public class Cluster {
    * @param leaderId
    * @param constraintMap
    * @param stateModelMap
+   * @param contextMap
    * @param stats
    * @param alerts
    * @param userConfig
@@ -95,8 +100,9 @@ public class Cluster {
   public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
       ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
-      Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
-      Alerts alerts, UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+      Map<StateModelDefId, StateModelDefinition> stateModelMap,
+      Map<ContextId, ControllerContext> contextMap, PersistentStats stats, Alerts alerts,
+      UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
 
     // build the config
     // Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -137,6 +143,8 @@ public class Cluster {
 
     _leaderId = leaderId;
 
+    _contextMap = ImmutableMap.copyOf(contextMap);
+
     // TODO impl this when we persist controllers and spectators on zookeeper
     _controllerMap = ImmutableMap.copyOf(controllerMap);
     _spectatorMap = Collections.emptyMap();
@@ -224,6 +232,14 @@ public class Cluster {
   }
 
   /**
+   * Get all the controller contexts currently on the cluster
+   * @return map of context id to controller context objects
+   */
+  public Map<ContextId, ControllerContext> getContextMap() {
+    return _contextMap;
+  }
+
+  /**
    * Get all the persisted stats for the cluster
    * @return PersistentStats instance
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 79a1e09..0726510 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -30,8 +30,7 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -52,18 +51,16 @@ public class Resource {
    * @param idealState ideal state of the resource
    * @param externalView external view of the resource
    * @param resourceAssignment current resource assignment of the cluster
-   * @param rebalancerContext contextual parameters that the rebalancer should be aware of
+   * @param rebalancerConfig parameters that the rebalancer should be aware of
    * @param userConfig any resource user-defined configuration
    * @param bucketSize the bucket size to use for physically saved state
    * @param batchMessageMode true if batch messaging allowed, false otherwise
    */
   public Resource(ResourceId id, ResourceType type, IdealState idealState,
       ResourceAssignment resourceAssignment, ExternalView externalView,
-      RebalancerContext rebalancerContext, UserConfig userConfig, int bucketSize,
+      RebalancerConfig rebalancerConfig, UserConfig userConfig, int bucketSize,
       boolean batchMessageMode) {
     SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
-    RebalancerConfig rebalancerConfig = new RebalancerConfig(rebalancerContext);
-
     _config =
         new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, userConfig, bucketSize,
             batchMessageMode);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 6d69981..cda83d8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -6,7 +6,7 @@ import org.apache.helix.api.Scope;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.lock.HelixLock;
 import org.apache.helix.lock.HelixLockable;
 import org.apache.log4j.Logger;
@@ -96,12 +96,12 @@ public class AtomicResourceAccessor extends ResourceAccessor {
   }
 
   @Override
-  public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
+  public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
     HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return _resourceAccessor.setRebalancerContext(resourceId, context);
+        return _resourceAccessor.setRebalancerConfig(resourceId, config);
       } finally {
         lock.unlock();
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index b01a3ec..36c7aaa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -45,14 +45,18 @@ import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ConstraintId;
+import org.apache.helix.api.id.ContextId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;
@@ -60,6 +64,7 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -266,9 +271,13 @@ public class ClusterAccessor {
     // read the alerts
     Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
 
+    // read controller context
+    Map<ContextId, ControllerContext> contextMap = readControllerContext();
+
     // create the cluster snapshot object
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap, stateModelMap, stats, alerts, userConfig, isPaused, autoJoinAllowed);
+        clusterConstraintMap, stateModelMap, contextMap, stats, alerts, userConfig, isPaused,
+        autoJoinAllowed);
   }
 
   /**
@@ -459,6 +468,20 @@ public class ClusterAccessor {
   }
 
   /**
+   * Read the persisted controller contexts
+   * @return map of context id to controller context
+   */
+  public Map<ContextId, ControllerContext> readControllerContext() {
+    Map<String, ControllerContextHolder> contextHolders =
+        _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+    Map<ContextId, ControllerContext> contexts = Maps.newHashMap();
+    for (String contextName : contextHolders.keySet()) {
+      contexts.put(ContextId.from(contextName), contextHolders.get(contextName).getContext());
+    }
+    return contexts;
+  }
+
+  /**
    * Add a statistic specification to the cluster. Existing stat specifications will not be
    * overwritten
    * @param statName string representing a stat specification
@@ -623,7 +646,7 @@ public class ClusterAccessor {
    */
   public boolean addResourceToCluster(ResourceConfig resource) {
     if (resource == null || resource.getRebalancerConfig() == null) {
-      LOG.error("Resource not fully defined with a rebalancer context");
+      LOG.error("Resource not fully defined with a rebalancer config");
       return false;
     }
 
@@ -631,9 +654,8 @@ public class ClusterAccessor {
       LOG.error("Cluster: " + _clusterId + " structure is not valid");
       return false;
     }
-    RebalancerContext context =
-        resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
-    StateModelDefId stateModelDefId = context.getStateModelDefId();
+    RebalancerConfig config = resource.getRebalancerConfig();
+    StateModelDefId stateModelDefId = config.getStateModelDefId();
     if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
       LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
       return false;
@@ -656,17 +678,21 @@ public class ClusterAccessor {
       ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
       configuration.setType(resource.getType());
       configuration.addNamespacedConfig(resource.getUserConfig());
-      configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
-      configuration.setBucketSize(resource.getBucketSize());
-      configuration.setBatchMessageMode(resource.getBatchMessageMode());
+      PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+      if (partitionedConfig == null
+          || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+        // only persist if this is not easily convertible to an ideal state
+        configuration
+            .addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
+                .toNamespacedConfig());
+      }
       _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     }
 
     // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
-    RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
     IdealState idealState =
-        ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
-            resource.getBatchMessageMode());
+        ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
+            resource.getBucketSize(), resource.getBatchMessageMode());
     if (idealState != null) {
       _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 2721d91..c3deafe 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -51,8 +51,8 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -315,16 +315,15 @@ public class ParticipantAccessor {
       return false;
     }
 
-    // need the rebalancer context for the resource
-    RebalancerContext context =
-        resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
-    if (context == null) {
-      LOG.error("Rebalancer context for resource does not exist");
+    // need the rebalancer config for the resource
+    RebalancerConfig config = resource.getRebalancerConfig();
+    if (config == null) {
+      LOG.error("Rebalancer config for resource does not exist");
       return false;
     }
 
     // ensure that all partitions to reset exist
-    Set<PartitionId> partitionSet = ImmutableSet.copyOf(context.getSubUnitIdSet());
+    Set<PartitionId> partitionSet = ImmutableSet.copyOf(config.getSubUnitIdSet());
     if (!partitionSet.containsAll(resetPartitionIdSet)) {
       LOG.error("Not all of the specified partitions to reset exist for the resource");
       return false;
@@ -368,7 +367,7 @@ public class ParticipantAccessor {
     }
 
     // build messages to signal the transition
-    StateModelDefId stateModelDefId = context.getStateModelDefId();
+    StateModelDefId stateModelDefId = config.getStateModelDefId();
     StateModelDefinition stateModelDef =
         _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
     Map<MessageId, Message> messageMap = Maps.newHashMap();
@@ -385,7 +384,7 @@ public class ParticipantAccessor {
       message.setStateModelDef(stateModelDefId);
       message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
       message.setToState(stateModelDef.getTypedInitialState());
-      message.setStateModelFactoryId(context.getStateModelFactoryId());
+      message.setStateModelFactoryId(config.getStateModelFactoryId());
 
       messageMap.put(message.getMessageId(), message);
     }
@@ -666,8 +665,8 @@ public class ParticipantAccessor {
     for (String resourceName : idealStateMap.keySet()) {
       IdealState idealState = idealStateMap.get(resourceName);
       swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
-      PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
-      resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
+      PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(idealState);
+      resourceAccessor.setRebalancerConfig(ResourceId.from(resourceName), config);
       _accessor.setProperty(_keyBuilder.idealStates(resourceName), idealState);
     }
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index e5ac57c..b308b98 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -38,11 +38,12 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -131,11 +132,11 @@ public class ResourceAccessor {
    * @param configuration
    * @return true if set, false otherwise
    */
-  private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+  private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration,
+      RebalancerConfig rebalancerConfig) {
     boolean status =
         _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
-    // also set an ideal state if the resource supports it
-    RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+    // set an ideal state if the resource supports it
     IdealState idealState =
         rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
             configuration.getBatchMessageMode());
@@ -146,16 +147,20 @@ public class ResourceAccessor {
   }
 
   /**
-   * Set the context of the rebalancer. This includes all properties required for rebalancing this
+   * Set the config of the rebalancer. This includes all properties required for rebalancing this
    * resource
    * @param resourceId the resource to update
-   * @param context the new rebalancer context
-   * @return true if the context was set, false otherwise
+   * @param config the new rebalancer config
+   * @return true if the config was set, false otherwise
    */
-  public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
-    RebalancerConfig config = new RebalancerConfig(context);
+  public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
     ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
-    resourceConfig.addNamespacedConfig(config.toNamespacedConfig());
+    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+    if (partitionedConfig == null
+        || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+      // only persist if this is not easily convertible to an ideal state
+      resourceConfig.addNamespacedConfig(new RebalancerConfigHolder(config).toNamespacedConfig());
+    }
 
     // update the ideal state if applicable
     IdealState oldIdealState =
@@ -190,9 +195,13 @@ public class ResourceAccessor {
    * @return RebalancerConfig, or null
    */
   public RebalancerConfig readRebalancerConfig(ResourceId resourceId) {
+    IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+    if (idealState != null && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
+      return PartitionedRebalancerConfig.from(idealState);
+    }
     ResourceConfiguration resourceConfig =
         _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
-    return resourceConfig != null ? RebalancerConfig.from(resourceConfig) : null;
+    return resourceConfig.getRebalancerConfig(RebalancerConfig.class);
   }
 
   /**
@@ -242,10 +251,17 @@ public class ResourceAccessor {
     ResourceId resourceId = resourceConfig.getId();
     ResourceConfiguration config = new ResourceConfiguration(resourceId);
     config.addNamespacedConfig(resourceConfig.getUserConfig());
-    config.addNamespacedConfig(resourceConfig.getRebalancerConfig().toNamespacedConfig());
+    PartitionedRebalancerConfig partitionedConfig =
+        PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
+    if (partitionedConfig == null
+        || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+      // only persist if this is not easily convertible to an ideal state
+      config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
+          .toNamespacedConfig());
+    }
     config.setBucketSize(resourceConfig.getBucketSize());
     config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
-    setConfiguration(resourceId, config);
+    setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
     return true;
   }
 
@@ -332,28 +348,28 @@ public class ResourceAccessor {
       String participantGroupTag) {
     Resource resource = readResource(resourceId);
     RebalancerConfig config = resource.getRebalancerConfig();
-    PartitionedRebalancerContext context =
-        config.getRebalancerContext(PartitionedRebalancerContext.class);
-    if (context == null) {
+    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+    if (partitionedConfig == null) {
       LOG.error("Only partitioned resource types are supported");
       return false;
     }
     if (replicaCount != -1) {
-      context.setReplicaCount(replicaCount);
+      partitionedConfig.setReplicaCount(replicaCount);
     }
     if (participantGroupTag != null) {
-      context.setParticipantGroupTag(participantGroupTag);
+      partitionedConfig.setParticipantGroupTag(participantGroupTag);
     }
     StateModelDefinition stateModelDef =
-        _accessor.getProperty(_keyBuilder.stateModelDef(context.getStateModelDefId().stringify()));
+        _accessor.getProperty(_keyBuilder.stateModelDef(partitionedConfig.getStateModelDefId()
+            .stringify()));
     List<InstanceConfig> participantConfigs =
         _accessor.getChildValues(_keyBuilder.instanceConfigs());
     Set<ParticipantId> participantSet = Sets.newHashSet();
     for (InstanceConfig participantConfig : participantConfigs) {
       participantSet.add(participantConfig.getParticipantId());
     }
-    context.generateDefaultConfiguration(stateModelDef, participantSet);
-    setRebalancerContext(resourceId, context);
+    partitionedConfig.generateDefaultConfiguration(stateModelDef, participantSet);
+    setRebalancerConfig(resourceId, partitionedConfig);
     return true;
   }
 
@@ -366,41 +382,40 @@ public class ResourceAccessor {
    */
   static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
       boolean batchMessageMode) {
-    PartitionedRebalancerContext partitionedContext =
-        config.getRebalancerContext(PartitionedRebalancerContext.class);
-    if (partitionedContext != null) {
-      IdealState idealState = new IdealState(partitionedContext.getResourceId());
-      idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
-      idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+    if (partitionedConfig != null) {
+      IdealState idealState = new IdealState(partitionedConfig.getResourceId());
+      idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
+      idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
       String replicas = null;
-      if (partitionedContext.anyLiveParticipant()) {
+      if (partitionedConfig.anyLiveParticipant()) {
         replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
       } else {
-        replicas = Integer.toString(partitionedContext.getReplicaCount());
+        replicas = Integer.toString(partitionedConfig.getReplicaCount());
       }
       idealState.setReplicas(replicas);
-      idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
-      idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
-      idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
-      idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
-      idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+      idealState.setNumPartitions(partitionedConfig.getPartitionSet().size());
+      idealState.setInstanceGroupTag(partitionedConfig.getParticipantGroupTag());
+      idealState.setMaxPartitionsPerInstance(partitionedConfig.getMaxPartitionsPerParticipant());
+      idealState.setStateModelDefId(partitionedConfig.getStateModelDefId());
+      idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
       idealState.setBucketSize(bucketSize);
       idealState.setBatchMessageMode(batchMessageMode);
-      if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-        SemiAutoRebalancerContext semiAutoContext =
-            config.getRebalancerContext(SemiAutoRebalancerContext.class);
-        for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
-          idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+      if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+        SemiAutoRebalancerConfig semiAutoConfig =
+            BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
+        for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
+          idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
         }
-      } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
-        CustomRebalancerContext customContext =
-            config.getRebalancerContext(CustomRebalancerContext.class);
-        for (PartitionId partitionId : customContext.getPartitionSet()) {
-          idealState.setParticipantStateMap(partitionId,
-              customContext.getPreferenceMap(partitionId));
+      } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+        CustomRebalancerConfig customConfig =
+            BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
+        for (PartitionId partitionId : customConfig.getPartitionSet()) {
+          idealState
+              .setParticipantStateMap(partitionId, customConfig.getPreferenceMap(partitionId));
         }
       } else {
-        for (PartitionId partitionId : partitionedContext.getPartitionSet()) {
+        for (PartitionId partitionId : partitionedConfig.getPartitionSet()) {
           List<ParticipantId> preferenceList = Collections.emptyList();
           idealState.setPreferenceList(partitionId, preferenceList);
           Map<ParticipantId, State> participantStateMap = Collections.emptyMap();
@@ -425,7 +440,7 @@ public class ResourceAccessor {
       ResourceConfiguration resourceConfiguration, IdealState idealState,
       ExternalView externalView, ResourceAssignment resourceAssignment) {
     UserConfig userConfig;
-    RebalancerContext rebalancerContext = null;
+    RebalancerConfig rebalancerConfig = null;
     ResourceType type = ResourceType.DATA;
     if (resourceConfiguration != null) {
       userConfig = resourceConfiguration.getUserConfig();
@@ -437,14 +452,14 @@ public class ResourceAccessor {
     boolean batchMessageMode = false;
     if (idealState != null) {
       if (resourceConfiguration != null
-          && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
-        // prefer rebalancer context for non-user_defined data rebalancing
-        rebalancerContext =
-            resourceConfiguration.getRebalancerContext(PartitionedRebalancerContext.class);
+          && idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+        // prefer rebalancer config for user_defined data rebalancing
+        rebalancerConfig =
+            resourceConfiguration.getRebalancerConfig(PartitionedRebalancerConfig.class);
       }
-      if (rebalancerContext == null) {
+      if (rebalancerConfig == null) {
         // prefer ideal state for non-user_defined data rebalancing
-        rebalancerContext = PartitionedRebalancerContext.from(idealState);
+        rebalancerConfig = PartitionedRebalancerConfig.from(idealState);
       }
       bucketSize = idealState.getBucketSize();
       batchMessageMode = idealState.getBatchMessageMode();
@@ -452,14 +467,13 @@ public class ResourceAccessor {
     } else if (resourceConfiguration != null) {
       bucketSize = resourceConfiguration.getBucketSize();
       batchMessageMode = resourceConfiguration.getBatchMessageMode();
-      RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
-      rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+      rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class);
     }
-    if (rebalancerContext == null) {
-      rebalancerContext = new PartitionedRebalancerContext();
+    if (rebalancerConfig == null) {
+      rebalancerConfig = new PartitionedRebalancerConfig();
     }
     return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
-        rebalancerContext, userConfig, bucketSize, batchMessageMode);
+        rebalancerConfig, userConfig, bucketSize, batchMessageMode);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index 38d48ab..b148a49 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -7,8 +7,7 @@ import org.apache.helix.api.Partition;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 
 import com.google.common.collect.Sets;
 
@@ -80,7 +79,7 @@ public class ResourceConfig {
    * @return map of subunit id to subunit or empty map if none
    */
   public Map<? extends PartitionId, ? extends Partition> getSubUnitMap() {
-    return _rebalancerConfig.getRebalancerContext(RebalancerContext.class).getSubUnitMap();
+    return _rebalancerConfig.getSubUnitMap();
   }
 
   /**
@@ -167,7 +166,7 @@ public class ResourceConfig {
   public static class Delta {
     private enum Fields {
       TYPE,
-      REBALANCER_CONTEXT,
+      REBALANCER_CONFIG,
       USER_CONFIG,
       BUCKET_SIZE,
       BATCH_MESSAGE_MODE
@@ -198,12 +197,12 @@ public class ResourceConfig {
 
     /**
      * Set the rebalancer configuration
-     * @param context properties of interest for rebalancing
+     * @param config properties of interest for rebalancing
      * @return Delta
      */
-    public Delta setRebalancerContext(RebalancerContext context) {
-      _builder.rebalancerContext(context);
-      _updateFields.add(Fields.REBALANCER_CONTEXT);
+    public Delta setRebalancerConfig(RebalancerConfig config) {
+      _builder.rebalancerConfig(config);
+      _updateFields.add(Fields.REBALANCER_CONFIG);
       return this;
     }
 
@@ -248,10 +247,8 @@ public class ResourceConfig {
     public ResourceConfig mergeInto(ResourceConfig orig) {
       ResourceConfig deltaConfig = _builder.build();
       Builder builder =
-          new Builder(orig.getId())
-              .type(orig.getType())
-              .rebalancerContext(
-                  orig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class))
+          new Builder(orig.getId()).type(orig.getType())
+              .rebalancerConfig(orig.getRebalancerConfig())
               .schedulerTaskConfig(orig.getSchedulerTaskConfig()).userConfig(orig.getUserConfig())
               .bucketSize(orig.getBucketSize()).batchMessageMode(orig.getBatchMessageMode());
       for (Fields field : _updateFields) {
@@ -259,9 +256,8 @@ public class ResourceConfig {
         case TYPE:
           builder.type(deltaConfig.getType());
           break;
-        case REBALANCER_CONTEXT:
-          builder.rebalancerContext(deltaConfig.getRebalancerConfig().getRebalancerContext(
-              RebalancerContext.class));
+        case REBALANCER_CONFIG:
+          builder.rebalancerConfig(deltaConfig.getRebalancerConfig());
           break;
         case USER_CONFIG:
           builder.userConfig(deltaConfig.getUserConfig());
@@ -314,11 +310,11 @@ public class ResourceConfig {
 
     /**
      * Set the rebalancer configuration
-     * @param rebalancerContext properties of interest for rebalancing
+     * @param rebalancerConfig properties of interest for rebalancing
      * @return Builder
      */
-    public Builder rebalancerContext(RebalancerContext rebalancerContext) {
-      _rebalancerConfig = new RebalancerConfig(rebalancerContext);
+    public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+      _rebalancerConfig = rebalancerConfig;
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java b/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
new file mode 100644
index 0000000..29a72b7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
@@ -0,0 +1,57 @@
+package org.apache.helix.api.id;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Identifies a context
+ */
+public class ContextId extends Id {
+  @JsonProperty("id")
+  final private String _id;
+
+  /**
+   * Create a context id
+   * @param id string representation of the id
+   */
+  @JsonCreator
+  public ContextId(@JsonProperty("id") String id) {
+    _id = id;
+  }
+
+  @Override
+  public String stringify() {
+    return _id;
+  }
+
+  /**
+   * Get a concrete context id for a string name
+   * @param contextId string context identifier
+   * @return ContextId
+   */
+  public static ContextId from(String contextId) {
+    if (contextId == null) {
+      return null;
+    }
+    return new ContextId(contextId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index eec745e..96be0fa 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -44,18 +44,19 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.MessageGenerationStage;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.PersistAssignmentStage;
+import org.apache.helix.controller.stages.PersistContextStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.controller.stages.TaskAssignmentStage;
-import org.apache.helix.controller.stages.PersistAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -183,11 +184,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       rebalancePipeline.addStage(new ResourceComputationStage());
       rebalancePipeline.addStage(new CurrentStateComputationStage());
       rebalancePipeline.addStage(new BestPossibleStateCalcStage());
-      rebalancePipeline.addStage(new PersistAssignmentStage());
       rebalancePipeline.addStage(new MessageGenerationStage());
       rebalancePipeline.addStage(new MessageSelectionStage());
       rebalancePipeline.addStage(new MessageThrottleStage());
       rebalancePipeline.addStage(new TaskAssignmentStage());
+      rebalancePipeline.addStage(new PersistAssignmentStage());
+      rebalancePipeline.addStage(new PersistContextStage());
 
       // external view generation
       Pipeline externalViewPipeline = new Pipeline();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java b/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
new file mode 100644
index 0000000..2d98347
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
@@ -0,0 +1,60 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.serializer.DefaultStringSerializer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A simple context that can be serialized by a {@link DefaultStringSerializer}
+ */
+public class BasicControllerContext implements ControllerContext {
+  private final ContextId _id;
+  private Class<? extends StringSerializer> _serializer;
+
+  /**
+   * Instantiate with an id
+   * @param id ContextId, unique among all contexts in this cluster
+   */
+  public BasicControllerContext(@JsonProperty("id") ContextId id) {
+    _id = id;
+    _serializer = DefaultStringSerializer.class;
+  }
+
+  /**
+   * Set the class that can serialize this object into a String, and back
+   * @param serializer StringSerializer implementation class
+   */
+  public void setSerializerClass(Class<? extends StringSerializer> serializer) {
+    _serializer = serializer;
+  }
+
+  @Override
+  public Class<? extends StringSerializer> getSerializerClass() {
+    return _serializer;
+  }
+
+  @Override
+  public ContextId getId() {
+    return _id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
new file mode 100644
index 0000000..b1be5c2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.serializer.StringSerializer;
+
+/**
+ * Controller stages can implement this interface to set and get persistent state
+ */
+public interface ControllerContext {
+  /**
+   * Get the identifier of this context
+   * @return {@link ContextId}
+   */
+  ContextId getId();
+
+  /**
+   * Get the class that can be used to convert this object to and from a String
+   * @return {@link StringSerializer} implementation class
+   */
+  Class<? extends StringSerializer> getSerializerClass();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
new file mode 100644
index 0000000..8c1d03a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
@@ -0,0 +1,156 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Wrap a {@link ControllerContext} so that it can be persisted in the backing store
+ */
+public class ControllerContextHolder extends HelixProperty {
+  private enum Fields {
+    SERIALIZER_CLASS,
+    CONTEXT,
+    CONTEXT_CLASS
+  }
+
+  private static final Logger LOG = Logger.getLogger(ControllerContextHolder.class);
+
+  private final ControllerContext _context;
+
+  /**
+   * Instantiate from a populated ControllerContext
+   * @param context instantiated context
+   */
+  public ControllerContextHolder(ControllerContext context) {
+    super(context.getId().toString());
+    _context = context;
+    StringSerializer serializer = instantiateSerializerFromContext(_context);
+    if (_context != null && serializer != null) {
+      _record.setSimpleField(Fields.SERIALIZER_CLASS.toString(), _context.getSerializerClass()
+          .getName());
+      _record.setSimpleField(Fields.CONTEXT_CLASS.toString(), _context.getClass().getName());
+      _record.setSimpleField(Fields.CONTEXT.toString(), serializer.serialize(_context));
+    }
+  }
+
+  /**
+   * Instantiate from a record
+   * @param record populated ZNRecord
+   */
+  public ControllerContextHolder(ZNRecord record) {
+    super(record);
+    StringSerializer serializer =
+        instantiateSerializerFromName(_record.getSimpleField(Fields.SERIALIZER_CLASS.toString()));
+    _context =
+        loadContext(serializer, _record.getSimpleField(Fields.CONTEXT_CLASS.toString()),
+            _record.getSimpleField(Fields.CONTEXT.toString()));
+  }
+
+  /**
+   * Get a ControllerContext
+   * @return instantiated {@link ControllerContext}
+   */
+  public ControllerContext getContext() {
+    return _context;
+  }
+
+  /**
+   * Get a ControllerContext as a specific subtype
+   * @return instantiated typed {@link ControllerContext}, or null
+   */
+  public <T extends ControllerContext> T getContext(Class<T> contextClass) {
+    if (_context != null) {
+      try {
+        return contextClass.cast(_context);
+      } catch (ClassCastException e) {
+        LOG.warn(contextClass + " is incompatible with context class: " + _context.getClass());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Instantiate a StringSerializer that can serialize the context
+   * @param context instantiated ControllerContext
+   * @return StringSerializer object, or null if it could not be instantiated
+   */
+  private StringSerializer instantiateSerializerFromContext(ControllerContext context) {
+    if (context == null) {
+      return null;
+    }
+    try {
+      return context.getSerializerClass().newInstance();
+    } catch (InstantiationException e) {
+      LOG.error("Serializer could not be instatiated", e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Serializer default constructor not accessible", e);
+    }
+    return null;
+  }
+
+  /**
+   * Instantiate a StringSerializer from its class name
+   * @param serializerClassName name of a StringSerializer implementation class
+   * @return instantiated StringSerializer, or null if it could not be instantiated
+   */
+  private StringSerializer instantiateSerializerFromName(String serializerClassName) {
+    if (serializerClassName != null) {
+      try {
+        return (StringSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+            .newInstance();
+      } catch (InstantiationException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (IllegalAccessException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Error getting the serializer", e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Deserialize a context
+   * @param serializer StringSerializer that can deserialize the context
+   * @param className the name of the context class
+   * @param contextData the raw context data as a String
+   * @return ControllerContext, or null if there was a conversion issue
+   */
+  private ControllerContext loadContext(StringSerializer serializer, String className,
+      String contextData) {
+    if (serializer != null && className != null && contextData != null) {
+      try {
+        Class<? extends ControllerContext> contextClass =
+            HelixUtil.loadClass(getClass(), className).asSubclass(ControllerContext.class);
+        return serializer.deserialize(contextClass, contextData);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Could not convert the persisted data into a " + className, e);
+      } catch (ClassCastException e) {
+        LOG.error("Could not convert the persisted data into a " + className, e);
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
new file mode 100644
index 0000000..1541585
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
@@ -0,0 +1,120 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A class for getting and setting {@link ControllerContext} objects, which will eventually
+ * be persisted and accessible across runs of the controller pipeline.
+ */
+public class ControllerContextProvider {
+  private static final Logger LOG = Logger.getLogger(ControllerContextProvider.class);
+
+  private Map<ContextId, ControllerContext> _persistedContexts;
+  private Map<ContextId, ControllerContext> _pendingContexts;
+
+  /**
+   * Instantiate with already-persisted controller contexts
+   * @param contexts already-persisted contexts keyed by id; null is treated as an empty map
+   */
+  public ControllerContextProvider(Map<ContextId, ControllerContext> contexts) {
+    _persistedContexts = contexts != null ? contexts : new HashMap<ContextId, ControllerContext>();
+    _pendingContexts = Maps.newHashMap();
+  }
+
+  /**
+   * Get a base ControllerContext
+   * @param contextId the context id to look up
+   * @return a ControllerContext, or null if not found
+   */
+  public ControllerContext getControllerContext(ContextId contextId) {
+    return getControllerContext(contextId, ControllerContext.class);
+  }
+
+  /**
+   * Get a typed ControllerContext
+   * @param contextId the context id to look up
+   * @param contextClass the class which the context should be returned as
+   * @return a typed ControllerContext, or null if no context with given id is available for this
+   *         type
+   */
+  public <T extends ControllerContext> T getControllerContext(ContextId contextId,
+      Class<T> contextClass) {
+    try {
+      if (_pendingContexts.containsKey(contextId)) {
+        return contextClass.cast(_pendingContexts.get(contextId));
+      } else if (_persistedContexts.containsKey(contextId)) {
+        return contextClass.cast(_persistedContexts.get(contextId));
+      }
+    } catch (ClassCastException e) {
+      LOG.error("Could not convert context " + contextId + " into " + contextClass.getName());
+    }
+    return null;
+  }
+
+  /**
+   * Put a controller context, overwriting any existing context with the same id
+   * @param contextId the id to set
+   * @param context the context object
+   */
+  public void putControllerContext(ContextId contextId, ControllerContext context) {
+    putControllerContext(contextId, context, true);
+  }
+
+  /**
+   * Put a controller context, specifying overwrite behavior
+   * @param contextId the id to set
+   * @param context the context object
+   * @param overwriteAllowed true if existing objects can be overwritten, false otherwise
+   * @return true if saved, false if an object with that id exists and overwrite is not allowed
+   */
+  public boolean putControllerContext(ContextId contextId, ControllerContext context,
+      boolean overwriteAllowed) {
+    if (overwriteAllowed || !exists(contextId)) {
+      _pendingContexts.put(contextId, context);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Check if a context exists
+   * @param contextId the id to look up
+   * @return true if a context exists with that id, false otherwise
+   */
+  public boolean exists(ContextId contextId) {
+    return _persistedContexts.containsKey(contextId) || _pendingContexts.containsKey(contextId);
+  }
+
+  /**
+   * Get all contexts that have been put, but not yet persisted
+   * @return a map of context id to context
+   */
+  public Map<ContextId, ControllerContext> getPendingContexts() {
+    return _pendingContexts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 5209e2c..6629bec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -8,8 +8,10 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
@@ -40,15 +42,15 @@ public class CustomRebalancer implements HelixRebalancer {
   private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
 
   @Override
-  public void init(HelixManager helixManager) {
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
     // do nothing
   }
 
   @Override
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    CustomRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+    CustomRebalancerConfig config =
+        BasicRebalancerConfig.convert(rebalancerConfig, CustomRebalancerConfig.class);
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
index fc4bfa0..3aa41d7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -30,8 +30,9 @@ import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -54,31 +55,30 @@ public class FallbackRebalancer implements HelixRebalancer {
   private HelixManager _helixManager;
 
   @Override
-  public void init(HelixManager helixManager) {
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
     _helixManager = helixManager;
   }
 
   @Override
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
     // make sure the manager is not null
     if (_helixManager == null) {
       LOG.info("HelixManager is null!");
       return null;
     }
 
-    // get the context
-    PartitionedRebalancerContext context =
-        rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
-    if (context == null) {
+    // get the config
+    PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
+    if (config == null) {
       LOG.info("Resource is not partitioned");
       return null;
     }
 
     // get the ideal state and rebalancer class
-    ResourceId resourceId = context.getResourceId();
+    ResourceId resourceId = config.getResourceId();
     StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(context.getStateModelDefId());
+        cluster.getStateModelMap().get(config.getStateModelDefId());
     if (stateModelDef == null) {
       LOG.info("StateModelDefinition unavailable for " + resourceId);
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 6d7b0ef..0c55d45 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -16,8 +16,10 @@ import org.apache.helix.api.Participant;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
@@ -56,15 +58,15 @@ public class FullAutoRebalancer implements HelixRebalancer {
   private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
 
   @Override
-  public void init(HelixManager helixManager) {
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
     // do nothing
   }
 
   @Override
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    FullAutoRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+    FullAutoRebalancerConfig config =
+        BasicRebalancerConfig.convert(rebalancerConfig, FullAutoRebalancerConfig.class);
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     // Compute a preference list based on the current ideal state
@@ -181,7 +183,7 @@ public class FullAutoRebalancer implements HelixRebalancer {
   }
 
   private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
-      FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+      FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
       Map<State, Integer> stateCountMap) {
     Map<PartitionId, Map<ParticipantId, State>> map =
         new HashMap<PartitionId, Map<ParticipantId, State>>();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
index 7fcbba5..1fbb02f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
@@ -2,7 +2,8 @@ package org.apache.helix.controller.rebalancer;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
 
@@ -32,10 +33,11 @@ import org.apache.helix.model.ResourceAssignment;
  */
 public interface HelixRebalancer {
   /**
-   * Initialize the rebalancer with a HelixManager if necessary
-   * @param manager
+   * Initialize the rebalancer with a HelixManager and ControllerContextProvider if necessary
+   * @param helixManager HelixManager instance
+   * @param contextProvider An object that supports getting and setting context across pipeline runs
    */
-  public void init(HelixManager helixManager);
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider);
 
   /**
    * Given an ideal state for a resource and liveness of participants, compute a assignment of
@@ -47,18 +49,20 @@ public interface HelixRebalancer {
    * Say that you have:<br/>
    * 
    * <pre>
-   * class MyRebalancerContext implements RebalancerContext
+   * class MyRebalancerConfig implements RebalancerConfig
    * </pre>
    * 
-   * as your rebalancer context. To extract it from a RebalancerConfig, do the following:<br/>
+   * as your rebalancer config. To get a typed version, you can do the following:<br/>
    * 
    * <pre>
-   * MyRebalancerContext context = rebalancerConfig.getRebalancerContext(MyRebalancerContext.class);
+   * MyRebalancerConfig config = BasicRebalancerConfig.convert(rebalancerConfig,
+   *     MyRebalancerConfig.class);
    * </pre>
    * @param rebalancerConfig the properties of the resource for which a mapping will be computed
+   * @param prevAssignment the previous ResourceAssignment of this cluster, or null if none
    * @param cluster complete snapshot of the cluster
    * @param currentState the current states of all partitions
    */
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState);
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState);
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index a0ad6f3..51ca463 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -9,9 +9,10 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
@@ -45,15 +46,15 @@ public class SemiAutoRebalancer implements HelixRebalancer {
   private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
 
   @Override
-  public void init(HelixManager helixManager) {
+  public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
     // do nothing
   }
 
   @Override
   public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    SemiAutoRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+    SemiAutoRebalancerConfig config =
+        BasicRebalancerConfig.convert(rebalancerConfig, SemiAutoRebalancerConfig.class);
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
@@ -65,8 +66,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
       Map<ParticipantId, State> currentStateMap =
           currentState.getCurrentStateMap(config.getResourceId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
-          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), partition);
       List<ParticipantId> preferenceList =
           ConstraintBasedAssignment.getPreferenceList(cluster, partition,
               config.getPreferenceList(partition));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
new file mode 100644
index 0000000..d6aa10e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
@@ -0,0 +1,256 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.serializer.DefaultStringSerializer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Abstract RebalancerConfig that functions for generic subunits. Use a subclass that more
+ * concretely defines the subunits.
+ */
+public abstract class BasicRebalancerConfig implements RebalancerConfig {
+  private ResourceId _resourceId;
+  private StateModelDefId _stateModelDefId;
+  private StateModelFactoryId _stateModelFactoryId;
+  private String _participantGroupTag;
+  private Class<? extends StringSerializer> _serializer;
+  private RebalancerRef _rebalancerRef;
+
+  /**
+   * Instantiate a basic rebalancer config
+   */
+  public BasicRebalancerConfig() {
+    _serializer = DefaultStringSerializer.class;
+  }
+
+  @Override
+  public ResourceId getResourceId() {
+    return _resourceId;
+  }
+
+  /**
+   * Set the resource to rebalance
+   * @param resourceId resource id
+   */
+  public void setResourceId(ResourceId resourceId) {
+    _resourceId = resourceId;
+  }
+
+  @Override
+  public StateModelDefId getStateModelDefId() {
+    return _stateModelDefId;
+  }
+
+  /**
+   * Set the state model definition that the resource follows
+   * @param stateModelDefId state model definition id
+   */
+  public void setStateModelDefId(StateModelDefId stateModelDefId) {
+    _stateModelDefId = stateModelDefId;
+  }
+
+  @Override
+  public StateModelFactoryId getStateModelFactoryId() {
+    return _stateModelFactoryId;
+  }
+
+  /**
+   * Set the state model factory that the resource uses
+   * @param stateModelFactoryId state model factory id
+   */
+  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+    _stateModelFactoryId = stateModelFactoryId;
+  }
+
+  @Override
+  public String getParticipantGroupTag() {
+    return _participantGroupTag;
+  }
+
+  /**
+   * Set a tag that participants must have in order to serve this resource
+   * @param participantGroupTag string group tag
+   */
+  public void setParticipantGroupTag(String participantGroupTag) {
+    _participantGroupTag = participantGroupTag;
+  }
+
+  /**
+   * Get the serializer. If none is provided, {@link DefaultStringSerializer} is used
+   */
+  @Override
+  public Class<? extends StringSerializer> getSerializerClass() {
+    return _serializer;
+  }
+
+  /**
+   * Set the class that can serialize this config
+   * @param serializer serializer class that implements StringSerializer
+   */
+  public void setSerializerClass(Class<? extends StringSerializer> serializer) {
+    _serializer = serializer;
+  }
+
+  @Override
+  @JsonIgnore
+  public Set<? extends PartitionId> getSubUnitIdSet() {
+    return getSubUnitMap().keySet();
+  }
+
+  @Override
+  @JsonIgnore
+  public Partition getSubUnit(PartitionId subUnitId) {
+    return getSubUnitMap().get(subUnitId);
+  }
+
+  @Override
+  public RebalancerRef getRebalancerRef() {
+    return _rebalancerRef;
+  }
+
+  /**
+   * Set the reference to the class used to rebalance this resource
+   * @param rebalancerRef RebalancerRef instance
+   */
+  public void setRebalancerRef(RebalancerRef rebalancerRef) {
+    _rebalancerRef = rebalancerRef;
+  }
+
+  /**
+   * Safely cast a RebalancerConfig into a subtype
+   * @param config RebalancerConfig object
+   * @param clazz the target class
+   * @return An instance of clazz, or null if the conversion is not possible
+   */
+  public static <T extends RebalancerConfig> T convert(RebalancerConfig config, Class<T> clazz) {
+    try {
+      return clazz.cast(config);
+    } catch (ClassCastException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Abstract builder for the base rebalancer config
+   */
+  public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
+    private final ResourceId _resourceId;
+    private StateModelDefId _stateModelDefId;
+    private StateModelFactoryId _stateModelFactoryId;
+    private String _participantGroupTag;
+    private Class<? extends StringSerializer> _serializerClass;
+    private RebalancerRef _rebalancerRef;
+
+    /**
+     * Instantiate with a resource id
+     * @param resourceId resource id
+     */
+    public AbstractBuilder(ResourceId resourceId) {
+      _resourceId = resourceId;
+      _serializerClass = DefaultStringSerializer.class;
+    }
+
+    /**
+     * Set the state model definition that the resource should follow
+     * @param stateModelDefId state model definition id
+     * @return Builder
+     */
+    public T stateModelDefId(StateModelDefId stateModelDefId) {
+      _stateModelDefId = stateModelDefId;
+      return self();
+    }
+
+    /**
+     * Set the state model factory that the resource should use
+     * @param stateModelFactoryId state model factory id
+     * @return Builder
+     */
+    public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+      _stateModelFactoryId = stateModelFactoryId;
+      return self();
+    }
+
+    /**
+     * Set the tag that all participants require in order to serve this resource
+     * @param participantGroupTag the tag
+     * @return Builder
+     */
+    public T participantGroupTag(String participantGroupTag) {
+      _participantGroupTag = participantGroupTag;
+      return self();
+    }
+
+    /**
+     * Set the serializer class for this rebalancer config
+     * @param serializerClass class that implements StringSerializer
+     * @return Builder
+     */
+    public T serializerClass(Class<? extends StringSerializer> serializerClass) {
+      _serializerClass = serializerClass;
+      return self();
+    }
+
+    /**
+     * Specify a custom class to use for rebalancing
+     * @param rebalancerRef RebalancerRef instance
+     * @return Builder
+     */
+    public T rebalancerRef(RebalancerRef rebalancerRef) {
+      _rebalancerRef = rebalancerRef;
+      return self();
+    }
+
+    /**
+     * Update an existing config with base fields
+     * @param config derived config
+     */
+    protected final void update(BasicRebalancerConfig config) {
+      config.setResourceId(_resourceId);
+      config.setStateModelDefId(_stateModelDefId);
+      config.setStateModelFactoryId(_stateModelFactoryId);
+      config.setParticipantGroupTag(_participantGroupTag);
+      config.setSerializerClass(_serializerClass);
+      config.setRebalancerRef(_rebalancerRef);
+    }
+
+    /**
+     * Get a typed reference to "this" class. Final derived classes should simply return the this
+     * reference.
+     * @return this for the most specific type
+     */
+    protected abstract T self();
+
+    /**
+     * Get the rebalancer config from the built fields
+     * @return RebalancerConfig
+     */
+    public abstract RebalancerConfig build();
+  }
+}