You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:27 UTC

[19/53] [abbrv] [HELIX-100] Improve the helix config api

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
deleted file mode 100644
index d9517a9..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.helix.api;
-
-import java.util.Map;
-
-import org.apache.helix.model.IdealState.RebalanceMode;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Configuration properties for the USER_DEFINED rebalancer. If additional fields are necessary, all
- * getters and setters for simple, list, and map fields are available.
- */
-public final class UserDefinedRebalancerConfig extends RebalancerConfig {
-  public enum Fields {
-    REBALANCER_CLASS_NAME
-  }
-
-  /**
-   * Instantiate a new config for USER_DEFINED
-   * @param resourceId the resource to rebalance
-   * @param stateModelDefId the state model that the resource follows
-   * @param partitionMap map of partition id to partition
-   * @param rebalancerRef instantiated rebalancer reference
-   */
-  public UserDefinedRebalancerConfig(ResourceId resourceId, StateModelDefId stateModelDefId,
-      Map<PartitionId, Partition> partitionMap, RebalancerRef rebalancerRef) {
-    super(resourceId, RebalanceMode.USER_DEFINED, stateModelDefId, partitionMap);
-    setSimpleField(Fields.REBALANCER_CLASS_NAME.toString(), rebalancerRef.toString());
-  }
-
-  /**
-   * Instantiate from a base RebalancerConfig
-   * @param config populated rebalancer config
-   */
-  private UserDefinedRebalancerConfig(RebalancerConfig config) {
-    super(config);
-  }
-
-  /**
-   * Get a reference to the class used to rebalance this resource
-   * @return RebalancerRef, or null if none set
-   */
-  public RebalancerRef getRebalancerRef() {
-    String rebalancerClassName = getStringField(Fields.REBALANCER_CLASS_NAME.toString(), null);
-    if (rebalancerClassName != null) {
-      return RebalancerRef.from(rebalancerClassName);
-    }
-    return null;
-  }
-
-  /**
-   * Get a UserDefinedRebalancerConfig from a RebalancerConfig
-   * @param config populated RebalancerConfig
-   * @return UserDefinedRebalancerConfig
-   */
-  public static UserDefinedRebalancerConfig from(RebalancerConfig config) {
-    return new UserDefinedRebalancerConfig(config);
-  }
-
-  /**
-   * Assembler for a USER_DEFINED configuration
-   */
-  public static class Builder extends RebalancerConfig.Builder<Builder> {
-    private RebalancerRef _rebalancerRef;
-
-    /**
-     * Build for a specific resource
-     * @param resourceId the resource to rebalance
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-    }
-
-    /**
-     * Construct a builder using an existing user-defined rebalancer config
-     * @param config
-     */
-		public Builder(UserDefinedRebalancerConfig config) {
-			super(config);
-			_rebalancerRef = config.getRebalancerRef();
-		}
-
-    public Builder rebalancerRef(RebalancerRef rebalancerRef) {
-      _rebalancerRef = rebalancerRef;
-      return this;
-    }
-
-    @Override
-    public UserDefinedRebalancerConfig build() {
-      if (_partitionMap.isEmpty()) {
-        addPartitions(1);
-      }
-      UserDefinedRebalancerConfig config =
-          new UserDefinedRebalancerConfig(_resourceId, _stateModelDefId, _partitionMap,
-              _rebalancerRef);
-      update(config);
-      return config;
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
deleted file mode 100644
index 1972f03..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ /dev/null
@@ -1,203 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.FullAutoRebalancerConfig;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.State;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.collect.Lists;
-
-/**
- * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal
- * state of a resource, fully adapting to the addition or removal of instances. This includes
- * computation of a new preference list and a partition to instance and state mapping based on the
- * computed instance preferences.
- * The input is the current assignment of partitions to instances, as well as existing instance
- * preferences, if any.
- * The output is a preference list and a mapping based on that preference list, i.e. partition p
- * has a replica on node k with state s.
- */
-public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig> {
-  // These should be final, but are initialized in init rather than a constructor
-  private AutoRebalanceStrategy _algorithm;
-
-  private static final Logger LOG = Logger.getLogger(NewAutoRebalancer.class);
-
-  @Override
-  public ResourceAssignment computeResourceMapping(FullAutoRebalancerConfig config,
-      Cluster cluster, ResourceCurrentState currentState) {
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    // Compute a preference list based on the current ideal state
-    List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
-    List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
-    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
-    Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
-    int replicas = -1;
-    if (config.canAssignAnyLiveParticipant()) {
-      replicas = liveParticipants.size();
-    } else {
-      replicas = config.getReplicaCount();
-    }
-
-    // count how many replicas should be in each state
-    LinkedHashMap<State, Integer> stateCountMap =
-        NewConstraintBasedAssignment.stateCount(cluster.getConfig(), config.getResourceId(),
-            stateModelDef, liveParticipants.size(), replicas);
-    LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
-    for (State state : stateCountMap.keySet()) {
-      rawStateCountMap.put(state.toString(), stateCountMap.get(state));
-    }
-
-    // get the participant lists
-    List<ParticipantId> liveParticipantList =
-        new ArrayList<ParticipantId>(liveParticipants.keySet());
-    List<ParticipantId> allParticipantList =
-        new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
-    List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
-
-    // compute the current mapping from the current state
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping =
-        currentMapping(config, currentState, stateCountMap);
-
-    // If there are nodes tagged with resource, use only those nodes
-    Set<String> taggedNodes = new HashSet<String>();
-    if (config.getParticipantGroupTag() != null) {
-      for (ParticipantId participantId : liveParticipantList) {
-        if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
-          taggedNodes.add(participantId.stringify());
-        }
-      }
-    }
-    if (taggedNodes.size() > 0) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("found the following instances with tag " + config.getResourceId() + " "
-            + taggedNodes);
-      }
-      liveNodes = new ArrayList<String>(taggedNodes);
-    }
-
-    // determine which nodes the replicas should live on
-    List<String> allNodes = Lists.transform(allParticipantList, Functions.toStringFunction());
-    int maxPartition = config.getMaxPartitionsPerParticipant();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("currentMapping: " + currentMapping);
-      LOG.info("stateCountMap: " + stateCountMap);
-      LOG.info("liveNodes: " + liveNodes);
-      LOG.info("allNodes: " + allNodes);
-      LOG.info("maxPartition: " + maxPartition);
-    }
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    _algorithm =
-        new AutoRebalanceStrategy(config.getResourceId().stringify(), partitionNames,
-            rawStateCountMap, maxPartition, placementScheme);
-    ZNRecord newMapping =
-        _algorithm.computePartitionAssignment(liveNodes,
-            ResourceAssignment.stringMapsFromReplicaMaps(currentMapping), allNodes);
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("newMapping: " + newMapping);
-    }
-
-    // compute a full partition mapping for the resource
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : partitions) {
-      Set<ParticipantId> disabledParticipantsForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
-      List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
-      if (rawPreferenceList == null) {
-        rawPreferenceList = Collections.emptyList();
-      }
-      List<ParticipantId> preferenceList =
-          Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
-            @Override
-            public ParticipantId apply(String participantName) {
-              return ParticipantId.from(participantName);
-            }
-          });
-      preferenceList =
-          NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
-      Map<ParticipantId, State> bestStateForPartition =
-          NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
-              config.getResourceId(), liveParticipants, stateModelDef, preferenceList,
-              currentState.getCurrentStateMap(config.getResourceId(), partition),
-              disabledParticipantsForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-
-  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
-      FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
-      Map<State, Integer> stateCountMap) {
-    Map<PartitionId, Map<ParticipantId, State>> map =
-        new HashMap<PartitionId, Map<ParticipantId, State>>();
-
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> curStateMap =
-          currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
-      map.put(partition, new HashMap<ParticipantId, State>());
-      for (ParticipantId node : curStateMap.keySet()) {
-        State state = curStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-
-      Map<ParticipantId, State> pendingStateMap =
-          currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
-      for (ParticipantId node : pendingStateMap.keySet()) {
-        State state = pendingStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-    }
-    return map;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
deleted file mode 100644
index 5570c36..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.CustomRebalancerConfig;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.State;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping of
- * partitions against the set of live instances to mark assignment states as dropped or erroneous
- * as necessary.
- * The input is the required current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a verified mapping based on that preference list, i.e. partition p has a replica
- * on node k with state s, where s may be a dropped or error state if necessary.
- */
-public class NewCustomRebalancer implements NewRebalancer<CustomRebalancerConfig> {
-
-  private static final Logger LOG = Logger.getLogger(NewCustomRebalancer.class);
-
-  @Override
-  public ResourceAssignment computeResourceMapping(CustomRebalancerConfig config, Cluster cluster,
-      ResourceCurrentState currentState) {
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> currentStateMap =
-          currentState.getCurrentStateMap(config.getResourceId(), partition);
-      Set<ParticipantId> disabledInstancesForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
-      Map<ParticipantId, State> bestStateForPartition =
-          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
-              config.getPreferenceMap(partition), currentStateMap, disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-
-  /**
-   * compute best state for resource in CUSTOMIZED rebalancer mode
-   * @param liveParticipantMap
-   * @param stateModelDef
-   * @param idealStateMap
-   * @param currentStateMap
-   * @param disabledParticipantsForPartition
-   * @return
-   */
-  private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
-      Map<ParticipantId, Participant> liveParticipantMap, StateModelDefinition stateModelDef,
-      Map<ParticipantId, State> idealStateMap, Map<ParticipantId, State> currentStateMap,
-      Set<ParticipantId> disabledParticipantsForPartition) {
-    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
-    // if the resource is deleted, idealStateMap will be null/empty and
-    // we should drop all resources.
-    if (currentStateMap != null) {
-      for (ParticipantId participantId : currentStateMap.keySet()) {
-        if ((idealStateMap == null || !idealStateMap.containsKey(participantId))
-            && !disabledParticipantsForPartition.contains(participantId)) {
-          // if dropped and not disabled, transit to DROPPED
-          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
-        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
-            participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
-          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          participantStateMap.put(participantId, stateModelDef.getInitialState());
-        }
-      }
-    }
-
-    // ideal state is deleted
-    if (idealStateMap == null) {
-      return participantStateMap;
-    }
-
-    for (ParticipantId participantId : idealStateMap.keySet()) {
-      boolean notInErrorState =
-          currentStateMap == null || currentStateMap.get(participantId) == null
-              || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
-
-      if (liveParticipantMap.containsKey(participantId) && notInErrorState
-          && !disabledParticipantsForPartition.contains(participantId)) {
-        participantStateMap.put(participantId, idealStateMap.get(participantId));
-      }
-    }
-
-    return participantStateMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
deleted file mode 100644
index 4792877..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-
-/**
- * Arbitrary configurable rebalancer interface.
- * @see {@link NewUserDefinedRebalancer} for an interface with a plugged-in class
- */
-interface NewRebalancer<T extends RebalancerConfig> {
-
-  /**
-   * Given a resource, existing mapping, and liveness of resources, compute a new mapping of
-   * resources.
-   * @param rebalancerConfig resource properties used by the rebalancer
-   * @param cluster a snapshot of the entire cluster state
-   * @param currentState a combination of the current states and pending current states
-   */
-  ResourceAssignment computeResourceMapping(final T rebalancerConfig, final Cluster cluster,
-      final ResourceCurrentState currentState);
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
deleted file mode 100644
index 3153623..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.SemiAutoRebalancerConfig;
-import org.apache.helix.api.State;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to semi-automatic mode. It is tasked with computing the ideal
- * state of a resource based on a predefined preference list of instances willing to accept
- * replicas.
- * The input is the optional current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
- * with state s.
- */
-public class NewSemiAutoRebalancer implements NewRebalancer<SemiAutoRebalancerConfig> {
-
-  private static final Logger LOG = Logger.getLogger(NewSemiAutoRebalancer.class);
-
-  @Override
-  public ResourceAssignment computeResourceMapping(SemiAutoRebalancerConfig config,
-      Cluster cluster, ResourceCurrentState currentState) {
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> currentStateMap =
-          currentState.getCurrentStateMap(config.getResourceId(), partition);
-      Set<ParticipantId> disabledInstancesForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
-      List<ParticipantId> preferenceList =
-          NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
-              config.getPreferenceList(partition));
-      Map<ParticipantId, State> bestStateForPartition =
-          NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
-              config.getResourceId(), cluster.getLiveParticipantMap(), stateModelDef,
-              preferenceList, currentStateMap, disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java
deleted file mode 100644
index ae7bba4..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewUserDefinedRebalancer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.UserDefinedRebalancerConfig;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Allows one to come up with a user-defined implementation of a rebalancer.<br/>
- * This will be invoked on all changes that happen in the cluster.<br/>
- * Simply return the resourceMapping for a resource in this method.<br/>
- */
-public interface NewUserDefinedRebalancer extends NewRebalancer<UserDefinedRebalancerConfig> {
-  @Override
-  ResourceAssignment computeResourceMapping(final UserDefinedRebalancerConfig rebalancerConfig,
-      final Cluster cluster, final ResourceCurrentState currentState);
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
new file mode 100644
index 0000000..be3a280
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
@@ -0,0 +1,239 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Abstract RebalancerContext that functions for generic subunits. Use a subclass that more
+ * concretely defines the subunits.
+ */
+public abstract class BasicRebalancerContext implements RebalancerContext {
+  private ResourceId _resourceId;
+  private StateModelDefId _stateModelDefId;
+  private StateModelFactoryId _stateModelFactoryId;
+  private String _participantGroupTag;
+  private Class<? extends ContextSerializer> _serializer;
+  private RebalancerRef _rebalancerRef;
+
+  /**
+   * Instantiate a basic rebalancer context
+   */
+  public BasicRebalancerContext() {
+    _serializer = DefaultContextSerializer.class;
+  }
+
+  @Override
+  public ResourceId getResourceId() {
+    return _resourceId;
+  }
+
+  /**
+   * Set the resource to rebalance
+   * @param resourceId resource id
+   */
+  public void setResourceId(ResourceId resourceId) {
+    _resourceId = resourceId;
+  }
+
+  @Override
+  public StateModelDefId getStateModelDefId() {
+    return _stateModelDefId;
+  }
+
+  /**
+   * Set the state model definition that the resource follows
+   * @param stateModelDefId state model definition id
+   */
+  public void setStateModelDefId(StateModelDefId stateModelDefId) {
+    _stateModelDefId = stateModelDefId;
+  }
+
+  @Override
+  public StateModelFactoryId getStateModelFactoryId() {
+    return _stateModelFactoryId;
+  }
+
+  /**
+   * Set the state model factory that the resource uses
+   * @param stateModelFactoryId state model factory id
+   */
+  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+    _stateModelFactoryId = stateModelFactoryId;
+  }
+
+  @Override
+  public String getParticipantGroupTag() {
+    return _participantGroupTag;
+  }
+
+  /**
+   * Set a tag that participants must have in order to serve this resource
+   * @param participantGroupTag string group tag
+   */
+  public void setParticipantGroupTag(String participantGroupTag) {
+    _participantGroupTag = participantGroupTag;
+  }
+
+  /**
+   * Get the serializer. If none is provided, {@link DefaultContextSerializer} is used
+   */
+  @Override
+  public Class<? extends ContextSerializer> getSerializerClass() {
+    return _serializer;
+  }
+
+  /**
+   * Set the class that can serialize this context
+   * @param serializer serializer class that implements ContextSerializer
+   */
+  public void setSerializerClass(Class<? extends ContextSerializer> serializer) {
+    _serializer = serializer;
+  }
+
+  @Override
+  @JsonIgnore
+  public Set<? extends PartitionId> getSubUnitIdSet() {
+    return getSubUnitMap().keySet();
+  }
+
+  @Override
+  @JsonIgnore
+  public Partition getSubUnit(PartitionId subUnitId) {
+    return getSubUnitMap().get(subUnitId);
+  }
+
+  @Override
+  public RebalancerRef getRebalancerRef() {
+    return _rebalancerRef;
+  }
+
+  /**
+   * Set the reference to the class used to rebalance this resource
+   * @param rebalancerRef RebalancerRef instance
+   */
+  public void setRebalancerRef(RebalancerRef rebalancerRef) {
+    _rebalancerRef = rebalancerRef;
+  }
+
+  /**
+   * Abstract builder for the base rebalancer context
+   */
+  public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
+    private final ResourceId _resourceId;
+    private StateModelDefId _stateModelDefId;
+    private StateModelFactoryId _stateModelFactoryId;
+    private String _participantGroupTag;
+    private Class<? extends ContextSerializer> _serializerClass;
+    private RebalancerRef _rebalancerRef;
+
+    /**
+     * Instantiate with a resource id
+     * @param resourceId resource id
+     */
+    public AbstractBuilder(ResourceId resourceId) {
+      _resourceId = resourceId;
+      _serializerClass = DefaultContextSerializer.class;
+    }
+
+    /**
+     * Set the state model definition that the resource should follow
+     * @param stateModelDefId state model definition id
+     * @return Builder
+     */
+    public T stateModelDefId(StateModelDefId stateModelDefId) {
+      _stateModelDefId = stateModelDefId;
+      return self();
+    }
+
+    /**
+     * Set the state model factory that the resource should use
+     * @param stateModelFactoryId state model factory id
+     * @return Builder
+     */
+    public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+      _stateModelFactoryId = stateModelFactoryId;
+      return self();
+    }
+
+    /**
+     * Set the tag that all participants require in order to serve this resource
+     * @param participantGroupTag the tag
+     * @return Builder
+     */
+    public T participantGroupTag(String participantGroupTag) {
+      _participantGroupTag = participantGroupTag;
+      return self();
+    }
+
+    /**
+     * Set the serializer class for this rebalancer context
+     * @param serializerClass class that implements ContextSerializer
+     * @return Builder
+     */
+    public T serializerClass(Class<? extends ContextSerializer> serializerClass) {
+      _serializerClass = serializerClass;
+      return self();
+    }
+
+    /**
+     * Specify a custom class to use for rebalancing
+     * @param rebalancerRef RebalancerRef instance
+     * @return Builder
+     */
+    public T rebalancerRef(RebalancerRef rebalancerRef) {
+      _rebalancerRef = rebalancerRef;
+      return self();
+    }
+
+    /**
+     * Update an existing context with base fields
+     * @param context derived context
+     */
+    protected final void update(BasicRebalancerContext context) {
+      context.setResourceId(_resourceId);
+      context.setStateModelDefId(_stateModelDefId);
+      context.setStateModelFactoryId(_stateModelFactoryId);
+      context.setParticipantGroupTag(_participantGroupTag);
+      context.setSerializerClass(_serializerClass);
+      context.setRebalancerRef(_rebalancerRef);
+    }
+
+    /**
+     * Get a typed reference to "this" class. Final derived classes should simply return the this
+     * reference.
+     * @return this for the most specific type
+     */
+    protected abstract T self();
+
+    /**
+     * Get the rebalancer context from the built fields
+     * @return RebalancerContext
+     */
+    public abstract RebalancerContext build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
new file mode 100644
index 0000000..ef12a09
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
@@ -0,0 +1,37 @@
+package org.apache.helix.controller.rebalancer.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface ContextSerializer {
+  /**
+   * Convert a RebalancerContext object instance to a String
+   * @param data instance of the rebalancer context type
+   * @return String representing the object
+   */
+  public <T> String serialize(final T data);
+
+  /**
+   * Convert a String to a generic object instance
+   * @param clazz The class represented by the deserialized string
+   * @param string String representing the object
+   * @return instance of the generic type or null if the conversion failed
+   */
+  public <T> T deserialize(final Class<T> clazz, final String string);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
new file mode 100644
index 0000000..97ea96a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class CustomRebalancer implements Rebalancer {
+
+  private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
+
+  @Override
+  public void init(HelixManager helixManager) {
+    // do nothing
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
+      ResourceCurrentState currentState) {
+    CustomRebalancerContext config =
+        rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + config.getResourceId());
+    }
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : config.getPartitionSet()) {
+      Map<ParticipantId, State> currentStateMap =
+          currentState.getCurrentStateMap(config.getResourceId(), partition);
+      Set<ParticipantId> disabledInstancesForPartition =
+          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partition);
+      Map<ParticipantId, State> bestStateForPartition =
+          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
+              config.getPreferenceMap(partition), currentStateMap, disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+  /**
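+   * Compute the best state for each participant of one partition in CUSTOMIZED rebalancer mode
+   * @param liveParticipantMap live participants, keyed by participant id
+   * @param stateModelDef state model definition, source of the initial state
+   * @param idealStateMap preferred participant-to-state map for the partition; may be null
+   * @param currentStateMap current participant-to-state map for the partition; may be null
+   * @param disabledParticipantsForPartition participants that are disabled for the partition
+   * @return map of participant id to the state it should be in for the partition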
+   */
+  private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
+      Map<ParticipantId, Participant> liveParticipantMap, StateModelDefinition stateModelDef,
+      Map<ParticipantId, State> idealStateMap, Map<ParticipantId, State> currentStateMap,
+      Set<ParticipantId> disabledParticipantsForPartition) {
+    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
+
+    // if the resource is deleted, idealStateMap will be null/empty and
+    // we should drop all resources.
+    if (currentStateMap != null) {
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        if ((idealStateMap == null || !idealStateMap.containsKey(participantId))
+            && !disabledParticipantsForPartition.contains(participantId)) {
+          // if dropped and not disabled, transit to DROPPED
+          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+            participantId).equals(State.from(HelixDefinedState.ERROR)))
+            && disabledParticipantsForPartition.contains(participantId)) {
+          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+          participantStateMap.put(participantId, stateModelDef.getInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (idealStateMap == null) {
+      return participantStateMap;
+    }
+
+    for (ParticipantId participantId : idealStateMap.keySet()) {
+      boolean notInErrorState =
+          currentStateMap == null || currentStateMap.get(participantId) == null
+              || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
+
+      if (liveParticipantMap.containsKey(participantId) && notInErrorState
+          && !disabledParticipantsForPartition.contains(participantId)) {
+        participantStateMap.put(participantId, idealStateMap.get(participantId));
+      }
+    }
+
+    return participantStateMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
new file mode 100644
index 0000000..c1efe81
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
@@ -0,0 +1,114 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Map;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.testng.collections.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
+ * corresponds to {@link CustomRebalancer}
+ */
+public class CustomRebalancerContext extends PartitionedRebalancerContext {
+  private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+  /**
+   * Instantiate a CustomRebalancerContext
+   */
+  public CustomRebalancerContext() {
+    super(RebalanceMode.CUSTOMIZED);
+    setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+    _preferenceMaps = Maps.newHashMap();
+  }
+
+  /**
+   * Get the preference maps of the partitions and replicas of the resource
+   * @return map of partition to participant and state
+   */
+  public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
+    return _preferenceMaps;
+  }
+
+  /**
+   * Set the preference maps of the partitions and replicas of the resource
+   * @param preferenceMaps map of partition to participant and state
+   */
+  public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
+    _preferenceMaps = preferenceMaps;
+  }
+
+  /**
+   * Get the preference map of a partition
+   * @param partitionId the partition to look up
+   * @return map of participant to state
+   */
+  @JsonIgnore
+  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+    return _preferenceMaps.get(partitionId);
+  }
+
+  /**
+   * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+    private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+    /**
+     * Instantiate for a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+      _preferenceMaps = Maps.newHashMap();
+    }
+
+    /**
+     * Add a preference map for a partition
+     * @param partitionId partition to set
+     * @param preferenceMap map of participant id to state indicating where replicas are served
+     * @return Builder
+     */
+    public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+      _preferenceMaps.put(partitionId, preferenceMap);
+      return self();
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public CustomRebalancerContext build() {
+      CustomRebalancerContext context = new CustomRebalancerContext();
+      super.update(context);
+      context.setPreferenceMaps(_preferenceMaps);
+      return context;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
new file mode 100644
index 0000000..ecc93fb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
@@ -0,0 +1,83 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Default serializer implementation for RebalancerContexts. Uses the Jackson JSON library to
+ * convert to and from strings
+ */
+public class DefaultContextSerializer implements ContextSerializer {
+
+  private static Logger logger = Logger.getLogger(DefaultContextSerializer.class);
+
+  @Override
+  public <T> String serialize(final T data) {
+    if (data == null) {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    StringWriter sw = new StringWriter();
+    try {
+      mapper.writeValue(sw, data);
+    } catch (Exception e) {
+      logger.error("Exception during payload data serialization.", e);
+      throw new HelixException(e);
+    }
+    return sw.toString();
+  }
+
+  @Override
+  public <T> T deserialize(final Class<T> clazz, final String string) {
+    if (string == null || string.length() == 0) {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    ByteArrayInputStream bais = new ByteArrayInputStream(string.getBytes());
+
+    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+    deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    try {
+      T payload = mapper.readValue(bais, clazz);
+      return payload;
+    } catch (Exception e) {
+      logger.error("Exception during deserialization of payload bytes: " + string, e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
new file mode 100644
index 0000000..a1b8406
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
@@ -0,0 +1,199 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class FullAutoRebalancer implements Rebalancer {
+  // Not final: reassigned with a fresh strategy on every computeResourceMapping call
+  private AutoRebalanceStrategy _algorithm;
+
+  private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
+
+  @Override
+  public void init(HelixManager helixManager) {
+    // do nothing
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
+      ResourceCurrentState currentState) {
+    FullAutoRebalancerContext config =
+        rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
+    // Compute a preference list based on the current ideal state
+    List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
+    List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
+    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+    Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
+    int replicas = -1;
+    if (config.anyLiveParticipant()) {
+      replicas = liveParticipants.size();
+    } else {
+      replicas = config.getReplicaCount();
+    }
+
+    // count how many replicas should be in each state
+    LinkedHashMap<State, Integer> stateCountMap =
+        NewConstraintBasedAssignment.stateCount(cluster.getConfig(), config.getResourceId(),
+            stateModelDef, liveParticipants.size(), replicas);
+    LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
+    for (State state : stateCountMap.keySet()) {
+      rawStateCountMap.put(state.toString(), stateCountMap.get(state));
+    }
+
+    // get the participant lists
+    List<ParticipantId> liveParticipantList =
+        new ArrayList<ParticipantId>(liveParticipants.keySet());
+    List<ParticipantId> allParticipantList =
+        new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
+    List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
+
+    // compute the current mapping from the current state
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping =
+        currentMapping(config, currentState, stateCountMap);
+
+    // If any live nodes carry the participant group tag, use only those nodes
+    Set<String> taggedNodes = new HashSet<String>();
+    if (config.getParticipantGroupTag() != null) {
+      for (ParticipantId participantId : liveParticipantList) {
+        if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
+          taggedNodes.add(participantId.stringify());
+        }
+      }
+    }
+    if (taggedNodes.size() > 0) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("found the following instances with tag " + config.getResourceId() + " "
+            + taggedNodes);
+      }
+      liveNodes = new ArrayList<String>(taggedNodes);
+    }
+
+    // determine which nodes the replicas should live on
+    List<String> allNodes = Lists.transform(allParticipantList, Functions.toStringFunction());
+    int maxPartition = config.getMaxPartitionsPerParticipant();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("currentMapping: " + currentMapping);
+      LOG.info("stateCountMap: " + stateCountMap);
+      LOG.info("liveNodes: " + liveNodes);
+      LOG.info("allNodes: " + allNodes);
+      LOG.info("maxPartition: " + maxPartition);
+    }
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    _algorithm =
+        new AutoRebalanceStrategy(config.getResourceId().stringify(), partitionNames,
+            rawStateCountMap, maxPartition, placementScheme);
+    ZNRecord newMapping =
+        _algorithm.computePartitionAssignment(liveNodes,
+            ResourceAssignment.stringMapsFromReplicaMaps(currentMapping), allNodes);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("newMapping: " + newMapping);
+    }
+
+    // compute a full partition mapping for the resource
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + config.getResourceId());
+    }
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : partitions) {
+      Set<ParticipantId> disabledParticipantsForPartition =
+          NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
+      List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
+      if (rawPreferenceList == null) {
+        rawPreferenceList = Collections.emptyList();
+      }
+      List<ParticipantId> preferenceList =
+          Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
+            @Override
+            public ParticipantId apply(String participantName) {
+              return ParticipantId.from(participantName);
+            }
+          });
+      preferenceList =
+          NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
+      Map<ParticipantId, State> bestStateForPartition =
+          NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
+              config.getResourceId(), liveParticipants, stateModelDef, preferenceList,
+              currentState.getCurrentStateMap(config.getResourceId(), partition),
+              disabledParticipantsForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
+      FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+      Map<State, Integer> stateCountMap) {
+    Map<PartitionId, Map<ParticipantId, State>> map =
+        new HashMap<PartitionId, Map<ParticipantId, State>>();
+
+    for (PartitionId partition : config.getPartitionSet()) {
+      Map<ParticipantId, State> curStateMap =
+          currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
+      map.put(partition, new HashMap<ParticipantId, State>());
+      for (ParticipantId node : curStateMap.keySet()) {
+        State state = curStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+
+      Map<ParticipantId, State> pendingStateMap =
+          currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
+      for (ParticipantId node : pendingStateMap.keySet()) {
+        State state = pendingStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+    }
+    return map;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
new file mode 100644
index 0000000..2e274be
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
@@ -0,0 +1,61 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for resources that are rebalanced in FULL_AUTO mode. Unless a different
+ * rebalancer reference is set, it refers to {@link FullAutoRebalancer}
+ */
+public class FullAutoRebalancerContext extends PartitionedRebalancerContext {
+  /**
+   * Instantiate a context fixed to {@link RebalanceMode#FULL_AUTO} whose rebalancer reference
+   * points at {@link FullAutoRebalancer}
+   */
+  public FullAutoRebalancerContext() {
+    super(RebalanceMode.FULL_AUTO);
+    RebalancerRef defaultRef = RebalancerRef.from(FullAutoRebalancer.class);
+    setRebalancerRef(defaultRef);
+  }
+
+  /**
+   * Builder for a full auto rebalancer context. The rebalancer reference is preset to
+   * {@link FullAutoRebalancer} when the builder is created.
+   */
+  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+    /**
+     * Start building a context for a resource
+     * @param resourceId id of the resource that the context describes
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      RebalancerRef defaultRef = RebalancerRef.from(FullAutoRebalancer.class);
+      rebalancerRef(defaultRef);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    /**
+     * Create a new context and populate it from the fields collected by this builder
+     * @return FullAutoRebalancerContext
+     */
+    @Override
+    public FullAutoRebalancerContext build() {
+      FullAutoRebalancerContext fullAutoContext = new FullAutoRebalancerContext();
+      update(fullAutoContext);
+      return fullAutoContext;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
new file mode 100644
index 0000000..5165ba7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
@@ -0,0 +1,355 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.testng.collections.Maps;
+
+import com.google.common.collect.ImmutableMap;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for a resource whose subunits are partitions. In addition, these partitions can
+ * be replicated.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionedRebalancerContext extends BasicRebalancerContext implements
+    ReplicatedRebalancerContext {
+  private Map<PartitionId, Partition> _partitionMap;
+  private boolean _anyLiveParticipant;
+  private int _replicaCount;
+  private int _maxPartitionsPerParticipant;
+  private final RebalanceMode _rebalanceMode;
+
+  /**
+   * Instantiate a PartitionedRebalancerContext with no partitions, a replica count of 1, the
+   * any-live-participant flag off, and no limit on the partitions per participant
+   * @param rebalanceMode rebalance mode reported by {@link #getRebalanceMode()}
+   */
+  public PartitionedRebalancerContext(RebalanceMode rebalanceMode) {
+    _partitionMap = Collections.emptyMap();
+    _replicaCount = 1;
+    _anyLiveParticipant = false;
+    _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+    _rebalanceMode = rebalanceMode;
+  }
+
+  /**
+   * Get a map from partition id to partition
+   * @return partition map
+   */
+  public Map<PartitionId, Partition> getPartitionMap() {
+    return _partitionMap;
+  }
+
+  /**
+   * Set a map of partition id to partition. An immutable copy of the argument is stored, so later
+   * changes to the passed map do not affect this context.
+   * @param partitionMap partition map
+   */
+  public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
+    _partitionMap = ImmutableMap.copyOf(partitionMap);
+  }
+
+  /**
+   * Get the set of partitions for this resource
+   * @return set of partition ids
+   */
+  @JsonIgnore
+  public Set<PartitionId> getPartitionSet() {
+    return _partitionMap.keySet();
+  }
+
+  /**
+   * Get a partition
+   * @param partitionId id of the partition to get
+   * @return Partition object, or null if not present
+   */
+  @JsonIgnore
+  public Partition getPartition(PartitionId partitionId) {
+    return _partitionMap.get(partitionId);
+  }
+
+  @Override
+  public boolean anyLiveParticipant() {
+    return _anyLiveParticipant;
+  }
+
+  /**
+   * Indicate if this resource should be assigned to any live participant
+   * @param anyLiveParticipant true if any live participant expected, false otherwise
+   */
+  public void setAnyLiveParticipant(boolean anyLiveParticipant) {
+    _anyLiveParticipant = anyLiveParticipant;
+  }
+
+  @Override
+  public int getReplicaCount() {
+    return _replicaCount;
+  }
+
+  /**
+   * Set the number of replicas that each partition should have
+   * @param replicaCount number of replicas per partition
+   */
+  public void setReplicaCount(int replicaCount) {
+    _replicaCount = replicaCount;
+  }
+
+  /**
+   * Get the maximum number of partitions that a participant can serve
+   * @return maximum number of partitions per participant
+   */
+  public int getMaxPartitionsPerParticipant() {
+    return _maxPartitionsPerParticipant;
+  }
+
+  /**
+   * Set the maximum number of partitions that a participant can serve
+   * @param maxPartitionsPerParticipant maximum number of partitions per participant
+   */
+  public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+    _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+  }
+
+  /**
+   * Get the rebalancer mode of the resource
+   * @return RebalanceMode
+   */
+  public RebalanceMode getRebalanceMode() {
+    return _rebalanceMode;
+  }
+
+  /**
+   * Get the subunits of the resource, which for this context are its partitions
+   * @return the same map as {@link #getPartitionMap()}
+   */
+  @Override
+  @JsonIgnore
+  public Map<PartitionId, Partition> getSubUnitMap() {
+    return getPartitionMap();
+  }
+
+  /**
+   * Convert a physically-stored IdealState into a rebalancer context for a partitioned resource.
+   * FULL_AUTO, SEMI_AUTO and CUSTOMIZED ideal states produce the matching context subclass; any
+   * other mode produces a plain PartitionedRebalancerContext.
+   * @param idealState populated IdealState
+   * @return PartitionedRebalancerContext
+   */
+  public static PartitionedRebalancerContext from(IdealState idealState) {
+    ResourceId resourceId = idealState.getResourceId();
+    switch (idealState.getRebalanceMode()) {
+    case FULL_AUTO:
+      FullAutoRebalancerContext.Builder fullAutoBuilder =
+          new FullAutoRebalancerContext.Builder(resourceId);
+      populateContext(fullAutoBuilder, idealState);
+      return fullAutoBuilder.build();
+    case SEMI_AUTO:
+      SemiAutoRebalancerContext.Builder semiAutoBuilder =
+          new SemiAutoRebalancerContext.Builder(resourceId);
+      // semi auto additionally carries the preference list of every partition
+      for (PartitionId partitionId : idealState.getPartitionSet()) {
+        semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
+      }
+      populateContext(semiAutoBuilder, idealState);
+      return semiAutoBuilder.build();
+    case CUSTOMIZED:
+      CustomRebalancerContext.Builder customBuilder =
+          new CustomRebalancerContext.Builder(resourceId);
+      // customized additionally carries the participant state map of every partition
+      for (PartitionId partitionId : idealState.getPartitionSet()) {
+        customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
+      }
+      populateContext(customBuilder, idealState);
+      return customBuilder.build();
+    default:
+      Builder baseBuilder = new Builder(resourceId);
+      populateContext(baseBuilder, idealState);
+      return baseBuilder.build();
+    }
+  }
+
+  /**
+   * Update a builder subclass with all the fields of the ideal state. If the replicas field of the
+   * ideal state equals the ANY_LIVEINSTANCE token, the any-live-participant flag is set and the
+   * replica count is 0; otherwise the replicas field is parsed as an integer replica count.
+   * @param builder builder that extends AbstractBuilder
+   * @param idealState populated IdealState
+   * @throws NumberFormatException if the replicas field is neither the token nor an integer
+   */
+  private static <T extends AbstractBuilder<T>> void populateContext(T builder,
+      IdealState idealState) {
+    String replicas = idealState.getReplicas();
+    int replicaCount = 0;
+    boolean anyLiveParticipant = false;
+    if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
+      anyLiveParticipant = true;
+    } else {
+      replicaCount = Integer.parseInt(replicas);
+    }
+    for (PartitionId partitionId : idealState.getPartitionSet()) {
+      builder.addPartition(new Partition(partitionId));
+    }
+    builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+        .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+        .participantGroupTag(idealState.getInstanceGroupTag())
+        .stateModelDefId(idealState.getStateModelDefId())
+        .stateModelFactoryId(idealState.getStateModelFactoryId());
+    // only override the rebalancer reference of the builder when the ideal state names one
+    RebalancerRef rebalancerRef = idealState.getRebalancerRef();
+    if (rebalancerRef != null) {
+      builder.rebalancerRef(rebalancerRef);
+    }
+  }
+
+  /**
+   * Builder for a basic partitioned rebalancer context
+   */
+  public static final class Builder extends AbstractBuilder<Builder> {
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    /**
+     * Create a context in USER_DEFINED mode and populate it from the fields of this builder
+     * @return PartitionedRebalancerContext
+     */
+    @Override
+    public PartitionedRebalancerContext build() {
+      PartitionedRebalancerContext context =
+          new PartitionedRebalancerContext(RebalanceMode.USER_DEFINED);
+      super.update(context);
+      return context;
+    }
+  }
+
+  /**
+   * Abstract builder for a generic partitioned resource rebalancer context
+   */
+  public static abstract class AbstractBuilder<T extends BasicRebalancerContext.AbstractBuilder<T>>
+      extends BasicRebalancerContext.AbstractBuilder<T> {
+    private final ResourceId _resourceId;
+    private final Map<PartitionId, Partition> _partitionMap;
+    private boolean _anyLiveParticipant;
+    private int _replicaCount;
+    private int _maxPartitionsPerParticipant;
+
+    /**
+     * Instantiate with a resource. The defaults match those of a new
+     * PartitionedRebalancerContext: a replica count of 1, the any-live-participant flag off, and
+     * no limit on the partitions per participant.
+     * @param resourceId resource id
+     */
+    public AbstractBuilder(ResourceId resourceId) {
+      super(resourceId);
+      _resourceId = resourceId;
+      // plain JDK map: production code should not call into the TestNG collection helpers
+      _partitionMap = new HashMap<PartitionId, Partition>();
+      _anyLiveParticipant = false;
+      _replicaCount = 1;
+      _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+    }
+
+    /**
+     * Add a partition that the resource serves. A partition with the same id that was added
+     * earlier is replaced.
+     * @param partition fully-qualified partition
+     * @return Builder
+     */
+    public T addPartition(Partition partition) {
+      _partitionMap.put(partition.getId(), partition);
+      return self();
+    }
+
+    /**
+     * Add a collection of partitions
+     * @param partitions any collection of Partition objects
+     * @return Builder
+     */
+    public T addPartitions(Collection<Partition> partitions) {
+      for (Partition partition : partitions) {
+        addPartition(partition);
+      }
+      return self();
+    }
+
+    /**
+     * Add a specified number of partitions with a default naming scheme, namely
+     * resourceId_partitionNumber where partitionNumber starts at 0
+     * @param partitionCount number of partitions to add
+     * @return Builder
+     */
+    public T addPartitions(int partitionCount) {
+      for (int i = 0; i < partitionCount; i++) {
+        addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
+      }
+      return self();
+    }
+
+    /**
+     * Set whether any live participant should be used in rebalancing
+     * @param anyLiveParticipant true if any live participant can be used, false otherwise
+     * @return Builder
+     */
+    public T anyLiveParticipant(boolean anyLiveParticipant) {
+      _anyLiveParticipant = anyLiveParticipant;
+      return self();
+    }
+
+    /**
+     * Set the number of replicas
+     * @param replicaCount number of replicas
+     * @return Builder
+     */
+    public T replicaCount(int replicaCount) {
+      _replicaCount = replicaCount;
+      return self();
+    }
+
+    /**
+     * Set the maximum number of partitions to assign to any participant
+     * @param maxPartitionsPerParticipant the maximum
+     * @return Builder
+     */
+    public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+      _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+      return self();
+    }
+
+    /**
+     * Update a PartitionedRebalancerContext with fields from this builder level. If no partition
+     * was added, a single default-named partition is added to the builder first.
+     * @param context PartitionedRebalancerContext to populate
+     */
+    protected final void update(PartitionedRebalancerContext context) {
+      super.update(context);
+      // enforce at least one partition
+      if (_partitionMap.isEmpty()) {
+        addPartitions(1);
+      }
+      context.setPartitionMap(_partitionMap);
+      context.setAnyLiveParticipant(_anyLiveParticipant);
+      context.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
+      context.setReplicaCount(_replicaCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/Rebalancer.java
new file mode 100644
index 0000000..6e55e63
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/Rebalancer.java
@@ -0,0 +1,38 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Extension point for a custom rebalancer implementation.<br/>
+ * It will be invoked on all changes that happen in the cluster.<br/>
+ * An implementation simply returns the resource assignment for a resource.<br/>
+ */
+public interface Rebalancer {
+
+  /**
+   * Initialize the rebalancer with a HelixManager
+   * @param helixManager manager handed to the rebalancer
+   */
+  void init(HelixManager helixManager);
+
+  /**
+   * Compute the assignment of a resource
+   * @param rebalancerConfig rebalancer configuration of the resource
+   * @param cluster cluster supplied by the caller
+   * @param currentState current state supplied by the caller
+   * @return resource assignment for the resource
+   */
+  ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
+      ResourceCurrentState currentState);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
new file mode 100644
index 0000000..26f134b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -0,0 +1,149 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import org.apache.helix.api.NamespacedConfig;
+import org.apache.helix.api.Scope;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration for a resource rebalancer. This contains a RebalancerContext, which contains
+ * information specific to each rebalancer.
+ */
+public final class RebalancerConfig {
+  private enum Fields {
+    SERIALIZER_CLASS,
+    REBALANCER_CONTEXT,
+    REBALANCER_CONTEXT_CLASS
+  }
+
+  private static final Logger LOG = Logger.getLogger(RebalancerConfig.class);
+  private ContextSerializer _serializer;
+  private Rebalancer _rebalancer;
+  private final RebalancerContext _context;
+  private final NamespacedConfig _config;
+
+  /**
+   * Instantiate a RebalancerConfig from a context. The serializer class name, the context class
+   * name and the serialized context are stored as simple fields of the backing config. If the
+   * serializer cannot be instantiated, the error is logged and the serialized context is not
+   * stored.
+   * @param context rebalancer context
+   */
+  public RebalancerConfig(RebalancerContext context) {
+    _config =
+        new NamespacedConfig(Scope.resource(context.getResourceId()),
+            RebalancerConfig.class.getSimpleName());
+    _config.setSimpleField(Fields.SERIALIZER_CLASS.toString(), context.getSerializerClass()
+        .getName());
+    _config
+        .setSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString(), context.getClass().getName());
+    _context = context;
+    try {
+      _serializer = context.getSerializerClass().newInstance();
+      _config.setSimpleField(Fields.REBALANCER_CONTEXT.toString(), _serializer.serialize(context));
+    } catch (InstantiationException e) {
+      LOG.error("Error initializing the configuration", e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Error initializing the configuration", e);
+    }
+  }
+
+  /**
+   * Instantiate from a physical ResourceConfiguration. The context is left null if the stored
+   * serializer or context class cannot be resolved.
+   * @param resourceConfiguration populated ResourceConfiguration
+   */
+  public RebalancerConfig(ResourceConfiguration resourceConfiguration) {
+    _config = new NamespacedConfig(resourceConfiguration, RebalancerConfig.class.getSimpleName());
+    _serializer = getSerializer();
+    _context = getContext();
+  }
+
+  /**
+   * Get the class that can serialize and deserialize the rebalancer context
+   * @return ContextSerializer, or null if no serializer class is stored or it cannot be
+   *         instantiated
+   */
+  private ContextSerializer getSerializer() {
+    String serializerClassName = _config.getSimpleField(Fields.SERIALIZER_CLASS.toString());
+    if (serializerClassName != null) {
+      try {
+        return (ContextSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+            .newInstance();
+      } catch (InstantiationException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (IllegalAccessException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Error getting the serializer", e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Deserialize the stored rebalancer context using the stored context class and the serializer
+   * @return RebalancerContext, or null if the context class is missing or cannot be loaded, or
+   *         if no serializer is available
+   */
+  private RebalancerContext getContext() {
+    String className = _config.getSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString());
+    if (className == null) {
+      // nothing to load: do not hand a null class name to the class loader
+      LOG.info("No rebalancer context class is stored in the configuration");
+      return null;
+    }
+    try {
+      Class<? extends RebalancerContext> contextClass =
+          HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerContext.class);
+      if (_serializer == null) {
+        // getSerializer() returns null on failure; dereferencing it here would throw
+        LOG.error("Cannot deserialize context class " + className + " without a serializer");
+        return null;
+      }
+      String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
+      return _serializer.deserialize(contextClass, serialized);
+    } catch (ClassNotFoundException e) {
+      LOG.info(className + " is not a valid class");
+    }
+    return null;
+  }
+
+  /**
+   * Get a rebalancer class instance
+   * @return Rebalancer, or null if there is no context or the context has no rebalancer reference
+   */
+  public Rebalancer getRebalancer() {
+    // cache the rebalancer to avoid loading and instantiating it excessively
+    if (_rebalancer == null) {
+      if (_context == null || _context.getRebalancerRef() == null) {
+        return null;
+      }
+      _rebalancer = _context.getRebalancerRef().getRebalancer();
+    }
+    return _rebalancer;
+  }
+
+  /**
+   * Get the instantiated RebalancerContext
+   * @param contextClass specific class of the RebalancerContext
+   * @return RebalancerContext subclass instance, or null if conversion is not possible
+   */
+  public <T extends RebalancerContext> T getRebalancerContext(Class<T> contextClass) {
+    // test compatibility up front instead of catching a ClassCastException
+    if (_context != null && !contextClass.isInstance(_context)) {
+      LOG.info(contextClass + " is incompatible with context class: " + _context.getClass());
+      return null;
+    }
+    // Class.cast passes null through, so a missing context is returned as null
+    return contextClass.cast(_context);
+  }
+
+  /**
+   * Convert this to a namespaced config
+   * @return NamespacedConfig
+   */
+  public NamespacedConfig toNamespacedConfig() {
+    return _config;
+  }
+}