You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/16 02:09:44 UTC

[1/4] [HELIX-209] Moving rebalancer code around, take 2

Updated Branches:
  refs/heads/helix-logical-model 9f229c80c -> e032132a6


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 7fe3314..369ad68 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -33,7 +33,7 @@ import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
index 0a578c1..74781cd 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
@@ -9,8 +9,8 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.FullAutoRebalancerContext;
-import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
index 8650475..5bbe54f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
@@ -8,9 +8,6 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.CustomRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.model.ResourceConfiguration;
 import org.testng.Assert;
 import org.testng.annotations.Test;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 6279087..ecb8151 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -39,10 +39,10 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 0dba374..45b0dac 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -32,8 +32,8 @@ import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 50d5ad8..f8fa4a2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -41,7 +41,7 @@ import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.StateModelDefinition;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index 8620383..9f52866 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -46,7 +46,7 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 688846e..eba322d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -32,9 +32,9 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.Rebalancer;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index df81000..7215707 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -21,7 +21,7 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;


[3/4] [HELIX-209] Moving rebalancer code around, take 2

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java
deleted file mode 100644
index e1f1ac2..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancerContext.java
+++ /dev/null
@@ -1,176 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
- * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
- */
-public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
-  @JsonProperty("preferenceLists")
-  private Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-  /**
-   * Instantiate a SemiAutoRebalancerContext
-   */
-  public SemiAutoRebalancerContext() {
-    super(RebalanceMode.SEMI_AUTO);
-    setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-    _preferenceLists = Maps.newHashMap();
-  }
-
-  /**
-   * Get the preference lists of all partitions of the resource
-   * @return map of partition id to list of participant ids
-   */
-  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
-    return _preferenceLists;
-  }
-
-  /**
-   * Set the preference lists of all partitions of the resource
-   * @param preferenceLists
-   */
-  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
-    _preferenceLists = preferenceLists;
-  }
-
-  /**
-   * Get the preference list of a partition
-   * @param partitionId the partition to look up
-   * @return list of participant ids
-   */
-  @JsonIgnore
-  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
-    return _preferenceLists.get(partitionId);
-  }
-
-  /**
-   * Generate preference lists based on a default cluster setup
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @Override
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // compute default upper bounds
-    Map<State, String> upperBounds = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
-    }
-
-    // determine the current mapping
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
-    for (PartitionId partitionId : getPartitionSet()) {
-      List<ParticipantId> preferenceList = getPreferenceList(partitionId);
-      if (preferenceList != null && !preferenceList.isEmpty()) {
-        Set<ParticipantId> disabledParticipants = Collections.emptySet();
-        Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
-        Map<ParticipantId, State> initialMap =
-            ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
-                participantSet, stateModelDef, preferenceList, emptyCurrentState,
-                disabledParticipants);
-        currentMapping.put(partitionId, initialMap);
-      }
-    }
-
-    // determine the preference
-    LinkedHashMap<State, Integer> stateCounts =
-        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
-            getReplicaCount());
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
-    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
-            getMaxPartitionsPerParticipant(), placementScheme);
-    Map<String, List<String>> rawPreferenceLists =
-        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
-            .getListFields();
-    Map<PartitionId, List<ParticipantId>> preferenceLists =
-        Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
-    setPreferenceLists(preferenceLists);
-  }
-
-  /**
-   * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-    /**
-     * Instantiate for a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-      _preferenceLists = Maps.newHashMap();
-    }
-
-    /**
-     * Add a preference list for a partition
-     * @param partitionId partition to set
-     * @param preferenceList ordered list of participants who can serve the partition
-     * @return Builder
-     */
-    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
-      _preferenceLists.put(partitionId, preferenceList);
-      return self();
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public SemiAutoRebalancerContext build() {
-      SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
-      super.update(context);
-      context.setPreferenceLists(_preferenceLists);
-      return context;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java
deleted file mode 100644
index 0199796..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/util/ConstraintBasedAssignment.java
+++ /dev/null
@@ -1,244 +0,0 @@
-package org.apache.helix.api.rebalancer.util;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Collection of functions that will compute the best possible state based on the participants and
- * the rebalancer configuration of a resource.
- */
-public class ConstraintBasedAssignment {
-  private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
-
-  /**
-   * Get a set of disabled participants for a partition
-   * @param participantMap map of all participants
-   * @param partitionId the partition to check
-   * @return a set of all participants that are disabled for the partition
-   */
-  public static Set<ParticipantId> getDisabledParticipants(
-      final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
-    Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
-    Set<ParticipantId> disabledParticipantsForPartition =
-        Sets.filter(participantSet, new Predicate<ParticipantId>() {
-          @Override
-          public boolean apply(ParticipantId participantId) {
-            Participant participant = participantMap.get(participantId);
-            return !participant.isEnabled()
-                || participant.getDisabledPartitionIds().contains(partitionId);
-          }
-        });
-    return disabledParticipantsForPartition;
-  }
-
-  /**
-   * Get an ordered list of participants that can serve a partition
-   * @param cluster cluster snapshot
-   * @param partitionId the partition to look up
-   * @param prefList the configured preference list for the partition (may be null)
-   * @return list with most preferred participants first
-   */
-  public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
-      List<ParticipantId> prefList) {
-    if (prefList != null && prefList.size() == 1
-        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
-      prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
-      Collections.sort(prefList);
-    }
-    return prefList;
-  }
-
-  /**
-   * Get a map of state to upper bound constraint given a cluster
-   * @param stateModelDef the state model definition to check
-   * @param resourceId the resource that is constrained
-   * @param cluster the cluster the resource belongs to
-   * @return map of state to upper bound
-   */
-  public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
-      ResourceId resourceId, ClusterConfig cluster) {
-    Map<State, String> stateMap = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      String num =
-          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
-              stateModelDef.getStateModelDefId(), state);
-      stateMap.put(state, num);
-    }
-    return stateMap;
-  }
-
-  /**
-   * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
-   * @param upperBounds map of state to upper bound
-   * @param liveParticipantSet set of live participant ids
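-   * @param stateModelDef state model definition supplying the state priority list and initial state
-   * @param participantPreferenceList ordered participant ids for the partition, or null if deleted
-   * @param currentStateMap
-   *          : participant->state for each partition
-   * @param disabledParticipantsForPartition participants that are disabled for this partition
-   * @return map of participant id to the state assigned to it for this partition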
-   */
-  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
-      Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
-      StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
-      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
-    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
-    // if the resource is deleted, participantPreferenceList will be null or empty and
-    // we should drop all resources.
-    if (currentStateMap != null) {
-      for (ParticipantId participantId : currentStateMap.keySet()) {
-        if ((participantPreferenceList == null || !participantPreferenceList
-            .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
-          // if dropped and not disabled, transit to DROPPED
-          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
-        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
-            participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
-          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
-        }
-      }
-    }
-
-    // resource is deleted
-    if (participantPreferenceList == null) {
-      return participantStateMap;
-    }
-
-    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
-    boolean assigned[] = new boolean[participantPreferenceList.size()];
-
-    for (State state : statesPriorityList) {
-      String num = upperBounds.get(state);
-      int stateCount = -1;
-      if ("N".equals(num)) {
-        Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
-        liveAndEnabled.removeAll(disabledParticipantsForPartition);
-        stateCount = liveAndEnabled.size();
-      } else if ("R".equals(num)) {
-        stateCount = participantPreferenceList.size();
-      } else {
-        try {
-          stateCount = Integer.parseInt(num);
-        } catch (Exception e) {
-          logger.error("Invalid count for state:" + state + " ,count=" + num);
-        }
-      }
-      if (stateCount > -1) {
-        int count = 0;
-        for (int i = 0; i < participantPreferenceList.size(); i++) {
-          ParticipantId participantId = participantPreferenceList.get(i);
-
-          boolean notInErrorState =
-              currentStateMap == null
-                  || currentStateMap.get(participantId) == null
-                  || !currentStateMap.get(participantId)
-                      .equals(State.from(HelixDefinedState.ERROR));
-
-          if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
-              && !disabledParticipantsForPartition.contains(participantId)) {
-            participantStateMap.put(participantId, state);
-            count = count + 1;
-            assigned[i] = true;
-            if (count == stateCount) {
-              break;
-            }
-          }
-        }
-      }
-    }
-    return participantStateMap;
-  }
-
-  /**
-   * Get the number of replicas that should be in each state for a partition
-   * @param upperBounds map of state to upper bound
-   * @param stateModelDef StateModelDefinition object
-   * @param liveNodesNb number of live nodes
-   * @param totalReplicas total number of replicas
-   * @return state count map: state->count
-   */
-  public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
-      StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
-    LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
-    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
-
-    int replicas = totalReplicas;
-    for (State state : statesPriorityList) {
-      String num = upperBounds.get(state);
-      if ("N".equals(num)) {
-        stateCountMap.put(state, liveNodesNb);
-      } else if ("R".equals(num)) {
-        // wait until we get the counts for all other states
-        continue;
-      } else {
-        int stateCount = -1;
-        try {
-          stateCount = Integer.parseInt(num);
-        } catch (Exception e) {
-          // LOG.error("Invalid count for state: " + state + ", count: " + num +
-          // ", use -1 instead");
-        }
-
-        if (stateCount > 0) {
-          stateCountMap.put(state, stateCount);
-          replicas -= stateCount;
-        }
-      }
-    }
-
-    // get state count for R
-    for (State state : statesPriorityList) {
-      String num = upperBounds.get(state);
-      if ("R".equals(num)) {
-        stateCountMap.put(state, replicas);
-        // should have at most one state using R
-        break;
-      }
-    }
-    return stateCountMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
deleted file mode 100644
index 880f2c0..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal
- * state of a resource, fully adapting to the addition or removal of instances. This includes
- * computation of a new preference list and a partition to instance and state mapping based on the
- * computed instance preferences.
- * The input is the current assignment of partitions to instances, as well as existing instance
- * preferences, if any.
- * The output is a preference list and a mapping based on that preference list, i.e. partition p
- * has a replica on node k with state s.
- */
-@Deprecated
-public class AutoRebalancer implements Rebalancer {
-  // These should be final, but are initialized in init rather than a constructor
-  private HelixManager _manager;
-  private AutoRebalanceStrategy _algorithm;
-
-  private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
-
-  @Override
-  public void init(HelixManager manager) {
-    this._manager = manager;
-    this._algorithm = null;
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
-    // Compute a preference list based on the current ideal state
-    List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
-    String stateModelName = currentIdealState.getStateModelDefRef();
-    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
-    Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
-    String replicas = currentIdealState.getReplicas();
-
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
-    stateCountMap =
-        ConstraintBasedAssignment.stateCount(stateModelDef, liveInstance.size(),
-            Integer.parseInt(replicas));
-    List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
-    Map<String, Map<String, String>> currentMapping =
-        currentMapping(currentStateOutput, resource.getResourceName(), partitions, stateCountMap);
-
-    // If there are nodes tagged with resource name, use only those nodes
-    Set<String> taggedNodes = new HashSet<String>();
-    if (currentIdealState.getInstanceGroupTag() != null) {
-      for (String instanceName : liveNodes) {
-        if (clusterData.getInstanceConfigMap().get(instanceName)
-            .containsTag(currentIdealState.getInstanceGroupTag())) {
-          taggedNodes.add(instanceName);
-        }
-      }
-    }
-    if (taggedNodes.size() > 0) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("found the following instances with tag " + currentIdealState.getResourceName()
-            + " " + taggedNodes);
-      }
-      liveNodes = new ArrayList<String>(taggedNodes);
-    }
-
-    List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
-    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("currentMapping: " + currentMapping);
-      LOG.info("stateCountMap: " + stateCountMap);
-      LOG.info("liveNodes: " + liveNodes);
-      LOG.info("allNodes: " + allNodes);
-      LOG.info("maxPartition: " + maxPartition);
-    }
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    placementScheme.init(_manager);
-    _algorithm =
-        new AutoRebalanceStrategy(resource.getResourceName(), partitions, stateCountMap,
-            maxPartition, placementScheme);
-    ZNRecord newMapping =
-        _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("newMapping: " + newMapping);
-    }
-
-    IdealState newIdealState = new IdealState(resource.getResourceName());
-    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
-    newIdealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
-    newIdealState.getRecord().setListFields(newMapping.getListFields());
-
-    // compute a full partition mapping for the resource
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + resource.getResourceName());
-    }
-    ResourceAssignment partitionMapping =
-        new ResourceAssignment(ResourceId.from(resource.getResourceName()));
-    for (String partitionName : partitions) {
-      Partition partition = new Partition(partitionName);
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
-      Set<String> disabledInstancesForPartition =
-          clusterData.getDisabledInstancesForPartition(partition.toString());
-      List<String> preferenceList =
-          ConstraintBasedAssignment.getPreferenceList(clusterData, partition, newIdealState,
-              stateModelDef);
-      Map<String, String> bestStateForPartition =
-          ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
-              preferenceList, currentStateMap, disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(PartitionId.from(partitionName),
-          ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
-    }
-    return partitionMapping;
-  }
-
-  private Map<String, Map<String, String>> currentMapping(CurrentStateOutput currentStateOutput,
-      String resourceName, List<String> partitions, Map<String, Integer> stateCountMap) {
-
-    Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();
-
-    for (String partition : partitions) {
-      Map<String, String> curStateMap =
-          currentStateOutput.getCurrentStateMap(resourceName, new Partition(partition));
-      map.put(partition, new HashMap<String, String>());
-      for (String node : curStateMap.keySet()) {
-        String state = curStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-
-      Map<String, String> pendingStateMap =
-          currentStateOutput.getPendingStateMap(resourceName, new Partition(partition));
-      for (String node : pendingStateMap.keySet()) {
-        String state = pendingStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-    }
-    return map;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index f6ea60f..ac4d328 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -1,5 +1,23 @@
 package org.apache.helix.controller.rebalancer;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,117 +37,89 @@ package org.apache.helix.controller.rebalancer;
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping of
- * partitions against the set of live instances to mark assignment states as dropped or erroneous
- * as necessary.
- * The input is the required current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a verified mapping based on that preference list, i.e. partition p has a replica
- * on node k with state s, where s may be a dropped or error state if necessary.
- */
-@Deprecated
 public class CustomRebalancer implements Rebalancer {
 
   private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
 
   @Override
-  public void init(HelixManager manager) {
+  public void init(HelixManager helixManager) {
+    // do nothing
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
-    String stateModelDefName = currentIdealState.getStateModelDefRef();
-    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
+    CustomRebalancerContext config =
+        rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + resource.getResourceName());
+      LOG.debug("Processing resource:" + config.getResourceId());
     }
-    ResourceAssignment partitionMapping =
-        new ResourceAssignment(ResourceId.from(resource.getResourceName()));
-    for (Partition partition : resource.getPartitions()) {
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
-      Set<String> disabledInstancesForPartition =
-          clusterData.getDisabledInstancesForPartition(partition.toString());
-      Map<String, String> idealStateMap =
-          IdealState.stringMapFromParticipantStateMap(currentIdealState
-              .getParticipantStateMap(PartitionId.from(partition.getPartitionName())));
-      Map<String, String> bestStateForPartition =
-          computeCustomizedBestStateForPartition(clusterData, stateModelDef, idealStateMap,
-              currentStateMap, disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
-          ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : config.getPartitionSet()) {
+      Map<ParticipantId, State> currentStateMap =
+          currentState.getCurrentStateMap(config.getResourceId(), partition);
+      Set<ParticipantId> disabledInstancesForPartition =
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partition);
+      Map<ParticipantId, State> bestStateForPartition =
+          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
+              stateModelDef, config.getPreferenceMap(partition), currentStateMap,
+              disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }
 
   /**
-   * compute best state for resource in CUSTOMIZED ideal state mode
-   * @param cache
+   * compute best state for resource in CUSTOMIZED rebalancer mode
+   * @param liveParticipantSet
    * @param stateModelDef
-   * @param idealStateMap
+   * @param preferenceMap
    * @param currentStateMap
-   * @param disabledInstancesForPartition
+   * @param disabledParticipantsForPartition
    * @return
    */
-  private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
-      StateModelDefinition stateModelDef, Map<String, String> idealStateMap,
-      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
-    Map<String, String> instanceStateMap = new HashMap<String, String>();
+  private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
+      Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
+      Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
+      Set<ParticipantId> disabledParticipantsForPartition) {
+    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
-    // if the ideal state is deleted, idealStateMap will be null/empty and
+    // if the resource is deleted, preferenceMap will be null/empty and
     // we should drop all resources.
     if (currentStateMap != null) {
-      for (String instance : currentStateMap.keySet()) {
-        if ((idealStateMap == null || !idealStateMap.containsKey(instance))
-            && !disabledInstancesForPartition.contains(instance)) {
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
+            && !disabledParticipantsForPartition.contains(participantId)) {
           // if dropped and not disabled, transit to DROPPED
-          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
-        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
-            HelixDefinedState.ERROR.toString()))
-            && disabledInstancesForPartition.contains(instance)) {
+          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+            participantId).equals(State.from(HelixDefinedState.ERROR)))
+            && disabledParticipantsForPartition.contains(participantId)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          instanceStateMap.put(instance, stateModelDef.getInitialState());
+          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
         }
       }
     }
 
     // ideal state is deleted
-    if (idealStateMap == null) {
-      return instanceStateMap;
+    if (preferenceMap == null) {
+      return participantStateMap;
     }
 
-    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
-    for (String instance : idealStateMap.keySet()) {
+    for (ParticipantId participantId : preferenceMap.keySet()) {
       boolean notInErrorState =
-          currentStateMap == null || currentStateMap.get(instance) == null
-              || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
+          currentStateMap == null || currentStateMap.get(participantId) == null
+              || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
 
-      if (liveInstancesMap.containsKey(instance) && notInErrorState
-          && !disabledInstancesForPartition.contains(instance)) {
-        instanceStateMap.put(instance, idealStateMap.get(instance));
+      if (liveParticipantSet.contains(participantId) && notInErrorState
+          && !disabledParticipantsForPartition.contains(participantId)) {
+        participantStateMap.put(participantId, preferenceMap.get(participantId));
       }
     }
 
-    return instanceStateMap;
+    return participantStateMap;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
new file mode 100644
index 0000000..b0c11d4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -0,0 +1,196 @@
+package org.apache.helix.controller.rebalancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class FullAutoRebalancer implements Rebalancer {
+  // Not final: assigned in computeResourceMapping rather than in a constructor
+  private AutoRebalanceStrategy _algorithm;
+
+  private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
+
+  @Override
+  public void init(HelixManager helixManager) {
+    // do nothing
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
+    FullAutoRebalancerContext config =
+        rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
+    // Compute a preference list based on the current ideal state
+    List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
+    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+    Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
+    int replicas = -1;
+    if (config.anyLiveParticipant()) {
+      replicas = liveParticipants.size();
+    } else {
+      replicas = config.getReplicaCount();
+    }
+
+    // count how many replicas should be in each state
+    Map<State, String> upperBounds =
+        ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+            cluster.getConfig());
+    LinkedHashMap<State, Integer> stateCountMap =
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
+            liveParticipants.size(), replicas);
+
+    // get the participant lists
+    List<ParticipantId> liveParticipantList =
+        new ArrayList<ParticipantId>(liveParticipants.keySet());
+    List<ParticipantId> allParticipantList =
+        new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
+
+    // compute the current mapping from the current state
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping =
+        currentMapping(config, currentState, stateCountMap);
+
+    // If there are nodes tagged with resource, use only those nodes
+    Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
+    if (config.getParticipantGroupTag() != null) {
+      for (ParticipantId participantId : liveParticipantList) {
+        if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
+          taggedNodes.add(participantId);
+        }
+      }
+    }
+    if (taggedNodes.size() > 0) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("found the following instances with tag " + config.getResourceId() + " "
+            + taggedNodes);
+      }
+      liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
+    }
+
+    // determine which nodes the replicas should live on
+    int maxPartition = config.getMaxPartitionsPerParticipant();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("currentMapping: " + currentMapping);
+      LOG.info("stateCountMap: " + stateCountMap);
+      LOG.info("liveNodes: " + liveParticipantList);
+      LOG.info("allNodes: " + allParticipantList);
+      LOG.info("maxPartition: " + maxPartition);
+    }
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    _algorithm =
+        new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
+            placementScheme);
+    ZNRecord newMapping =
+        _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
+            allParticipantList);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("newMapping: " + newMapping);
+    }
+
+    // compute a full partition mapping for the resource
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + config.getResourceId());
+    }
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : partitions) {
+      Set<ParticipantId> disabledParticipantsForPartition =
+          ConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
+      List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
+      if (rawPreferenceList == null) {
+        rawPreferenceList = Collections.emptyList();
+      }
+      List<ParticipantId> preferenceList =
+          Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
+            @Override
+            public ParticipantId apply(String participantName) {
+              return ParticipantId.from(participantName);
+            }
+          });
+      preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
+      Map<ParticipantId, State> bestStateForPartition =
+          ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+              liveParticipants.keySet(), stateModelDef, preferenceList,
+              currentState.getCurrentStateMap(config.getResourceId(), partition),
+              disabledParticipantsForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
+      FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+      Map<State, Integer> stateCountMap) {
+    Map<PartitionId, Map<ParticipantId, State>> map =
+        new HashMap<PartitionId, Map<ParticipantId, State>>();
+
+    for (PartitionId partition : config.getPartitionSet()) {
+      Map<ParticipantId, State> curStateMap =
+          currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
+      map.put(partition, new HashMap<ParticipantId, State>());
+      for (ParticipantId node : curStateMap.keySet()) {
+        State state = curStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+
+      Map<ParticipantId, State> pendingStateMap =
+          currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
+      for (ParticipantId node : pendingStateMap.keySet()) {
+        State state = pendingStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+    }
+    return map;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index 26fc2ef..5a6a24e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -1,5 +1,11 @@
 package org.apache.helix.controller.rebalancer;
 
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,40 +25,15 @@ package org.apache.helix.controller.rebalancer;
  * under the License.
  */
 
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-
 /**
  * Allows one to come up with custom implementation of a rebalancer.<br/>
  * This will be invoked on all changes that happen in the cluster.<br/>
- * Simply return the newIdealState for a resource in this method.<br/>
- * <br/>
- * Deprecated. Use {@link org.apache.helix.api.rebalancer.Rebalancer} instead.
+ * Simply return the resource assignment for a resource in this method.<br/>
  */
-@Deprecated
 public interface Rebalancer {
-  /**
-   * Initialize the rebalancer with a HelixManager if necessary
-   * @param manager
-   */
-  void init(HelixManager manager);
 
-  /**
-   * Given an ideal state for a resource and liveness of instances, compute a assignment of
-   * instances and states to each partition of a resource. This method provides all the relevant
-   * information needed to rebalance a resource. If you need additional information use
-   * manager.getAccessor to read the cluster data. This allows one to compute the newIdealState
-   * according to app specific requirements.
-   * @param resourceName the resource for which a mapping will be computed
-   * @param currentIdealState the IdealState that corresponds to this resource
-   * @param currentStateOutput the current states of all partitions
-   * @param clusterData cache of the cluster state
-   */
-  ResourceAssignment computeResourceMapping(final Resource resource,
-      final IdealState currentIdealState, final CurrentStateOutput currentStateOutput,
-      final ClusterDataCache clusterData);
+  public void init(HelixManager helixManager);
+
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
+      ResourceCurrentState currentState);
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
new file mode 100644
index 0000000..79e4ba0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
@@ -0,0 +1,94 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Reference to a class that implements {@link Rebalancer}. The class is loaded on demand.
+ */
+public class RebalancerRef {
+  private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
+
+  @JsonProperty("rebalancerClassName")
+  private final String _rebalancerClassName;
+
+  @JsonCreator
+  private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
+    _rebalancerClassName = rebalancerClassName;
+  }
+
+  /**
+   * Get an instantiated Rebalancer
+   * @return Rebalancer or null if instantiation failed
+   */
+  @JsonIgnore
+  public Rebalancer getRebalancer() {
+    try {
+      return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+    } catch (Exception e) {
+      LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return _rebalancerClassName;
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof RebalancerRef) {
+      return this.toString().equals(((RebalancerRef) that).toString());
+    } else if (that instanceof String) {
+      return this.toString().equals(that);
+    }
+    return false;
+  }
+
+  /**
+   * Get a rebalancer class reference
+   * @param rebalancerClassName name of the class
+   * @return RebalancerRef or null if name is null
+   */
+  public static RebalancerRef from(String rebalancerClassName) {
+    if (rebalancerClassName == null) {
+      return null;
+    }
+    return new RebalancerRef(rebalancerClassName);
+  }
+
+  /**
+   * Get a RebalancerRef from a class object
+   * @param rebalancerClass class that implements Rebalancer
+   * @return RebalancerRef or null if the class is null
+   */
+  public static RebalancerRef from(Class<? extends Rebalancer> rebalancerClass) {
+    if (rebalancerClass == null) {
+      return null;
+    }
+    return RebalancerRef.from(rebalancerClass.getName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index dd9fcf1..96e3d4b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -1,5 +1,22 @@
 package org.apache.helix.controller.rebalancer;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,65 +36,48 @@ package org.apache.helix.controller.rebalancer;
  * under the License.
  */
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
 /**
- * This is a Rebalancer specific to semi-automatic mode. It is tasked with computing the ideal
- * state of a resource based on a predefined preference list of instances willing to accept
- * replicas.
- * The input is the optional current assignment of partitions to instances, as well as the required
- * existing instance preferences.
- * The output is a mapping based on that preference list, i.e. partition p has a replica on node k
- * with state s.
+ * Rebalancer for the SEMI_AUTO mode. It expects a RebalancerConfig whose
+ * SemiAutoRebalancerContext supplies the preference list of each partition.
  */
-@Deprecated
 public class SemiAutoRebalancer implements Rebalancer {
-
   private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
 
   @Override
-  public void init(HelixManager manager) {
+  public void init(HelixManager helixManager) {
+    // do nothing
   }
 
   @Override
-  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
-      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
-    String stateModelDefName = currentIdealState.getStateModelDefRef();
-    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      Cluster cluster, ResourceCurrentState currentState) {
+    SemiAutoRebalancerContext config =
+        rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
+    StateModelDefinition stateModelDef =
+        cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + resource.getResourceName());
+      LOG.debug("Processing resource:" + config.getResourceId());
     }
-    ResourceAssignment partitionMapping =
-        new ResourceAssignment(ResourceId.from(resource.getResourceName()));
-    for (Partition partition : resource.getPartitions()) {
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
-      Set<String> disabledInstancesForPartition =
-          clusterData.getDisabledInstancesForPartition(partition.toString());
-      List<String> preferenceList =
-          ConstraintBasedAssignment.getPreferenceList(clusterData, partition, currentIdealState,
-              stateModelDef);
-      Map<String, String> bestStateForPartition =
-          ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
-              preferenceList, currentStateMap, disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
-          ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+    for (PartitionId partition : config.getPartitionSet()) {
+      Map<ParticipantId, State> currentStateMap =
+          currentState.getCurrentStateMap(config.getResourceId(), partition);
+      Set<ParticipantId> disabledInstancesForPartition =
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partition);
+      List<ParticipantId> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(cluster, partition,
+              config.getPreferenceList(partition));
+      Map<State, String> upperBounds =
+          ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+              cluster.getConfig());
+      Map<ParticipantId, State> bestStateForPartition =
+          ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
+              .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
+              disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
new file mode 100644
index 0000000..ec765d7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
@@ -0,0 +1,240 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Abstract RebalancerContext that functions for generic subunits. Use a subclass that more
+ * concretely defines the subunits.
+ */
+public abstract class BasicRebalancerContext implements RebalancerContext {
+  private ResourceId _resourceId;
+  private StateModelDefId _stateModelDefId;
+  private StateModelFactoryId _stateModelFactoryId;
+  private String _participantGroupTag;
+  private Class<? extends ContextSerializer> _serializer;
+  private RebalancerRef _rebalancerRef;
+
+  /**
+   * Instantiate a basic rebalancer context
+   */
+  public BasicRebalancerContext() {
+    _serializer = DefaultContextSerializer.class;
+  }
+
+  @Override
+  public ResourceId getResourceId() {
+    return _resourceId;
+  }
+
+  /**
+   * Set the resource to rebalance
+   * @param resourceId resource id
+   */
+  public void setResourceId(ResourceId resourceId) {
+    _resourceId = resourceId;
+  }
+
+  @Override
+  public StateModelDefId getStateModelDefId() {
+    return _stateModelDefId;
+  }
+
+  /**
+   * Set the state model definition that the resource follows
+   * @param stateModelDefId state model definition id
+   */
+  public void setStateModelDefId(StateModelDefId stateModelDefId) {
+    _stateModelDefId = stateModelDefId;
+  }
+
+  @Override
+  public StateModelFactoryId getStateModelFactoryId() {
+    return _stateModelFactoryId;
+  }
+
+  /**
+   * Set the state model factory that the resource uses
+   * @param stateModelFactoryId state model factory id
+   */
+  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+    _stateModelFactoryId = stateModelFactoryId;
+  }
+
+  @Override
+  public String getParticipantGroupTag() {
+    return _participantGroupTag;
+  }
+
+  /**
+   * Set a tag that participants must have in order to serve this resource
+   * @param participantGroupTag string group tag
+   */
+  public void setParticipantGroupTag(String participantGroupTag) {
+    _participantGroupTag = participantGroupTag;
+  }
+
+  /**
+   * Get the serializer. If none is provided, {@link DefaultContextSerializer} is used
+   */
+  @Override
+  public Class<? extends ContextSerializer> getSerializerClass() {
+    return _serializer;
+  }
+
+  /**
+   * Set the class that can serialize this context
+   * @param serializer serializer class that implements ContextSerializer
+   */
+  public void setSerializerClass(Class<? extends ContextSerializer> serializer) {
+    _serializer = serializer;
+  }
+
+  @Override
+  @JsonIgnore
+  public Set<? extends PartitionId> getSubUnitIdSet() {
+    return getSubUnitMap().keySet();
+  }
+
+  @Override
+  @JsonIgnore
+  public Partition getSubUnit(PartitionId subUnitId) {
+    return getSubUnitMap().get(subUnitId);
+  }
+
+  @Override
+  public RebalancerRef getRebalancerRef() {
+    return _rebalancerRef;
+  }
+
+  /**
+   * Set the reference to the class used to rebalance this resource
+   * @param rebalancerRef RebalancerRef instance
+   */
+  public void setRebalancerRef(RebalancerRef rebalancerRef) {
+    _rebalancerRef = rebalancerRef;
+  }
+
+  /**
+   * Abstract builder for the base rebalancer context
+   */
+  public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
+    private final ResourceId _resourceId;
+    private StateModelDefId _stateModelDefId;
+    private StateModelFactoryId _stateModelFactoryId;
+    private String _participantGroupTag;
+    private Class<? extends ContextSerializer> _serializerClass;
+    private RebalancerRef _rebalancerRef;
+
+    /**
+     * Instantiate with a resource id
+     * @param resourceId resource id
+     */
+    public AbstractBuilder(ResourceId resourceId) {
+      _resourceId = resourceId;
+      _serializerClass = DefaultContextSerializer.class;
+    }
+
+    /**
+     * Set the state model definition that the resource should follow
+     * @param stateModelDefId state model definition id
+     * @return Builder
+     */
+    public T stateModelDefId(StateModelDefId stateModelDefId) {
+      _stateModelDefId = stateModelDefId;
+      return self();
+    }
+
+    /**
+     * Set the state model factory that the resource should use
+     * @param stateModelFactoryId state model factory id
+     * @return Builder
+     */
+    public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+      _stateModelFactoryId = stateModelFactoryId;
+      return self();
+    }
+
+    /**
+     * Set the tag that all participants require in order to serve this resource
+     * @param participantGroupTag the tag
+     * @return Builder
+     */
+    public T participantGroupTag(String participantGroupTag) {
+      _participantGroupTag = participantGroupTag;
+      return self();
+    }
+
+    /**
+     * Set the serializer class for this rebalancer context
+     * @param serializerClass class that implements ContextSerializer
+     * @return Builder
+     */
+    public T serializerClass(Class<? extends ContextSerializer> serializerClass) {
+      _serializerClass = serializerClass;
+      return self();
+    }
+
+    /**
+     * Specify a custom class to use for rebalancing
+     * @param rebalancerRef RebalancerRef instance
+     * @return Builder
+     */
+    public T rebalancerRef(RebalancerRef rebalancerRef) {
+      _rebalancerRef = rebalancerRef;
+      return self();
+    }
+
+    /**
+     * Update an existing context with base fields
+     * @param context derived context
+     */
+    protected final void update(BasicRebalancerContext context) {
+      context.setResourceId(_resourceId);
+      context.setStateModelDefId(_stateModelDefId);
+      context.setStateModelFactoryId(_stateModelFactoryId);
+      context.setParticipantGroupTag(_participantGroupTag);
+      context.setSerializerClass(_serializerClass);
+      context.setRebalancerRef(_rebalancerRef);
+    }
+
+    /**
+     * Get a typed reference to this builder. Final derived classes should simply return their
+     * own {@code this} reference.
+     * @return this for the most specific type
+     */
+    protected abstract T self();
+
+    /**
+     * Get the rebalancer context from the built fields
+     * @return RebalancerContext
+     */
+    public abstract RebalancerContext build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
new file mode 100644
index 0000000..ef12a09
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
@@ -0,0 +1,37 @@
+package org.apache.helix.controller.rebalancer.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface ContextSerializer {
+  /**
+   * Convert a RebalancerContext object instance to a String
+   * @param data instance of the rebalancer context type
+   * @return String representing the object
+   */
+  public <T> String serialize(final T data);
+
+  /**
+   * Convert a String to a generic object instance
+   * @param clazz The class represented by the deserialized string
+   * @param string String representing the object
+   * @return instance of the generic type or null if the conversion failed
+   */
+  public <T> T deserialize(final Class<T> clazz, final String string);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
new file mode 100644
index 0000000..1fc1cda
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
@@ -0,0 +1,163 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
+ * corresponds to {@link CustomRebalancer}
+ */
+public class CustomRebalancerContext extends PartitionedRebalancerContext {
+  private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+  /**
+   * Instantiate a CustomRebalancerContext
+   */
+  public CustomRebalancerContext() {
+    super(RebalanceMode.CUSTOMIZED);
+    setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+    _preferenceMaps = Maps.newHashMap();
+  }
+
+  /**
+   * Get the preference maps of the partitions and replicas of the resource
+   * @return map of partition to participant and state
+   */
+  public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
+    return _preferenceMaps;
+  }
+
+  /**
+   * Set the preference maps of the partitions and replicas of the resource
+   * @param preferenceMaps map of partition to participant and state
+   */
+  public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
+    _preferenceMaps = preferenceMaps;
+  }
+
+  /**
+   * Get the preference map of a partition
+   * @param partitionId the partition to look up
+   * @return map of participant to state
+   */
+  @JsonIgnore
+  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+    return _preferenceMaps.get(partitionId);
+  }
+
+  /**
+   * Generate preference maps based on a default cluster setup
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @Override
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // compute default upper bounds
+    Map<State, String> upperBounds = Maps.newHashMap();
+    for (State state : stateModelDef.getTypedStatesPriorityList()) {
+      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+    }
+
+    // determine the current mapping
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
+
+    // determine the preference maps
+    LinkedHashMap<State, Integer> stateCounts =
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+            getReplicaCount());
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+            getMaxPartitionsPerParticipant(), placementScheme);
+    Map<String, Map<String, String>> rawPreferenceMaps =
+        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+            .getMapFields();
+    Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
+        Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
+    setPreferenceMaps(preferenceMaps);
+  }
+
+  /**
+   * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+    private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+    /**
+     * Instantiate for a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+      _preferenceMaps = Maps.newHashMap();
+    }
+
+    /**
+     * Add a preference map for a partition
+     * @param partitionId partition to set
+     * @param preferenceMap map of participant id to state indicating where replicas are served
+     * @return Builder
+     */
+    public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+      _preferenceMaps.put(partitionId, preferenceMap);
+      return self();
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public CustomRebalancerContext build() {
+      CustomRebalancerContext context = new CustomRebalancerContext();
+      super.update(context);
+      context.setPreferenceMaps(_preferenceMaps);
+      return context;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
new file mode 100644
index 0000000..ecc93fb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
@@ -0,0 +1,83 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Default serializer implementation for RebalancerContexts. Uses the Jackson JSON library to
+ * convert to and from strings
+ */
+public class DefaultContextSerializer implements ContextSerializer {
+
+  private static Logger logger = Logger.getLogger(DefaultContextSerializer.class);
+
+  @Override
+  public <T> String serialize(final T data) {
+    if (data == null) {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    StringWriter sw = new StringWriter();
+    try {
+      mapper.writeValue(sw, data);
+    } catch (Exception e) {
+      logger.error("Exception during payload data serialization.", e);
+      throw new HelixException(e);
+    }
+    return sw.toString();
+  }
+
+  @Override
+  public <T> T deserialize(final Class<T> clazz, final String string) {
+    if (string == null || string.length() == 0) {
+      return null;
+    }
+
+    ObjectMapper mapper = new ObjectMapper();
+    ByteArrayInputStream bais = new ByteArrayInputStream(string.getBytes());
+
+    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
+    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+    deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    try {
+      T payload = mapper.readValue(bais, clazz);
+      return payload;
+    } catch (Exception e) {
+      logger.error("Exception during deserialization of payload bytes: " + string, e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
new file mode 100644
index 0000000..2db9ac6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
@@ -0,0 +1,63 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for FULL_AUTO rebalancing mode. By default, it corresponds to
+ * {@link FullAutoRebalancer}
+ */
+public class FullAutoRebalancerContext extends PartitionedRebalancerContext {
+  public FullAutoRebalancerContext() {
+    super(RebalanceMode.FULL_AUTO);
+    setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+  }
+
+  /**
+   * Builder for a full auto rebalancer context. By default, it corresponds to
+   * {@link FullAutoRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public FullAutoRebalancerContext build() {
+      FullAutoRebalancerContext context = new FullAutoRebalancerContext();
+      super.update(context);
+      return context;
+    }
+  }
+}


[4/4] git commit: [HELIX-209] Moving rebalancer code around, take 2

Posted by ka...@apache.org.
[HELIX-209] Moving rebalancer code around, take 2


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e032132a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e032132a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e032132a

Branch: refs/heads/helix-logical-model
Commit: e032132a6232bb9c66f00dc1d75f4cd61b6f91b1
Parents: 9f229c8
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 15 17:09:14 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 15 17:09:14 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/api/Resource.java     |   4 +-
 .../api/accessor/AtomicResourceAccessor.java    |   2 +-
 .../helix/api/accessor/ClusterAccessor.java     |   4 +-
 .../helix/api/accessor/ParticipantAccessor.java |   4 +-
 .../helix/api/accessor/ResourceAccessor.java    |  10 +-
 .../apache/helix/api/config/ResourceConfig.java |   4 +-
 .../api/rebalancer/BasicRebalancerContext.java  | 239 ------------
 .../helix/api/rebalancer/ContextSerializer.java |  37 --
 .../helix/api/rebalancer/CustomRebalancer.java  | 123 ------
 .../api/rebalancer/CustomRebalancerContext.java | 161 --------
 .../rebalancer/DefaultContextSerializer.java    |  83 -----
 .../api/rebalancer/FullAutoRebalancer.java      | 194 ----------
 .../rebalancer/FullAutoRebalancerContext.java   |  61 ---
 .../PartitionedRebalancerContext.java           | 372 ------------------
 .../apache/helix/api/rebalancer/Rebalancer.java |  38 --
 .../helix/api/rebalancer/RebalancerConfig.java  | 177 ---------
 .../helix/api/rebalancer/RebalancerContext.java |  93 -----
 .../helix/api/rebalancer/RebalancerRef.java     |  94 -----
 .../rebalancer/ReplicatedRebalancerContext.java |  40 --
 .../api/rebalancer/SemiAutoRebalancer.java      |  81 ----
 .../rebalancer/SemiAutoRebalancerContext.java   | 176 ---------
 .../util/ConstraintBasedAssignment.java         | 244 ------------
 .../controller/rebalancer/AutoRebalancer.java   | 187 ----------
 .../controller/rebalancer/CustomRebalancer.java | 142 ++++---
 .../rebalancer/FullAutoRebalancer.java          | 196 ++++++++++
 .../helix/controller/rebalancer/Rebalancer.java |  41 +-
 .../controller/rebalancer/RebalancerRef.java    |  94 +++++
 .../rebalancer/SemiAutoRebalancer.java          |  94 ++---
 .../context/BasicRebalancerContext.java         | 240 ++++++++++++
 .../rebalancer/context/ContextSerializer.java   |  37 ++
 .../context/CustomRebalancerContext.java        | 163 ++++++++
 .../context/DefaultContextSerializer.java       |  83 +++++
 .../context/FullAutoRebalancerContext.java      |  63 ++++
 .../context/PartitionedRebalancerContext.java   | 373 +++++++++++++++++++
 .../rebalancer/context/RebalancerConfig.java    | 178 +++++++++
 .../rebalancer/context/RebalancerContext.java   |  94 +++++
 .../context/ReplicatedRebalancerContext.java    |  40 ++
 .../context/SemiAutoRebalancerContext.java      | 178 +++++++++
 .../util/ConstraintBasedAssignment.java         | 182 +++++----
 .../stages/BestPossibleStateCalcStage.java      |   8 +-
 .../stages/ExternalViewComputeStage.java        |   4 +-
 .../stages/MessageGenerationStage.java          |   2 +-
 .../stages/MessageSelectionStage.java           |   6 +-
 .../stages/RebalanceIdealStateStage.java        |   2 +-
 .../stages/ResourceComputationStage.java        |   6 +-
 .../controller/rebalancer/AutoRebalancer.java   | 187 ++++++++++
 .../controller/rebalancer/CustomRebalancer.java | 135 +++++++
 .../controller/rebalancer/Rebalancer.java       |  58 +++
 .../rebalancer/SemiAutoRebalancer.java          |  83 +++++
 .../util/ConstraintBasedAssignment.java         | 194 ++++++++++
 .../java/org/apache/helix/model/IdealState.java |   2 +-
 .../org/apache/helix/tools/NewClusterSetup.java |   8 +-
 .../org/apache/helix/api/TestNewStages.java     |   2 +-
 .../org/apache/helix/api/TestUpdateConfig.java  |   4 +-
 .../context/TestSerializeRebalancerContext.java |   3 -
 .../helix/controller/stages/BaseStageTest.java  |   4 +-
 .../stages/TestResourceComputationStage.java    |   2 +-
 .../strategy/TestAutoRebalanceStrategy.java     |   2 +-
 .../strategy/TestNewAutoRebalanceStrategy.java  |   2 +-
 .../TestCustomizedIdealStateRebalancer.java     |   6 +-
 .../apache/helix/examples/NewModelExample.java  |   2 +-
 61 files changed, 2681 insertions(+), 2667 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index a505aeb..79a1e09 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -30,8 +30,8 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 74b6bdb..6d69981 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -6,7 +6,7 @@ import org.apache.helix.api.Scope;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.lock.HelixLock;
 import org.apache.helix.lock.HelixLockable;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index ed6c844..85b8432 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -51,8 +51,8 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index ff24ee9..83dd53e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -51,8 +51,8 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index b959f17..517c8c4 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -38,11 +38,11 @@ import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.CustomRebalancerContext;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
-import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index ec1d8f6..38d48ab 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -7,8 +7,8 @@ import org.apache.helix.api.Partition;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 
 import com.google.common.collect.Sets;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/BasicRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/BasicRebalancerContext.java
deleted file mode 100644
index b346cda..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/BasicRebalancerContext.java
+++ /dev/null
@@ -1,239 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Abstract RebalancerContext that functions for generic subunits. Use a subclass that more
- * concretely defines the subunits.
- */
-public abstract class BasicRebalancerContext implements RebalancerContext {
-  private ResourceId _resourceId;
-  private StateModelDefId _stateModelDefId;
-  private StateModelFactoryId _stateModelFactoryId;
-  private String _participantGroupTag;
-  private Class<? extends ContextSerializer> _serializer;
-  private RebalancerRef _rebalancerRef;
-
-  /**
-   * Instantiate a basic rebalancer context
-   */
-  public BasicRebalancerContext() {
-    _serializer = DefaultContextSerializer.class;
-  }
-
-  @Override
-  public ResourceId getResourceId() {
-    return _resourceId;
-  }
-
-  /**
-   * Set the resource to rebalance
-   * @param resourceId resource id
-   */
-  public void setResourceId(ResourceId resourceId) {
-    _resourceId = resourceId;
-  }
-
-  @Override
-  public StateModelDefId getStateModelDefId() {
-    return _stateModelDefId;
-  }
-
-  /**
-   * Set the state model definition that the resource follows
-   * @param stateModelDefId state model definition id
-   */
-  public void setStateModelDefId(StateModelDefId stateModelDefId) {
-    _stateModelDefId = stateModelDefId;
-  }
-
-  @Override
-  public StateModelFactoryId getStateModelFactoryId() {
-    return _stateModelFactoryId;
-  }
-
-  /**
-   * Set the state model factory that the resource uses
-   * @param stateModelFactoryId state model factory id
-   */
-  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-    _stateModelFactoryId = stateModelFactoryId;
-  }
-
-  @Override
-  public String getParticipantGroupTag() {
-    return _participantGroupTag;
-  }
-
-  /**
-   * Set a tag that participants must have in order to serve this resource
-   * @param participantGroupTag string group tag
-   */
-  public void setParticipantGroupTag(String participantGroupTag) {
-    _participantGroupTag = participantGroupTag;
-  }
-
-  /**
-   * Get the serializer. If none is provided, {@link DefaultContextSerializer} is used
-   */
-  @Override
-  public Class<? extends ContextSerializer> getSerializerClass() {
-    return _serializer;
-  }
-
-  /**
-   * Set the class that can serialize this context
-   * @param serializer serializer class that implements ContextSerializer
-   */
-  public void setSerializerClass(Class<? extends ContextSerializer> serializer) {
-    _serializer = serializer;
-  }
-
-  @Override
-  @JsonIgnore
-  public Set<? extends PartitionId> getSubUnitIdSet() {
-    return getSubUnitMap().keySet();
-  }
-
-  @Override
-  @JsonIgnore
-  public Partition getSubUnit(PartitionId subUnitId) {
-    return getSubUnitMap().get(subUnitId);
-  }
-
-  @Override
-  public RebalancerRef getRebalancerRef() {
-    return _rebalancerRef;
-  }
-
-  /**
-   * Set the reference to the class used to rebalance this resource
-   * @param rebalancerRef RebalancerRef instance
-   */
-  public void setRebalancerRef(RebalancerRef rebalancerRef) {
-    _rebalancerRef = rebalancerRef;
-  }
-
-  /**
-   * Abstract builder for the base rebalancer context
-   */
-  public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
-    private final ResourceId _resourceId;
-    private StateModelDefId _stateModelDefId;
-    private StateModelFactoryId _stateModelFactoryId;
-    private String _participantGroupTag;
-    private Class<? extends ContextSerializer> _serializerClass;
-    private RebalancerRef _rebalancerRef;
-
-    /**
-     * Instantiate with a resource id
-     * @param resourceId resource id
-     */
-    public AbstractBuilder(ResourceId resourceId) {
-      _resourceId = resourceId;
-      _serializerClass = DefaultContextSerializer.class;
-    }
-
-    /**
-     * Set the state model definition that the resource should follow
-     * @param stateModelDefId state model definition id
-     * @return Builder
-     */
-    public T stateModelDefId(StateModelDefId stateModelDefId) {
-      _stateModelDefId = stateModelDefId;
-      return self();
-    }
-
-    /**
-     * Set the state model factory that the resource should use
-     * @param stateModelFactoryId state model factory id
-     * @return Builder
-     */
-    public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-      _stateModelFactoryId = stateModelFactoryId;
-      return self();
-    }
-
-    /**
-     * Set the tag that all participants require in order to serve this resource
-     * @param participantGroupTag the tag
-     * @return Builder
-     */
-    public T participantGroupTag(String participantGroupTag) {
-      _participantGroupTag = participantGroupTag;
-      return self();
-    }
-
-    /**
-     * Set the serializer class for this rebalancer context
-     * @param serializerClass class that implements ContextSerializer
-     * @return Builder
-     */
-    public T serializerClass(Class<? extends ContextSerializer> serializerClass) {
-      _serializerClass = serializerClass;
-      return self();
-    }
-
-    /**
-     * Specify a custom class to use for rebalancing
-     * @param rebalancerRef RebalancerRef instance
-     * @return Builder
-     */
-    public T rebalancerRef(RebalancerRef rebalancerRef) {
-      _rebalancerRef = rebalancerRef;
-      return self();
-    }
-
-    /**
-     * Update an existing context with base fields
-     * @param context derived context
-     */
-    protected final void update(BasicRebalancerContext context) {
-      context.setResourceId(_resourceId);
-      context.setStateModelDefId(_stateModelDefId);
-      context.setStateModelFactoryId(_stateModelFactoryId);
-      context.setParticipantGroupTag(_participantGroupTag);
-      context.setSerializerClass(_serializerClass);
-      context.setRebalancerRef(_rebalancerRef);
-    }
-
-    /**
-     * Get a typed reference to "this" class. Final derived classes should simply return the this
-     * reference.
-     * @return this for the most specific type
-     */
-    protected abstract T self();
-
-    /**
-     * Get the rebalancer context from the built fields
-     * @return RebalancerContext
-     */
-    public abstract RebalancerContext build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ContextSerializer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/ContextSerializer.java
deleted file mode 100644
index 10e3ba9..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ContextSerializer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public interface ContextSerializer {
-  /**
-   * Convert a RebalancerContext object instance to a String
-   * @param data instance of the rebalancer context type
-   * @return String representing the object
-   */
-  public <T> String serialize(final T data);
-
-  /**
-   * Convert raw bytes to a generic object instance
-   * @param clazz The class represented by the deserialized string
-   * @param string String representing the object
-   * @return instance of the generic type or null if the conversion failed
-   */
-  public <T> T deserialize(final Class<T> clazz, final String string);
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancer.java
deleted file mode 100644
index c5e4e80..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class CustomRebalancer implements Rebalancer {
-
-  private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
-
-  @Override
-  public void init(HelixManager helixManager) {
-    // do nothing
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    CustomRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> currentStateMap =
-          currentState.getCurrentStateMap(config.getResourceId(), partition);
-      Set<ParticipantId> disabledInstancesForPartition =
-          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
-      Map<ParticipantId, State> bestStateForPartition =
-          computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
-              stateModelDef, config.getPreferenceMap(partition), currentStateMap,
-              disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-
-  /**
-   * compute best state for resource in CUSTOMIZED rebalancer mode
-   * @param liveParticipantMap
-   * @param stateModelDef
-   * @param preferenceMap
-   * @param currentStateMap
-   * @param disabledParticipantsForPartition
-   * @return
-   */
-  private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
-      Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
-      Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
-      Set<ParticipantId> disabledParticipantsForPartition) {
-    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
-    // if the resource is deleted, idealStateMap will be null/empty and
-    // we should drop all resources.
-    if (currentStateMap != null) {
-      for (ParticipantId participantId : currentStateMap.keySet()) {
-        if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
-            && !disabledParticipantsForPartition.contains(participantId)) {
-          // if dropped and not disabled, transit to DROPPED
-          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
-        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
-            participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
-          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
-        }
-      }
-    }
-
-    // ideal state is deleted
-    if (preferenceMap == null) {
-      return participantStateMap;
-    }
-
-    for (ParticipantId participantId : preferenceMap.keySet()) {
-      boolean notInErrorState =
-          currentStateMap == null || currentStateMap.get(participantId) == null
-              || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
-
-      if (liveParticipantSet.contains(participantId) && notInErrorState
-          && !disabledParticipantsForPartition.contains(participantId)) {
-        participantStateMap.put(participantId, preferenceMap.get(participantId));
-      }
-    }
-
-    return participantStateMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancerContext.java
deleted file mode 100644
index 9d5b7e8..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/CustomRebalancerContext.java
+++ /dev/null
@@ -1,161 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
- * corresponds to {@link CustomRebalancer}
- */
-public class CustomRebalancerContext extends PartitionedRebalancerContext {
-  private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
-  /**
-   * Instantiate a CustomRebalancerContext
-   */
-  public CustomRebalancerContext() {
-    super(RebalanceMode.CUSTOMIZED);
-    setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
-    _preferenceMaps = Maps.newHashMap();
-  }
-
-  /**
-   * Get the preference maps of the partitions and replicas of the resource
-   * @return map of partition to participant and state
-   */
-  public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
-    return _preferenceMaps;
-  }
-
-  /**
-   * Set the preference maps of the partitions and replicas of the resource
-   * @param preferenceMaps map of partition to participant and state
-   */
-  public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
-    _preferenceMaps = preferenceMaps;
-  }
-
-  /**
-   * Get the preference map of a partition
-   * @param partitionId the partition to look up
-   * @return map of participant to state
-   */
-  @JsonIgnore
-  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
-    return _preferenceMaps.get(partitionId);
-  }
-
-  /**
-   * Generate preference maps based on a default cluster setup
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @Override
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // compute default upper bounds
-    Map<State, String> upperBounds = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
-    }
-
-    // determine the current mapping
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
-
-    // determine the preference maps
-    LinkedHashMap<State, Integer> stateCounts =
-        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
-            getReplicaCount());
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
-    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
-            getMaxPartitionsPerParticipant(), placementScheme);
-    Map<String, Map<String, String>> rawPreferenceMaps =
-        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
-            .getMapFields();
-    Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
-        Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
-    setPreferenceMaps(preferenceMaps);
-  }
-
-  /**
-   * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
-    /**
-     * Instantiate for a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
-      _preferenceMaps = Maps.newHashMap();
-    }
-
-    /**
-     * Add a preference map for a partition
-     * @param partitionId partition to set
-     * @param preferenceMap map of participant id to state indicating where replicas are served
-     * @return Builder
-     */
-    public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
-      _preferenceMaps.put(partitionId, preferenceMap);
-      return self();
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public CustomRebalancerContext build() {
-      CustomRebalancerContext context = new CustomRebalancerContext();
-      super.update(context);
-      context.setPreferenceMaps(_preferenceMaps);
-      return context;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/DefaultContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/DefaultContextSerializer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/DefaultContextSerializer.java
deleted file mode 100644
index 64e1f8b..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/DefaultContextSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-
-import org.apache.helix.HelixException;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Default serializer implementation for RebalancerContexts. Uses the Jackson JSON library to
- * convert to and from strings
- */
-public class DefaultContextSerializer implements ContextSerializer {
-
-  private static Logger logger = Logger.getLogger(DefaultContextSerializer.class);
-
-  @Override
-  public <T> String serialize(final T data) {
-    if (data == null) {
-      return null;
-    }
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-    serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-    serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-    StringWriter sw = new StringWriter();
-    try {
-      mapper.writeValue(sw, data);
-    } catch (Exception e) {
-      logger.error("Exception during payload data serialization.", e);
-      throw new HelixException(e);
-    }
-    return sw.toString();
-  }
-
-  @Override
-  public <T> T deserialize(final Class<T> clazz, final String string) {
-    if (string == null || string.length() == 0) {
-      return null;
-    }
-
-    ObjectMapper mapper = new ObjectMapper();
-    ByteArrayInputStream bais = new ByteArrayInputStream(string.getBytes());
-
-    DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
-    deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
-    deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-    try {
-      T payload = mapper.readValue(bais, clazz);
-      return payload;
-    } catch (Exception e) {
-      logger.error("Exception during deserialization of payload bytes: " + string, e);
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancer.java
deleted file mode 100644
index f5a5abe..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancer.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class FullAutoRebalancer implements Rebalancer {
-  // Not final: reassigned on every computeResourceMapping call; init does nothing
-  private AutoRebalanceStrategy _algorithm;
-
-  private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
-
-  @Override
-  public void init(HelixManager helixManager) {
-    // do nothing
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    FullAutoRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    // Compute a preference list based on the current ideal state
-    List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
-    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
-    Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
-    int replicas = -1;
-    if (config.anyLiveParticipant()) {
-      replicas = liveParticipants.size();
-    } else {
-      replicas = config.getReplicaCount();
-    }
-
-    // count how many replicas should be in each state
-    Map<State, String> upperBounds =
-        ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
-            cluster.getConfig());
-    LinkedHashMap<State, Integer> stateCountMap =
-        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
-            liveParticipants.size(), replicas);
-
-    // get the participant lists
-    List<ParticipantId> liveParticipantList =
-        new ArrayList<ParticipantId>(liveParticipants.keySet());
-    List<ParticipantId> allParticipantList =
-        new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
-
-    // compute the current mapping from the current state
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping =
-        currentMapping(config, currentState, stateCountMap);
-
-    // If there are nodes tagged with resource, use only those nodes
-    Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
-    if (config.getParticipantGroupTag() != null) {
-      for (ParticipantId participantId : liveParticipantList) {
-        if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
-          taggedNodes.add(participantId);
-        }
-      }
-    }
-    if (taggedNodes.size() > 0) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("found the following instances with tag " + config.getResourceId() + " "
-            + taggedNodes);
-      }
-      liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
-    }
-
-    // determine which nodes the replicas should live on
-    int maxPartition = config.getMaxPartitionsPerParticipant();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("currentMapping: " + currentMapping);
-      LOG.info("stateCountMap: " + stateCountMap);
-      LOG.info("liveNodes: " + liveParticipantList);
-      LOG.info("allNodes: " + allParticipantList);
-      LOG.info("maxPartition: " + maxPartition);
-    }
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    _algorithm =
-        new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
-            placementScheme);
-    ZNRecord newMapping =
-        _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
-            allParticipantList);
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("newMapping: " + newMapping);
-    }
-
-    // compute a full partition mapping for the resource
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : partitions) {
-      Set<ParticipantId> disabledParticipantsForPartition =
-          ConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
-      List<String> rawPreferenceList = newMapping.getListField(partition.stringify());
-      if (rawPreferenceList == null) {
-        rawPreferenceList = Collections.emptyList();
-      }
-      List<ParticipantId> preferenceList =
-          Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
-            @Override
-            public ParticipantId apply(String participantName) {
-              return ParticipantId.from(participantName);
-            }
-          });
-      preferenceList =
-          ConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
-      Map<ParticipantId, State> bestStateForPartition =
-          ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
-              liveParticipants.keySet(), stateModelDef, preferenceList,
-              currentState.getCurrentStateMap(config.getResourceId(), partition),
-              disabledParticipantsForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-
-  private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
-      FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
-      Map<State, Integer> stateCountMap) {
-    Map<PartitionId, Map<ParticipantId, State>> map =
-        new HashMap<PartitionId, Map<ParticipantId, State>>();
-
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> curStateMap =
-          currentStateOutput.getCurrentStateMap(config.getResourceId(), partition);
-      map.put(partition, new HashMap<ParticipantId, State>());
-      for (ParticipantId node : curStateMap.keySet()) {
-        State state = curStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-
-      Map<ParticipantId, State> pendingStateMap =
-          currentStateOutput.getPendingStateMap(config.getResourceId(), partition);
-      for (ParticipantId node : pendingStateMap.keySet()) {
-        State state = pendingStateMap.get(node);
-        if (stateCountMap.containsKey(state)) {
-          map.get(partition).put(node, state);
-        }
-      }
-    }
-    return map;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancerContext.java
deleted file mode 100644
index 6a3fe10..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/FullAutoRebalancerContext.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.IdealState.RebalanceMode;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for FULL_AUTO rebalancing mode. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
-public class FullAutoRebalancerContext extends PartitionedRebalancerContext {
-  public FullAutoRebalancerContext() {
-    super(RebalanceMode.FULL_AUTO);
-    setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
-  }
-
-  /**
-   * Builder for a full auto rebalancer context. By default, it corresponds to
-   * {@link FullAutoRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public FullAutoRebalancerContext build() {
-      FullAutoRebalancerContext context = new FullAutoRebalancerContext();
-      super.update(context);
-      return context;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/PartitionedRebalancerContext.java
deleted file mode 100644
index 311683d..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/PartitionedRebalancerContext.java
+++ /dev/null
@@ -1,372 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource whose subunits are partitions. In addition, these partitions can
- * be replicated.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PartitionedRebalancerContext extends BasicRebalancerContext implements
-    ReplicatedRebalancerContext {
-  private Map<PartitionId, Partition> _partitionMap;
-  private boolean _anyLiveParticipant;
-  private int _replicaCount;
-  private int _maxPartitionsPerParticipant;
-  private final RebalanceMode _rebalanceMode;
-
-  /**
-   * Instantiate a PartitionedRebalancerContext for the given rebalance mode
-   */
-  public PartitionedRebalancerContext(RebalanceMode rebalanceMode) {
-    _partitionMap = Collections.emptyMap();
-    _replicaCount = 1;
-    _anyLiveParticipant = false;
-    _maxPartitionsPerParticipant = Integer.MAX_VALUE;
-    _rebalanceMode = rebalanceMode;
-  }
-
-  /**
-   * Get a map from partition id to partition
-   * @return partition map (mutable)
-   */
-  public Map<PartitionId, Partition> getPartitionMap() {
-    return _partitionMap;
-  }
-
-  /**
-   * Set a map of partition id to partition
-   * @param partitionMap partition map
-   */
-  public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
-    _partitionMap = Maps.newHashMap(partitionMap);
-  }
-
-  /**
-   * Get the set of partitions for this resource
-   * @return set of partition ids
-   */
-  @JsonIgnore
-  public Set<PartitionId> getPartitionSet() {
-    return _partitionMap.keySet();
-  }
-
-  /**
-   * Get a partition
-   * @param partitionId id of the partition to get
-   * @return Partition object, or null if not present
-   */
-  @JsonIgnore
-  public Partition getPartition(PartitionId partitionId) {
-    return _partitionMap.get(partitionId);
-  }
-
-  @Override
-  public boolean anyLiveParticipant() {
-    return _anyLiveParticipant;
-  }
-
-  /**
-   * Indicate if this resource should be assigned to any live participant
-   * @param anyLiveParticipant true if any live participant expected, false otherwise
-   */
-  public void setAnyLiveParticipant(boolean anyLiveParticipant) {
-    _anyLiveParticipant = anyLiveParticipant;
-  }
-
-  @Override
-  public int getReplicaCount() {
-    return _replicaCount;
-  }
-
-  /**
-   * Set the number of replicas that each partition should have
-   * @param replicaCount
-   */
-  public void setReplicaCount(int replicaCount) {
-    _replicaCount = replicaCount;
-  }
-
-  /**
-   * Get the maximum number of partitions that a participant can serve
-   * @return maximum number of partitions per participant
-   */
-  public int getMaxPartitionsPerParticipant() {
-    return _maxPartitionsPerParticipant;
-  }
-
-  /**
-   * Set the maximum number of partitions that a participant can serve
-   * @param maxPartitionsPerParticipant maximum number of partitions per participant
-   */
-  public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
-    _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
-  }
-
-  /**
-   * Get the rebalancer mode of the resource
-   * @return RebalanceMode
-   */
-  public RebalanceMode getRebalanceMode() {
-    return _rebalanceMode;
-  }
-
-  @Override
-  @JsonIgnore
-  public Map<PartitionId, Partition> getSubUnitMap() {
-    return getPartitionMap();
-  }
-
-  /**
-   * Generate a default configuration given the state model and a set of participants.
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // the base context does not understand enough to know what to do
-  }
-
-  /**
-   * Convert a physically-stored IdealState into a rebalancer context for a partitioned resource
-   * @param idealState populated IdealState
-   * @return PartitionedRebalancerContext
-   */
-  public static PartitionedRebalancerContext from(IdealState idealState) {
-    PartitionedRebalancerContext context;
-    switch (idealState.getRebalanceMode()) {
-    case FULL_AUTO:
-      FullAutoRebalancerContext.Builder fullAutoBuilder =
-          new FullAutoRebalancerContext.Builder(idealState.getResourceId());
-      populateContext(fullAutoBuilder, idealState);
-      context = fullAutoBuilder.build();
-      break;
-    case SEMI_AUTO:
-      SemiAutoRebalancerContext.Builder semiAutoBuilder =
-          new SemiAutoRebalancerContext.Builder(idealState.getResourceId());
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
-      }
-      populateContext(semiAutoBuilder, idealState);
-      context = semiAutoBuilder.build();
-      break;
-    case CUSTOMIZED:
-      CustomRebalancerContext.Builder customBuilder =
-          new CustomRebalancerContext.Builder(idealState.getResourceId());
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
-      }
-      populateContext(customBuilder, idealState);
-      context = customBuilder.build();
-      break;
-    default:
-      Builder baseBuilder = new Builder(idealState.getResourceId());
-      populateContext(baseBuilder, idealState);
-      context = baseBuilder.build();
-      break;
-    }
-    return context;
-  }
-
-  /**
-   * Update a builder subclass with all the fields of the ideal state
-   * @param builder builder that extends AbstractBuilder
-   * @param idealState populated IdealState
-   */
-  private static <T extends AbstractBuilder<T>> void populateContext(T builder,
-      IdealState idealState) {
-    String replicas = idealState.getReplicas();
-    int replicaCount = 0;
-    boolean anyLiveParticipant = false;
-    if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
-      anyLiveParticipant = true;
-    } else {
-      replicaCount = Integer.parseInt(replicas);
-    }
-    if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
-      // backwards compatibility: partition sets were based on pref lists/maps previously
-      builder.addPartitions(idealState.getNumPartitions());
-    } else {
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        builder.addPartition(new Partition(partitionId));
-      }
-    }
-    builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
-        .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
-        .participantGroupTag(idealState.getInstanceGroupTag())
-        .stateModelDefId(idealState.getStateModelDefId())
-        .stateModelFactoryId(idealState.getStateModelFactoryId());
-    RebalancerRef rebalancerRef = idealState.getRebalancerRef();
-    if (rebalancerRef != null) {
-      builder.rebalancerRef(rebalancerRef);
-    }
-  }
-
-  /**
-   * Builder for a basic partitioned rebalancer context
-   */
-  public static final class Builder extends AbstractBuilder<Builder> {
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public PartitionedRebalancerContext build() {
-      PartitionedRebalancerContext context =
-          new PartitionedRebalancerContext(RebalanceMode.USER_DEFINED);
-      super.update(context);
-      return context;
-    }
-  }
-
-  /**
-   * Abstract builder for a generic partitioned resource rebalancer context
-   */
-  public static abstract class AbstractBuilder<T extends BasicRebalancerContext.AbstractBuilder<T>>
-      extends BasicRebalancerContext.AbstractBuilder<T> {
-    private final ResourceId _resourceId;
-    private final Map<PartitionId, Partition> _partitionMap;
-    private boolean _anyLiveParticipant;
-    private int _replicaCount;
-    private int _maxPartitionsPerParticipant;
-
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public AbstractBuilder(ResourceId resourceId) {
-      super(resourceId);
-      _resourceId = resourceId;
-      _partitionMap = Maps.newHashMap();
-      _anyLiveParticipant = false;
-      _replicaCount = 1;
-      _maxPartitionsPerParticipant = Integer.MAX_VALUE;
-    }
-
-    /**
-     * Add a partition that the resource serves
-     * @param partition fully-qualified partition
-     * @return Builder
-     */
-    public T addPartition(Partition partition) {
-      _partitionMap.put(partition.getId(), partition);
-      return self();
-    }
-
-    /**
-     * Add a collection of partitions
-     * @param partitions any collection of Partition objects
-     * @return Builder
-     */
-    public T addPartitions(Collection<Partition> partitions) {
-      for (Partition partition : partitions) {
-        addPartition(partition);
-      }
-      return self();
-    }
-
-    /**
-     * Add a specified number of partitions with a default naming scheme, namely
-     * resourceId_partitionNumber where partitionNumber starts at 0
-     * @param partitionCount number of partitions to add
-     * @return Builder
-     */
-    public T addPartitions(int partitionCount) {
-      for (int i = 0; i < partitionCount; i++) {
-        addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
-      }
-      return self();
-    }
-
-    /**
-     * Set whether any live participant should be used in rebalancing
-     * @param anyLiveParticipant true if any live participant can be used, false otherwise
-     * @return Builder
-     */
-    public T anyLiveParticipant(boolean anyLiveParticipant) {
-      _anyLiveParticipant = anyLiveParticipant;
-      return self();
-    }
-
-    /**
-     * Set the number of replicas
-     * @param replicaCount number of replicas
-     * @return Builder
-     */
-    public T replicaCount(int replicaCount) {
-      _replicaCount = replicaCount;
-      return self();
-    }
-
-    /**
-     * Set the maximum number of partitions to assign to any participant
-     * @param maxPartitionsPerParticipant the maximum
-     * @return Builder
-     */
-    public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
-      _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
-      return self();
-    }
-
-    /**
-     * Update a PartitionedRebalancerContext with fields from this builder level
-     * @param context PartitionedRebalancerContext
-     */
-    protected final void update(PartitionedRebalancerContext context) {
-      super.update(context);
-      // enforce at least one partition
-      if (_partitionMap.isEmpty()) {
-        addPartitions(1);
-      }
-      context.setPartitionMap(_partitionMap);
-      context.setAnyLiveParticipant(_anyLiveParticipant);
-      context.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
-      context.setReplicaCount(_replicaCount);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/Rebalancer.java
deleted file mode 100644
index e164920..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/Rebalancer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Allows one to come up with custom implementation of a rebalancer.<br/>
- * This will be invoked on all changes that happen in the cluster.<br/>
- * Simply return the resource assignment for a resource in this method.<br/>
- */
-public interface Rebalancer {
-
-  public void init(HelixManager helixManager);
-
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
-      ResourceCurrentState currentState);
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerConfig.java
deleted file mode 100644
index 2c57a29..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerConfig.java
+++ /dev/null
@@ -1,177 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.config.NamespacedConfig;
-import org.apache.helix.model.ResourceConfiguration;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Configuration for a resource rebalancer. This contains a RebalancerContext, which contains
- * information specific to each rebalancer.
- */
-public final class RebalancerConfig {
-  private enum Fields {
-    SERIALIZER_CLASS,
-    REBALANCER_CONTEXT,
-    REBALANCER_CONTEXT_CLASS
-  }
-
-  private static final Logger LOG = Logger.getLogger(RebalancerConfig.class);
-  private ContextSerializer _serializer;
-  private Rebalancer _rebalancer;
-  private final RebalancerContext _context;
-  private final NamespacedConfig _config;
-
-  /**
-   * Instantiate a RebalancerConfig
-   * @param context rebalancer context
-   * @param rebalancerRef reference to the rebalancer class that will be used
-   */
-  public RebalancerConfig(RebalancerContext context) {
-    _config =
-        new NamespacedConfig(Scope.resource(context.getResourceId()),
-            RebalancerConfig.class.getSimpleName());
-    _config.setSimpleField(Fields.SERIALIZER_CLASS.toString(), context.getSerializerClass()
-        .getName());
-    _config
-        .setSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString(), context.getClass().getName());
-    _context = context;
-    try {
-      _serializer = context.getSerializerClass().newInstance();
-      _config.setSimpleField(Fields.REBALANCER_CONTEXT.toString(), _serializer.serialize(context));
-    } catch (InstantiationException e) {
-      LOG.error("Error initializing the configuration", e);
-    } catch (IllegalAccessException e) {
-      LOG.error("Error initializing the configuration", e);
-    }
-  }
-
-  /**
-   * Instantiate from a physical ResourceConfiguration
-   * @param resourceConfiguration populated ResourceConfiguration
-   */
-  public RebalancerConfig(ResourceConfiguration resourceConfiguration) {
-    _config = new NamespacedConfig(resourceConfiguration, RebalancerConfig.class.getSimpleName());
-    _serializer = getSerializer();
-    _context = getContext();
-  }
-
-  /**
-   * Get the class that can serialize and deserialize the rebalancer context
-   * @return ContextSerializer
-   */
-  private ContextSerializer getSerializer() {
-    String serializerClassName = _config.getSimpleField(Fields.SERIALIZER_CLASS.toString());
-    if (serializerClassName != null) {
-      try {
-        return (ContextSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
-            .newInstance();
-      } catch (InstantiationException e) {
-        LOG.error("Error getting the serializer", e);
-      } catch (IllegalAccessException e) {
-        LOG.error("Error getting the serializer", e);
-      } catch (ClassNotFoundException e) {
-        LOG.error("Error getting the serializer", e);
-      }
-    }
-    return null;
-  }
-
-  private RebalancerContext getContext() {
-    String className = _config.getSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString());
-    try {
-      Class<? extends RebalancerContext> contextClass =
-          HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerContext.class);
-      String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
-      return _serializer.deserialize(contextClass, serialized);
-    } catch (ClassNotFoundException e) {
-      LOG.error(className + " is not a valid class");
-    } catch (ClassCastException e) {
-      LOG.error(className + " does not implement RebalancerContext");
-    }
-    return null;
-  }
-
-  /**
-   * Get a rebalancer class instance
-   * @return Rebalancer
-   */
-  public Rebalancer getRebalancer() {
-    // cache the rebalancer to avoid loading and instantiating it excessively
-    if (_rebalancer == null) {
-      if (_context == null || _context.getRebalancerRef() == null) {
-        return null;
-      }
-      _rebalancer = _context.getRebalancerRef().getRebalancer();
-    }
-    return _rebalancer;
-  }
-
-  /**
-   * Get the instantiated RebalancerContext
-   * @param contextClass specific class of the RebalancerContext
-   * @return RebalancerContext subclass instance, or null if conversion is not possible
-   */
-  public <T extends RebalancerContext> T getRebalancerContext(Class<T> contextClass) {
-    try {
-      return contextClass.cast(_context);
-    } catch (ClassCastException e) {
-      LOG.info(contextClass + " is incompatible with context class: " + _context.getClass());
-    }
-    return null;
-  }
-
-  /**
-   * Get the rebalancer context serialized as a string
-   * @return string representing the context
-   */
-  public String getSerializedContext() {
-    return _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
-  }
-
-  /**
-   * Convert this to a namespaced config
-   * @return NamespacedConfig
-   */
-  public NamespacedConfig toNamespacedConfig() {
-    return _config;
-  }
-
-  /**
-   * Get a RebalancerConfig from a physical resource config
-   * @param resourceConfiguration physical resource config
-   * @return RebalancerConfig
-   */
-  public static RebalancerConfig from(ResourceConfiguration resourceConfiguration) {
-    return new RebalancerConfig(resourceConfiguration);
-  }
-
-  /**
-   * Get a RebalancerConfig from a RebalancerContext
-   * @param context instantiated RebalancerContext
-   * @return RebalancerConfig
-   */
-  public static RebalancerConfig from(RebalancerContext context) {
-    return new RebalancerConfig(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerContext.java
deleted file mode 100644
index 80f6b06..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerContext.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Defines the state available to a rebalancer. The most common use case is to use a
- * {@link PartitionedRebalancerContext} or a subclass and set up a resource with it. A rebalancer
- * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
- * how the configuration should be serialized.
- */
-public interface RebalancerContext {
-  /**
-   * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
-   * resource, e.g. a subtask of a task
-   * @return map of (subunit id, subunit) pairs
-   */
-  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
-
-  /**
-   * Get the subunits of the resource (e.g. partitions)
-   * @return set of subunit ids
-   */
-  public Set<? extends PartitionId> getSubUnitIdSet();
-
-  /**
-   * Get a specific subunit
-   * @param subUnitId the id of the subunit
-   * @return SubUnit
-   */
-  public Partition getSubUnit(PartitionId subUnitId);
-
-  /**
-   * Get the resource to rebalance
-   * @return resource id
-   */
-  public ResourceId getResourceId();
-
-  /**
-   * Get the state model definition that the resource follows
-   * @return state model definition id
-   */
-  public StateModelDefId getStateModelDefId();
-
-  /**
-   * Get the state model factory of this resource
-   * @return state model factory id
-   */
-  public StateModelFactoryId getStateModelFactoryId();
-
-  /**
-   * Get the tag, if any, that participants must have in order to serve this resource
-   * @return participant group tag, or null
-   */
-  public String getParticipantGroupTag();
-
-  /**
-   * Get the serializer for this context
-   * @return ContextSerializer class object
-   */
-  public Class<? extends ContextSerializer> getSerializerClass();
-
-  /**
-   * Get a reference to the class used to rebalance this resource
-   * @return RebalancerRef
-   */
-  public RebalancerRef getRebalancerRef();
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerRef.java
deleted file mode 100644
index 83ae589..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/RebalancerRef.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Reference to a class that extends {@link Rebalancer}. It loads the class automatically.
- */
-public class RebalancerRef {
-  private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
-
-  @JsonProperty("rebalancerClassName")
-  private final String _rebalancerClassName;
-
-  @JsonCreator
-  private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
-    _rebalancerClassName = rebalancerClassName;
-  }
-
-  /**
-   * Get an instantiated Rebalancer
-   * @return Rebalancer or null if instantiation failed
-   */
-  @JsonIgnore
-  public Rebalancer getRebalancer() {
-    try {
-      return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
-    } catch (Exception e) {
-      LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
-    }
-    return null;
-  }
-
-  @Override
-  public String toString() {
-    return _rebalancerClassName;
-  }
-
-  @Override
-  public boolean equals(Object that) {
-    if (that instanceof RebalancerRef) {
-      return this.toString().equals(((RebalancerRef) that).toString());
-    } else if (that instanceof String) {
-      return this.toString().equals(that);
-    }
-    return false;
-  }
-
-  /**
-   * Get a rebalancer class reference
-   * @param rebalancerClassName name of the class
-   * @return RebalancerRef or null if name is null
-   */
-  public static RebalancerRef from(String rebalancerClassName) {
-    if (rebalancerClassName == null) {
-      return null;
-    }
-    return new RebalancerRef(rebalancerClassName);
-  }
-
-  /**
-   * Get a RebalancerRef from a class object
-   * @param rebalancerClass class that implements Rebalancer
-   * @return RebalancerRef
-   */
-  public static RebalancerRef from(Class<? extends Rebalancer> rebalancerClass) {
-    if (rebalancerClass == null) {
-      return null;
-    }
-    return RebalancerRef.from(rebalancerClass.getName());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/ReplicatedRebalancerContext.java
deleted file mode 100644
index 4e98ce7..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/ReplicatedRebalancerContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
- * with partitions may accept state model definitions that support multiple replicas per partition,
- * and it's possible that the policy is that each live participant in the system should have a
- * replica.
- */
-public interface ReplicatedRebalancerContext extends RebalancerContext {
-  /**
-   * Check if this resource should be assigned to any live participant
-   * @return true if any live participant expected, false otherwise
-   */
-  public boolean anyLiveParticipant();
-
-  /**
-   * Get the number of replicas that each resource subunit should have
-   * @return replica count
-   */
-  public int getReplicaCount();
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancer.java
deleted file mode 100644
index 80ab256..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/SemiAutoRebalancer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.helix.api.rebalancer;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Rebalancer for the SEMI_AUTO mode. It expects a RebalancerConfig that understands the preferred
- * locations of each partition replica
- */
-public class SemiAutoRebalancer implements Rebalancer {
-  private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
-
-  @Override
-  public void init(HelixManager helixManager) {
-    // do nothing
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    SemiAutoRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> currentStateMap =
-          currentState.getCurrentStateMap(config.getResourceId(), partition);
-      Set<ParticipantId> disabledInstancesForPartition =
-          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
-      List<ParticipantId> preferenceList =
-          ConstraintBasedAssignment.getPreferenceList(cluster, partition,
-              config.getPreferenceList(partition));
-      Map<State, String> upperBounds =
-          ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
-              cluster.getConfig());
-      Map<ParticipantId, State> bestStateForPartition =
-          ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
-              .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
-              disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-
-}


[2/4] [HELIX-209] Moving rebalancer code around, take 2

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
new file mode 100644
index 0000000..9d24e68
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
@@ -0,0 +1,373 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for a resource whose subunits are partitions. In addition, these partitions can
+ * be replicated.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionedRebalancerContext extends BasicRebalancerContext implements
+    ReplicatedRebalancerContext {
+  private Map<PartitionId, Partition> _partitionMap;
+  private boolean _anyLiveParticipant;
+  private int _replicaCount;
+  private int _maxPartitionsPerParticipant;
+  private final RebalanceMode _rebalanceMode;
+
+  /**
+   * Instantiate a PartitionedRebalancerContext with default settings for the given rebalance mode
+   */
+  public PartitionedRebalancerContext(RebalanceMode rebalanceMode) {
+    _partitionMap = Collections.emptyMap();
+    _replicaCount = 1;
+    _anyLiveParticipant = false;
+    _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+    _rebalanceMode = rebalanceMode;
+  }
+
+  /**
+   * Get a map from partition id to partition
+   * @return partition map (mutable)
+   */
+  public Map<PartitionId, Partition> getPartitionMap() {
+    return _partitionMap;
+  }
+
+  /**
+   * Set a map of partition id to partition
+   * @param partitionMap partition map
+   */
+  public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
+    _partitionMap = Maps.newHashMap(partitionMap);
+  }
+
+  /**
+   * Get the set of partitions for this resource
+   * @return set of partition ids
+   */
+  @JsonIgnore
+  public Set<PartitionId> getPartitionSet() {
+    return _partitionMap.keySet();
+  }
+
+  /**
+   * Get a partition
+   * @param partitionId id of the partition to get
+   * @return Partition object, or null if not present
+   */
+  @JsonIgnore
+  public Partition getPartition(PartitionId partitionId) {
+    return _partitionMap.get(partitionId);
+  }
+
+  @Override
+  public boolean anyLiveParticipant() {
+    return _anyLiveParticipant;
+  }
+
+  /**
+   * Indicate if this resource should be assigned to any live participant
+   * @param anyLiveParticipant true if any live participant expected, false otherwise
+   */
+  public void setAnyLiveParticipant(boolean anyLiveParticipant) {
+    _anyLiveParticipant = anyLiveParticipant;
+  }
+
+  @Override
+  public int getReplicaCount() {
+    return _replicaCount;
+  }
+
+  /**
+   * Set the number of replicas that each partition should have
+   * @param replicaCount number of replicas per partition
+   */
+  public void setReplicaCount(int replicaCount) {
+    _replicaCount = replicaCount;
+  }
+
+  /**
+   * Get the maximum number of partitions that a participant can serve
+   * @return maximum number of partitions per participant
+   */
+  public int getMaxPartitionsPerParticipant() {
+    return _maxPartitionsPerParticipant;
+  }
+
+  /**
+   * Set the maximum number of partitions that a participant can serve
+   * @param maxPartitionsPerParticipant maximum number of partitions per participant
+   */
+  public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+    _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+  }
+
+  /**
+   * Get the rebalancer mode of the resource
+   * @return RebalanceMode
+   */
+  public RebalanceMode getRebalanceMode() {
+    return _rebalanceMode;
+  }
+
+  @Override
+  @JsonIgnore
+  public Map<PartitionId, Partition> getSubUnitMap() {
+    return getPartitionMap();
+  }
+
+  /**
+   * Generate a default configuration given the state model and a set of participants.
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // the base context does not understand enough to know to do anything
+  }
+
+  /**
+   * Convert a physically-stored IdealState into a rebalancer context for a partitioned resource
+   * @param idealState populated IdealState
+   * @return PartitionedRebalancerContext
+   */
+  public static PartitionedRebalancerContext from(IdealState idealState) {
+    PartitionedRebalancerContext context;
+    switch (idealState.getRebalanceMode()) {
+    case FULL_AUTO:
+      FullAutoRebalancerContext.Builder fullAutoBuilder =
+          new FullAutoRebalancerContext.Builder(idealState.getResourceId());
+      populateContext(fullAutoBuilder, idealState);
+      context = fullAutoBuilder.build();
+      break;
+    case SEMI_AUTO:
+      SemiAutoRebalancerContext.Builder semiAutoBuilder =
+          new SemiAutoRebalancerContext.Builder(idealState.getResourceId());
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
+      }
+      populateContext(semiAutoBuilder, idealState);
+      context = semiAutoBuilder.build();
+      break;
+    case CUSTOMIZED:
+      CustomRebalancerContext.Builder customBuilder =
+          new CustomRebalancerContext.Builder(idealState.getResourceId());
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
+      }
+      populateContext(customBuilder, idealState);
+      context = customBuilder.build();
+      break;
+    default:
+      Builder baseBuilder = new Builder(idealState.getResourceId());
+      populateContext(baseBuilder, idealState);
+      context = baseBuilder.build();
+      break;
+    }
+    return context;
+  }
+
+  /**
+   * Update a builder subclass with all the fields of the ideal state
+   * @param builder builder that extends AbstractBuilder
+   * @param idealState populated IdealState
+   */
+  private static <T extends AbstractBuilder<T>> void populateContext(T builder,
+      IdealState idealState) {
+    String replicas = idealState.getReplicas();
+    int replicaCount = 0;
+    boolean anyLiveParticipant = false;
+    if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
+      anyLiveParticipant = true;
+    } else {
+      replicaCount = Integer.parseInt(replicas);
+    }
+    if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
+      // backwards compatibility: partition sets were based on pref lists/maps previously
+      builder.addPartitions(idealState.getNumPartitions());
+    } else {
+      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+        builder.addPartition(new Partition(partitionId));
+      }
+    }
+    builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+        .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+        .participantGroupTag(idealState.getInstanceGroupTag())
+        .stateModelDefId(idealState.getStateModelDefId())
+        .stateModelFactoryId(idealState.getStateModelFactoryId());
+    RebalancerRef rebalancerRef = idealState.getRebalancerRef();
+    if (rebalancerRef != null) {
+      builder.rebalancerRef(rebalancerRef);
+    }
+  }
+
+  /**
+   * Builder for a basic partitioned rebalancer context
+   */
+  public static final class Builder extends AbstractBuilder<Builder> {
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public PartitionedRebalancerContext build() {
+      PartitionedRebalancerContext context =
+          new PartitionedRebalancerContext(RebalanceMode.USER_DEFINED);
+      super.update(context);
+      return context;
+    }
+  }
+
+  /**
+   * Abstract builder for a generic partitioned resource rebalancer context
+   */
+  public static abstract class AbstractBuilder<T extends BasicRebalancerContext.AbstractBuilder<T>>
+      extends BasicRebalancerContext.AbstractBuilder<T> {
+    private final ResourceId _resourceId;
+    private final Map<PartitionId, Partition> _partitionMap;
+    private boolean _anyLiveParticipant;
+    private int _replicaCount;
+    private int _maxPartitionsPerParticipant;
+
+    /**
+     * Instantiate with a resource
+     * @param resourceId resource id
+     */
+    public AbstractBuilder(ResourceId resourceId) {
+      super(resourceId);
+      _resourceId = resourceId;
+      _partitionMap = Maps.newHashMap();
+      _anyLiveParticipant = false;
+      _replicaCount = 1;
+      _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+    }
+
+    /**
+     * Add a partition that the resource serves
+     * @param partition fully-qualified partition
+     * @return Builder
+     */
+    public T addPartition(Partition partition) {
+      _partitionMap.put(partition.getId(), partition);
+      return self();
+    }
+
+    /**
+     * Add a collection of partitions
+     * @param partitions any collection of Partition objects
+     * @return Builder
+     */
+    public T addPartitions(Collection<Partition> partitions) {
+      for (Partition partition : partitions) {
+        addPartition(partition);
+      }
+      return self();
+    }
+
+    /**
+     * Add a specified number of partitions with a default naming scheme, namely
+     * resourceId_partitionNumber where partitionNumber starts at 0
+     * @param partitionCount number of partitions to add
+     * @return Builder
+     */
+    public T addPartitions(int partitionCount) {
+      for (int i = 0; i < partitionCount; i++) {
+        addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
+      }
+      return self();
+    }
+
+    /**
+     * Set whether any live participant should be used in rebalancing
+     * @param anyLiveParticipant true if any live participant can be used, false otherwise
+     * @return Builder
+     */
+    public T anyLiveParticipant(boolean anyLiveParticipant) {
+      _anyLiveParticipant = anyLiveParticipant;
+      return self();
+    }
+
+    /**
+     * Set the number of replicas
+     * @param replicaCount number of replicas
+     * @return Builder
+     */
+    public T replicaCount(int replicaCount) {
+      _replicaCount = replicaCount;
+      return self();
+    }
+
+    /**
+     * Set the maximum number of partitions to assign to any participant
+     * @param maxPartitionsPerParticipant the maximum
+     * @return Builder
+     */
+    public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+      _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+      return self();
+    }
+
+    /**
+     * Update a PartitionedRebalancerContext with fields from this builder level
+     * @param context PartitionedRebalancerContext
+     */
+    protected final void update(PartitionedRebalancerContext context) {
+      super.update(context);
+      // enforce at least one partition
+      if (_partitionMap.isEmpty()) {
+        addPartitions(1);
+      }
+      context.setPartitionMap(_partitionMap);
+      context.setAnyLiveParticipant(_anyLiveParticipant);
+      context.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
+      context.setReplicaCount(_replicaCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
new file mode 100644
index 0000000..846fd01
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -0,0 +1,178 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration for a resource rebalancer. This contains a RebalancerContext, which contains
+ * information specific to each rebalancer.
+ */
+public final class RebalancerConfig {
+  private enum Fields {
+    SERIALIZER_CLASS,
+    REBALANCER_CONTEXT,
+    REBALANCER_CONTEXT_CLASS
+  }
+
+  private static final Logger LOG = Logger.getLogger(RebalancerConfig.class);
+  private ContextSerializer _serializer;
+  private Rebalancer _rebalancer;
+  private final RebalancerContext _context;
+  private final NamespacedConfig _config;
+
+  /**
+   * Instantiate a RebalancerConfig from a context, serializing the context with an instance of
+   * the serializer class that the context reports
+   * @param context rebalancer context
+   */
+  public RebalancerConfig(RebalancerContext context) {
+    _config =
+        new NamespacedConfig(Scope.resource(context.getResourceId()),
+            RebalancerConfig.class.getSimpleName());
+    _config.setSimpleField(Fields.SERIALIZER_CLASS.toString(), context.getSerializerClass()
+        .getName());
+    _config
+        .setSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString(), context.getClass().getName());
+    _context = context;
+    try {
+      _serializer = context.getSerializerClass().newInstance();
+      _config.setSimpleField(Fields.REBALANCER_CONTEXT.toString(), _serializer.serialize(context));
+    } catch (InstantiationException e) {
+      LOG.error("Error initializing the configuration", e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Error initializing the configuration", e);
+    }
+  }
+
+  /**
+   * Instantiate from a physical ResourceConfiguration
+   * @param resourceConfiguration populated ResourceConfiguration
+   */
+  public RebalancerConfig(ResourceConfiguration resourceConfiguration) {
+    _config = new NamespacedConfig(resourceConfiguration, RebalancerConfig.class.getSimpleName());
+    _serializer = getSerializer();
+    _context = getContext();
+  }
+
+  /**
+   * Get the class that can serialize and deserialize the rebalancer context
+   * @return ContextSerializer
+   */
+  private ContextSerializer getSerializer() {
+    String serializerClassName = _config.getSimpleField(Fields.SERIALIZER_CLASS.toString());
+    if (serializerClassName != null) {
+      try {
+        return (ContextSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+            .newInstance();
+      } catch (InstantiationException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (IllegalAccessException e) {
+        LOG.error("Error getting the serializer", e);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Error getting the serializer", e);
+      }
+    }
+    return null;
+  }
+
+  private RebalancerContext getContext() {
+    String className = _config.getSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString());
+    try {
+      Class<? extends RebalancerContext> contextClass =
+          HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerContext.class);
+      String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
+      return _serializer.deserialize(contextClass, serialized);
+    } catch (ClassNotFoundException e) {
+      LOG.error(className + " is not a valid class");
+    } catch (ClassCastException e) {
+      LOG.error(className + " does not implement RebalancerContext");
+    }
+    return null;
+  }
+
+  /**
+   * Get a rebalancer class instance
+   * @return Rebalancer
+   */
+  public Rebalancer getRebalancer() {
+    // cache the rebalancer to avoid loading and instantiating it excessively
+    if (_rebalancer == null) {
+      if (_context == null || _context.getRebalancerRef() == null) {
+        return null;
+      }
+      _rebalancer = _context.getRebalancerRef().getRebalancer();
+    }
+    return _rebalancer;
+  }
+
+  /**
+   * Get the instantiated RebalancerContext
+   * @param contextClass specific class of the RebalancerContext
+   * @return RebalancerContext subclass instance, or null if conversion is not possible
+   */
+  public <T extends RebalancerContext> T getRebalancerContext(Class<T> contextClass) {
+    try {
+      return contextClass.cast(_context);
+    } catch (ClassCastException e) {
+      LOG.info(contextClass + " is incompatible with context class: " + _context.getClass());
+    }
+    return null;
+  }
+
+  /**
+   * Get the rebalancer context serialized as a string
+   * @return string representing the context
+   */
+  public String getSerializedContext() {
+    return _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
+  }
+
+  /**
+   * Convert this to a namespaced config
+   * @return NamespacedConfig
+   */
+  public NamespacedConfig toNamespacedConfig() {
+    return _config;
+  }
+
+  /**
+   * Get a RebalancerConfig from a physical resource config
+   * @param resourceConfiguration physical resource config
+   * @return RebalancerConfig
+   */
+  public static RebalancerConfig from(ResourceConfiguration resourceConfiguration) {
+    return new RebalancerConfig(resourceConfiguration);
+  }
+
+  /**
+   * Get a RebalancerConfig from a RebalancerContext
+   * @param context instantiated RebalancerContext
+   * @return RebalancerConfig
+   */
+  public static RebalancerConfig from(RebalancerContext context) {
+    return new RebalancerConfig(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
new file mode 100644
index 0000000..981891b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
@@ -0,0 +1,94 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Defines the state available to a rebalancer. The most common use case is to use a
+ * {@link PartitionedRebalancerContext} or a subclass and set up a resource with it. A rebalancer
+ * context, at a minimum, is aware of subunits of a resource, the state model to follow, and
+ * how the context should be serialized.
+ */
+public interface RebalancerContext {
+  /**
+   * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
+   * resource, e.g. a subtask of a task
+   * @return map of (subunit id, subunit) pairs
+   */
+  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
+
+  /**
+   * Get the subunits of the resource (e.g. partitions)
+   * @return set of subunit ids
+   */
+  public Set<? extends PartitionId> getSubUnitIdSet();
+
+  /**
+   * Get a specific subunit
+   * @param subUnitId the id of the subunit
+   * @return Partition representing the subunit
+   */
+  public Partition getSubUnit(PartitionId subUnitId);
+
+  /**
+   * Get the resource to rebalance
+   * @return resource id
+   */
+  public ResourceId getResourceId();
+
+  /**
+   * Get the state model definition that the resource follows
+   * @return state model definition id
+   */
+  public StateModelDefId getStateModelDefId();
+
+  /**
+   * Get the state model factory of this resource
+   * @return state model factory id
+   */
+  public StateModelFactoryId getStateModelFactoryId();
+
+  /**
+   * Get the tag, if any, that participants must have in order to serve this resource
+   * @return participant group tag, or null
+   */
+  public String getParticipantGroupTag();
+
+  /**
+   * Get the serializer for this context
+   * @return ContextSerializer class object
+   */
+  public Class<? extends ContextSerializer> getSerializerClass();
+
+  /**
+   * Get a reference to the class used to rebalance this resource
+   * @return RebalancerRef
+   */
+  public RebalancerRef getRebalancerRef();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
new file mode 100644
index 0000000..525931d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.rebalancer.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
+ * with partitions may accept state model definitions that support multiple replicas per partition,
+ * and it's possible that the policy is that each live participant in the system should have a
+ * replica.
+ */
+public interface ReplicatedRebalancerContext extends RebalancerContext {
+  /**
+   * Check if this resource should be assigned to any live participant
+   * @return true if any live participant expected, false otherwise
+   */
+  public boolean anyLiveParticipant();
+
+  /**
+   * Get the number of replicas that each resource subunit should have
+   * @return replica count
+   */
+  public int getReplicaCount();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
new file mode 100644
index 0000000..f574a62
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
@@ -0,0 +1,178 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
+ * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
+  @JsonProperty("preferenceLists")
+  private Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+  /**
+   * Instantiate a SemiAutoRebalancerContext
+   */
+  public SemiAutoRebalancerContext() {
+    super(RebalanceMode.SEMI_AUTO);
+    setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+    _preferenceLists = Maps.newHashMap();
+  }
+
+  /**
+   * Get the preference lists of all partitions of the resource
+   * @return map of partition id to list of participant ids
+   */
+  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
+    return _preferenceLists;
+  }
+
+  /**
+   * Set the preference lists of all partitions of the resource
+   * @param preferenceLists map of partition id to list of participant ids
+   */
+  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
+    _preferenceLists = preferenceLists;
+  }
+
+  /**
+   * Get the preference list of a partition
+   * @param partitionId the partition to look up
+   * @return list of participant ids
+   */
+  @JsonIgnore
+  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+    return _preferenceLists.get(partitionId);
+  }
+
+  /**
+   * Generate preference lists based on a default cluster setup
+   * @param stateModelDef the state model definition to follow
+   * @param participantSet the set of participant ids to configure for
+   */
+  @Override
+  @JsonIgnore
+  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+      Set<ParticipantId> participantSet) {
+    // compute default upper bounds
+    Map<State, String> upperBounds = Maps.newHashMap();
+    for (State state : stateModelDef.getTypedStatesPriorityList()) {
+      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+    }
+
+    // determine the current mapping
+    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+    for (PartitionId partitionId : getPartitionSet()) {
+      List<ParticipantId> preferenceList = getPreferenceList(partitionId);
+      if (preferenceList != null && !preferenceList.isEmpty()) {
+        Set<ParticipantId> disabledParticipants = Collections.emptySet();
+        Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
+        Map<ParticipantId, State> initialMap =
+            ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+                participantSet, stateModelDef, preferenceList, emptyCurrentState,
+                disabledParticipants);
+        currentMapping.put(partitionId, initialMap);
+      }
+    }
+
+    // determine the preference
+    LinkedHashMap<State, Integer> stateCounts =
+        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+            getReplicaCount());
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+    AutoRebalanceStrategy strategy =
+        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+            getMaxPartitionsPerParticipant(), placementScheme);
+    Map<String, List<String>> rawPreferenceLists =
+        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+            .getListFields();
+    Map<PartitionId, List<ParticipantId>> preferenceLists =
+        Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
+    setPreferenceLists(preferenceLists);
+  }
+
+  /**
+   * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
+   */
+  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+    /**
+     * Instantiate for a resource
+     * @param resourceId resource id
+     */
+    public Builder(ResourceId resourceId) {
+      super(resourceId);
+      super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+      _preferenceLists = Maps.newHashMap();
+    }
+
+    /**
+     * Add a preference list for a partition
+     * @param partitionId partition to set
+     * @param preferenceList ordered list of participants who can serve the partition
+     * @return Builder
+     */
+    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+      _preferenceLists.put(partitionId, preferenceList);
+      return self();
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public SemiAutoRebalancerContext build() {
+      SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
+      super.update(context);
+      context.setPreferenceLists(_preferenceLists);
+      return context;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index ce52a19..a0ee1c1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -30,89 +30,136 @@ import java.util.Set;
 
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDefinedState;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 /**
- * Collection of functions that will compute the best possible states given the live instances and
- * an ideal state.<br/>
- * <br/>
- * Deprecated. Use {@link org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment} instead.
+ * Collection of functions that will compute the best possible state based on the participants and
+ * the rebalancer configuration of a resource.
  */
-@Deprecated
 public class ConstraintBasedAssignment {
   private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
 
-  public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
-      IdealState idealState, StateModelDefinition stateModelDef) {
-    List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+  /**
+   * Get a set of disabled participants for a partition
+   * @param participantMap map of all participants
+   * @param partitionId the partition to check
+   * @return a set of all participants that are disabled for the partition
+   */
+  public static Set<ParticipantId> getDisabledParticipants(
+      final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
+    Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
+    Set<ParticipantId> disabledParticipantsForPartition =
+        Sets.filter(participantSet, new Predicate<ParticipantId>() {
+          @Override
+          public boolean apply(ParticipantId participantId) {
+            Participant participant = participantMap.get(participantId);
+            return !participant.isEnabled()
+                || participant.getDisabledPartitionIds().contains(partitionId);
+          }
+        });
+    return disabledParticipantsForPartition;
+  }
 
-    if (listField != null && listField.size() == 1
-        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) {
-      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-      List<String> prefList = new ArrayList<String>(liveInstances.keySet());
+  /**
+   * Get an ordered list of participants that can serve a partition
+   * @param cluster cluster snapshot
+   * @param partitionId the partition to look up
+   * @param prefList configured preference list, or the single token ANY_LIVEINSTANCE
+   * @return list with most preferred participants first
+   */
+  public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
+      List<ParticipantId> prefList) {
+    if (prefList != null && prefList.size() == 1
+        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
+      prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
       Collections.sort(prefList);
-      return prefList;
-    } else {
-      return listField;
     }
+    return prefList;
   }
 
   /**
-   * compute best state for resource in AUTO ideal state mode
-   * @param cache
+   * Get a map of state to upper bound constraint given a cluster
+   * @param stateModelDef the state model definition to check
+   * @param resourceId the resource that is constrained
+   * @param cluster the cluster the resource belongs to
+   * @return map of state to upper bound
+   */
+  public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
+      ResourceId resourceId, ClusterConfig cluster) {
+    Map<State, String> stateMap = Maps.newHashMap();
+    for (State state : stateModelDef.getTypedStatesPriorityList()) {
+      String num =
+          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+              stateModelDef.getStateModelDefId(), state);
+      stateMap.put(state, num);
+    }
+    return stateMap;
+  }
+
+  /**
+   * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
+   * @param upperBounds map of state to upper bound
+   * @param liveParticipantSet set of live participant ids
    * @param stateModelDef
-   * @param instancePreferenceList
+   * @param participantPreferenceList ordered list of preferred participants, or null
    * @param currentStateMap
-   *          : instance->state for each partition
-   * @param disabledInstancesForPartition
+   *          : participant->state for each partition
+   * @param disabledParticipantsForPartition participants disabled for the partition
    * @return
    */
-  public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
-      StateModelDefinition stateModelDef, List<String> instancePreferenceList,
-      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
-    Map<String, String> instanceStateMap = new HashMap<String, String>();
+  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
+      Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
+      StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
+      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
-    // if the ideal state is deleted, instancePreferenceList will be empty and
+    // if the resource is deleted, participantPreferenceList will be empty and
     // we should drop all resources.
     if (currentStateMap != null) {
-      for (String instance : currentStateMap.keySet()) {
-        if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
-            && !disabledInstancesForPartition.contains(instance)) {
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        if ((participantPreferenceList == null || !participantPreferenceList
+            .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
           // if dropped and not disabled, transit to DROPPED
-          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
-        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
-            HelixDefinedState.ERROR.toString()))
-            && disabledInstancesForPartition.contains(instance)) {
+          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
+        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
+            participantId).equals(State.from(HelixDefinedState.ERROR)))
+            && disabledParticipantsForPartition.contains(participantId)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          instanceStateMap.put(instance, stateModelDef.getInitialState());
+          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
         }
       }
     }
 
-    // ideal state is deleted
-    if (instancePreferenceList == null) {
-      return instanceStateMap;
+    // resource is deleted
+    if (participantPreferenceList == null) {
+      return participantStateMap;
     }
 
-    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-    boolean assigned[] = new boolean[instancePreferenceList.size()];
-
-    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
+    boolean assigned[] = new boolean[participantPreferenceList.size()];
 
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
+    for (State state : statesPriorityList) {
+      String num = upperBounds.get(state);
       int stateCount = -1;
       if ("N".equals(num)) {
-        Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
-        liveAndEnabled.removeAll(disabledInstancesForPartition);
+        Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
+        liveAndEnabled.removeAll(disabledParticipantsForPartition);
         stateCount = liveAndEnabled.size();
       } else if ("R".equals(num)) {
-        stateCount = instancePreferenceList.size();
+        stateCount = participantPreferenceList.size();
       } else {
         try {
           stateCount = Integer.parseInt(num);
@@ -122,16 +169,18 @@ public class ConstraintBasedAssignment {
       }
       if (stateCount > -1) {
         int count = 0;
-        for (int i = 0; i < instancePreferenceList.size(); i++) {
-          String instanceName = instancePreferenceList.get(i);
+        for (int i = 0; i < participantPreferenceList.size(); i++) {
+          ParticipantId participantId = participantPreferenceList.get(i);
 
           boolean notInErrorState =
-              currentStateMap == null || currentStateMap.get(instanceName) == null
-                  || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
-
-          if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
-              && !disabledInstancesForPartition.contains(instanceName)) {
-            instanceStateMap.put(instanceName, state);
+              currentStateMap == null
+                  || currentStateMap.get(participantId) == null
+                  || !currentStateMap.get(participantId)
+                      .equals(State.from(HelixDefinedState.ERROR));
+
+          if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
+              && !disabledParticipantsForPartition.contains(participantId)) {
+            participantStateMap.put(participantId, state);
             count = count + 1;
             assigned[i] = true;
             if (count == stateCount) {
@@ -141,24 +190,25 @@ public class ConstraintBasedAssignment {
         }
       }
     }
-    return instanceStateMap;
+    return participantStateMap;
   }
 
   /**
    * Get the number of replicas that should be in each state for a partition
+   * @param upperBounds map of state to upper bound
    * @param stateModelDef StateModelDefinition object
    * @param liveNodesNb number of live nodes
    * @param total number of replicas
    * @return state count map: state->count
    */
-  public static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
-      int liveNodesNb, int totalReplicas) {
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
-    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+  public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
+      StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
+    LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
+    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
 
     int replicas = totalReplicas;
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
+    for (State state : statesPriorityList) {
+      String num = upperBounds.get(state);
       if ("N".equals(num)) {
         stateCountMap.put(state, liveNodesNb);
       } else if ("R".equals(num)) {
@@ -181,8 +231,8 @@ public class ConstraintBasedAssignment {
     }
 
     // get state count for R
-    for (String state : statesPriorityList) {
-      String num = stateModelDef.getNumInstancesPerState(state);
+    for (State state : statesPriorityList) {
+      String num = upperBounds.get(state);
       if ("R".equals(num)) {
         stateCountMap.put(state, replicas);
         // should have at most one state using R

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 51301f0..aae33b4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -30,12 +30,12 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.Rebalancer;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
-import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 5ecbddf..1d287fa 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -42,10 +42,10 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index 5d9746b..d6fe8c3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -37,9 +37,9 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 15004a6..5c24314 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -36,11 +36,11 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
-import org.apache.helix.api.rebalancer.ReplicatedRebalancerContext;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
index 949cfca..a9ab34e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -24,7 +24,7 @@ import java.util.Map;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.deprecated.controller.rebalancer.Rebalancer;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.Resource;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index dc56b89..1fdd892 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -30,11 +30,11 @@ import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerConfig;
-import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
 import org.apache.helix.model.CurrentState;
 import org.apache.log4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java
new file mode 100644
index 0000000..afe132e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/AutoRebalancer.java
@@ -0,0 +1,187 @@
+package org.apache.helix.deprecated.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.deprecated.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a Rebalancer specific to full automatic mode. It is tasked with computing the ideal
+ * state of a resource, fully adapting to the addition or removal of instances. This includes
+ * computation of a new preference list and a partition to instance and state mapping based on the
+ * computed instance preferences.
+ * The input is the current assignment of partitions to instances, as well as existing instance
+ * preferences, if any.
+ * The output is a preference list and a mapping based on that preference list, i.e. partition p
+ * has a replica on node k with state s.
+ */
+@Deprecated
+public class AutoRebalancer implements Rebalancer {
+  // These should be final, but are initialized in init rather than a constructor
+  private HelixManager _manager;
+  private AutoRebalanceStrategy _algorithm;
+
+  private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
+
+  @Override
+  public void init(HelixManager manager) {
+    this._manager = manager;
+    this._algorithm = null;
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    // Compute a preference list based on the current ideal state
+    List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
+    String stateModelName = currentIdealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+    Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
+    String replicas = currentIdealState.getReplicas();
+
+    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
+    stateCountMap =
+        ConstraintBasedAssignment.stateCount(stateModelDef, liveInstance.size(),
+            Integer.parseInt(replicas));
+    List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
+    Map<String, Map<String, String>> currentMapping =
+        currentMapping(currentStateOutput, resource.getResourceName(), partitions, stateCountMap);
+
+    // If any live nodes carry the ideal state's instance group tag, use only those nodes
+    Set<String> taggedNodes = new HashSet<String>();
+    if (currentIdealState.getInstanceGroupTag() != null) {
+      for (String instanceName : liveNodes) {
+        if (clusterData.getInstanceConfigMap().get(instanceName)
+            .containsTag(currentIdealState.getInstanceGroupTag())) {
+          taggedNodes.add(instanceName);
+        }
+      }
+    }
+    if (taggedNodes.size() > 0) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("found the following instances with tag " + currentIdealState.getResourceName()
+            + " " + taggedNodes);
+      }
+      liveNodes = new ArrayList<String>(taggedNodes);
+    }
+
+    List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
+    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("currentMapping: " + currentMapping);
+      LOG.info("stateCountMap: " + stateCountMap);
+      LOG.info("liveNodes: " + liveNodes);
+      LOG.info("allNodes: " + allNodes);
+      LOG.info("maxPartition: " + maxPartition);
+    }
+    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+    placementScheme.init(_manager);
+    _algorithm =
+        new AutoRebalanceStrategy(resource.getResourceName(), partitions, stateCountMap,
+            maxPartition, placementScheme);
+    ZNRecord newMapping =
+        _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("newMapping: " + newMapping);
+    }
+
+    IdealState newIdealState = new IdealState(resource.getResourceName());
+    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+    newIdealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
+    newIdealState.getRecord().setListFields(newMapping.getListFields());
+
+    // compute a full partition mapping for the resource
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getResourceName());
+    }
+    ResourceAssignment partitionMapping =
+        new ResourceAssignment(ResourceId.from(resource.getResourceName()));
+    for (String partitionName : partitions) {
+      Partition partition = new Partition(partitionName);
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
+      Set<String> disabledInstancesForPartition =
+          clusterData.getDisabledInstancesForPartition(partition.toString());
+      List<String> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(clusterData, partition, newIdealState,
+              stateModelDef);
+      Map<String, String> bestStateForPartition =
+          ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, stateModelDef,
+              preferenceList, currentStateMap, disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(PartitionId.from(partitionName),
+          ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+    }
+    return partitionMapping;
+  }
+
+  private Map<String, Map<String, String>> currentMapping(CurrentStateOutput currentStateOutput,
+      String resourceName, List<String> partitions, Map<String, Integer> stateCountMap) {
+
+    Map<String, Map<String, String>> map = new HashMap<String, Map<String, String>>();
+
+    for (String partition : partitions) {
+      Map<String, String> curStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, new Partition(partition));
+      map.put(partition, new HashMap<String, String>());
+      for (String node : curStateMap.keySet()) {
+        String state = curStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+
+      Map<String, String> pendingStateMap =
+          currentStateOutput.getPendingStateMap(resourceName, new Partition(partition));
+      for (String node : pendingStateMap.keySet()) {
+        String state = pendingStateMap.get(node);
+        if (stateCountMap.containsKey(state)) {
+          map.get(partition).put(node, state);
+        }
+      }
+    }
+    return map;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java
new file mode 100644
index 0000000..e1136a2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/CustomRebalancer.java
@@ -0,0 +1,135 @@
+package org.apache.helix.deprecated.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a Rebalancer specific to custom mode. It is tasked with checking an existing mapping of
+ * partitions against the set of live instances to mark assignment states as dropped or erroneous
+ * as necessary.
+ * The input is the required current assignment of partitions to instances, as well as the required
+ * existing instance preferences.
+ * The output is a verified mapping based on that preference list, i.e. partition p has a replica
+ * on node k with state s, where s may be a dropped or error state if necessary.
+ */
+@Deprecated
+public class CustomRebalancer implements Rebalancer {
+
+  private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
+
+  @Override
+  public void init(HelixManager manager) {
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    String stateModelDefName = currentIdealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelDefName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getResourceName());
+    }
+    ResourceAssignment partitionMapping =
+        new ResourceAssignment(ResourceId.from(resource.getResourceName()));
+    for (Partition partition : resource.getPartitions()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
+      Set<String> disabledInstancesForPartition =
+          clusterData.getDisabledInstancesForPartition(partition.toString());
+      Map<String, String> idealStateMap =
+          IdealState.stringMapFromParticipantStateMap(currentIdealState
+              .getParticipantStateMap(PartitionId.from(partition.getPartitionName())));
+      Map<String, String> bestStateForPartition =
+          computeCustomizedBestStateForPartition(clusterData, stateModelDef, idealStateMap,
+              currentStateMap, disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(PartitionId.from(partition.getPartitionName()),
+          ResourceAssignment.replicaMapFromStringMap(bestStateForPartition));
+    }
+    return partitionMapping;
+  }
+
+  /**
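+   * Compute the best state for each instance of a partition in CUSTOMIZED ideal state mode
+   * @param cache cluster data cache, used to look up the live instances
+   * @param stateModelDef state model definition, used for its initial state
+   * @param idealStateMap instance-to-state map from the ideal state; null if it was deleted
+   * @param currentStateMap instance-to-state map of current states; may be null
+   * @param disabledInstancesForPartition instances treated as disabled for this partition
+   * @return instance-to-state map computed for the partition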
+   */
+  private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
+      StateModelDefinition stateModelDef, Map<String, String> idealStateMap,
+      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
+    Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+    // if the ideal state is deleted, idealStateMap will be null/empty and
+    // we should drop all resources.
+    if (currentStateMap != null) {
+      for (String instance : currentStateMap.keySet()) {
+        if ((idealStateMap == null || !idealStateMap.containsKey(instance))
+            && !disabledInstancesForPartition.contains(instance)) {
+          // if dropped and not disabled, transit to DROPPED
+          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
+            HelixDefinedState.ERROR.toString()))
+            && disabledInstancesForPartition.contains(instance)) {
+          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+          instanceStateMap.put(instance, stateModelDef.getInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (idealStateMap == null) {
+      return instanceStateMap;
+    }
+
+    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+    for (String instance : idealStateMap.keySet()) {
+      boolean notInErrorState =
+          currentStateMap == null || currentStateMap.get(instance) == null
+              || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
+
+      if (liveInstancesMap.containsKey(instance) && notInErrorState
+          && !disabledInstancesForPartition.contains(instance)) {
+        instanceStateMap.put(instance, idealStateMap.get(instance));
+      }
+    }
+
+    return instanceStateMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
new file mode 100644
index 0000000..59b827d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/Rebalancer.java
@@ -0,0 +1,58 @@
+package org.apache.helix.deprecated.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+
+/**
+ * Allows one to come up with a custom implementation of a rebalancer.<br/>
+ * This will be invoked on all changes that happen in the cluster.<br/>
+ * Simply return the resource assignment for a resource in this method.<br/>
+ * <br/>
+ * Deprecated. Use {@link org.apache.helix.controller.rebalancer.Rebalancer} instead.
+ */
+@Deprecated
+public interface Rebalancer {
+  /**
+   * Initialize the rebalancer with a HelixManager if necessary
+   * @param manager
+   */
+  void init(HelixManager manager);
+
+  /**
+   * Given an ideal state for a resource and liveness of instances, compute an assignment of
+   * instances and states to each partition of a resource. This method provides all the relevant
+   * information needed to rebalance a resource. If you need additional information use
+   * manager.getAccessor to read the cluster data. This allows one to compute the newIdealState
+   * according to app specific requirements.
+   * @param resource the resource for which a mapping will be computed
+   * @param currentIdealState the IdealState that corresponds to this resource
+   * @param currentStateOutput the current states of all partitions
+   * @param clusterData cache of the cluster state
+   */
+  ResourceAssignment computeResourceMapping(final Resource resource,
+      final IdealState currentIdealState, final CurrentStateOutput currentStateOutput,
+      final ClusterDataCache clusterData);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java
new file mode 100644
index 0000000..b4259f9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/SemiAutoRebalancer.java
@@ -0,0 +1,83 @@
+package org.apache.helix.deprecated.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.deprecated.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Rebalancer for resources managed in semi-automatic mode. Each partition of the resource comes
+ * with a predefined preference list of instances willing to host its replicas, and this class
+ * turns those lists into an assignment.
+ * Inputs: the current assignment of partitions to instances (optional) and the instance
+ * preferences taken from the ideal state (required).
+ * Output: a mapping derived from the preference lists, i.e. partition p has a replica on node k
+ * with state s.
+ */
+@Deprecated
+public class SemiAutoRebalancer implements Rebalancer {
+
+  private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
+
+  @Override
+  public void init(HelixManager manager) {
+  }
+
+  @Override
+  public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    StateModelDefinition modelDef =
+        clusterData.getStateModelDef(currentIdealState.getStateModelDefRef());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resource.getResourceName());
+    }
+    ResourceId resourceId = ResourceId.from(resource.getResourceName());
+    ResourceAssignment result = new ResourceAssignment(resourceId);
+    for (Partition part : resource.getPartitions()) {
+      // gather the per-partition inputs, pick a state per instance, record it by partition id
+      Map<String, String> current =
+          currentStateOutput.getCurrentStateMap(resource.getResourceName(), part);
+      Set<String> disabled = clusterData.getDisabledInstancesForPartition(part.toString());
+      List<String> preferred =
+          ConstraintBasedAssignment.getPreferenceList(clusterData, part, currentIdealState,
+              modelDef);
+      Map<String, String> chosen =
+          ConstraintBasedAssignment.computeAutoBestStateForPartition(clusterData, modelDef,
+              preferred, current, disabled);
+      PartitionId partitionId = PartitionId.from(part.getPartitionName());
+      result.addReplicaMap(partitionId, ResourceAssignment.replicaMapFromStringMap(chosen));
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java
new file mode 100644
index 0000000..c75423f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/deprecated/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -0,0 +1,194 @@
+package org.apache.helix.deprecated.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Collection of functions that will compute the best possible states given the live instances and
+ * an ideal state.<br/>
+ * <br/>
+ * Deprecated. Use {@link org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment} instead.
+ */
+@Deprecated
+public class ConstraintBasedAssignment {
+  private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
+
+  public static List<String> getPreferenceList(ClusterDataCache cache, Partition resource,
+      IdealState idealState, StateModelDefinition stateModelDef) {
+    List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
+
+    if (listField != null && listField.size() == 1
+        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0))) {
+      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+      List<String> prefList = new ArrayList<String>(liveInstances.keySet());
+      Collections.sort(prefList);
+      return prefList;
+    } else {
+      return listField;
+    }
+  }
+
+  /**
+   * compute best state for resource in AUTO ideal state mode
+   * @param cache
+   * @param stateModelDef
+   * @param instancePreferenceList
+   * @param currentStateMap
+   *          : instance->state for each partition
+   * @param disabledInstancesForPartition
+   * @return
+   */
+  public static Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
+      StateModelDefinition stateModelDef, List<String> instancePreferenceList,
+      Map<String, String> currentStateMap, Set<String> disabledInstancesForPartition) {
+    Map<String, String> instanceStateMap = new HashMap<String, String>();
+
+    // if the ideal state is deleted, instancePreferenceList will be empty and
+    // we should drop all resources.
+    if (currentStateMap != null) {
+      for (String instance : currentStateMap.keySet()) {
+        if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
+            && !disabledInstancesForPartition.contains(instance)) {
+          // if dropped and not disabled, transit to DROPPED
+          instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
+        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
+            HelixDefinedState.ERROR.toString()))
+            && disabledInstancesForPartition.contains(instance)) {
+          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
+          instanceStateMap.put(instance, stateModelDef.getInitialState());
+        }
+      }
+    }
+
+    // ideal state is deleted
+    if (instancePreferenceList == null) {
+      return instanceStateMap;
+    }
+
+    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+    boolean assigned[] = new boolean[instancePreferenceList.size()];
+
+    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+
+    for (String state : statesPriorityList) {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      int stateCount = -1;
+      if ("N".equals(num)) {
+        Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
+        liveAndEnabled.removeAll(disabledInstancesForPartition);
+        stateCount = liveAndEnabled.size();
+      } else if ("R".equals(num)) {
+        stateCount = instancePreferenceList.size();
+      } else {
+        try {
+          stateCount = Integer.parseInt(num);
+        } catch (Exception e) {
+          logger.error("Invalid count for state:" + state + " ,count=" + num);
+        }
+      }
+      if (stateCount > -1) {
+        int count = 0;
+        for (int i = 0; i < instancePreferenceList.size(); i++) {
+          String instanceName = instancePreferenceList.get(i);
+
+          boolean notInErrorState =
+              currentStateMap == null || currentStateMap.get(instanceName) == null
+                  || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
+
+          if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState
+              && !disabledInstancesForPartition.contains(instanceName)) {
+            instanceStateMap.put(instanceName, state);
+            count = count + 1;
+            assigned[i] = true;
+            if (count == stateCount) {
+              break;
+            }
+          }
+        }
+      }
+    }
+    return instanceStateMap;
+  }
+
+  /**
+   * Get the number of replicas that should be in each state for a partition
+   * @param stateModelDef StateModelDefinition object
+   * @param liveNodesNb number of live nodes
+   * @param total number of replicas
+   * @return state count map: state->count
+   */
+  public static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
+      int liveNodesNb, int totalReplicas) {
+    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
+    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+
+    int replicas = totalReplicas;
+    for (String state : statesPriorityList) {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      if ("N".equals(num)) {
+        stateCountMap.put(state, liveNodesNb);
+      } else if ("R".equals(num)) {
+        // wait until we get the counts for all other states
+        continue;
+      } else {
+        int stateCount = -1;
+        try {
+          stateCount = Integer.parseInt(num);
+        } catch (Exception e) {
+          // LOG.error("Invalid count for state: " + state + ", count: " + num +
+          // ", use -1 instead");
+        }
+
+        if (stateCount > 0) {
+          stateCountMap.put(state, stateCount);
+          replicas -= stateCount;
+        }
+      }
+    }
+
+    // get state count for R
+    for (String state : statesPriorityList) {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      if ("R".equals(num)) {
+        stateCountMap.put(state, replicas);
+        // should have at most one state using R
+        break;
+      }
+    }
+    return stateCountMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 805f6bf..df4043a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -38,7 +38,7 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.api.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Function;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e032132a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
index f48ebbc..85330be 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -45,10 +45,10 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.rebalancer.CustomRebalancerContext;
-import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
-import org.apache.helix.api.rebalancer.RebalancerContext;
-import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;