You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/31 01:06:42 UTC

[2/2] git commit: [HELIX-374] Rebalancer config should be fully user-specified

[HELIX-374] Rebalancer config should be fully user-specified


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ee8ef6a7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ee8ef6a7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ee8ef6a7

Branch: refs/heads/master
Commit: ee8ef6a702de2b1490e6c09bff914adf6dbb7603
Parents: 08371b5
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jul 30 14:37:02 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jul 30 16:05:15 2014 -0700

----------------------------------------------------------------------
 .../helix/api/accessor/ClusterAccessor.java     |  78 +--
 .../config/BasicRebalancerConfig.java           | 256 ---------
 .../config/CustomRebalancerConfig.java          | 169 ------
 .../config/FullAutoRebalancerConfig.java        |  69 ---
 .../config/PartitionedRebalancerConfig.java     | 523 -------------------
 .../rebalancer/config/RebalancerConfig.java     |  57 +-
 .../config/RebalancerConfigHolder.java          |  15 -
 .../config/ReplicatedRebalancerConfig.java      |  40 --
 .../config/SemiAutoRebalancerConfig.java        | 183 -------
 .../stages/BestPossibleStateCalcStage.java      |  70 +--
 .../stages/ExternalViewComputeStage.java        |   8 +-
 .../stages/MessageGenerationStage.java          |  17 +-
 .../stages/MessageSelectionStage.java           |  21 +-
 .../stages/ResourceComputationStage.java        |  51 +-
 .../java/org/apache/helix/model/IdealState.java |  83 +--
 .../helix/model/builder/IdealStateBuilder.java  |  10 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java |   4 +-
 .../org/apache/helix/api/TestNewStages.java     | 253 ---------
 .../context/TestSerializeRebalancerContext.java | 105 ----
 .../helix/controller/stages/BaseStageTest.java  |   5 +-
 .../stages/TestResourceComputationStage.java    |  14 +-
 .../stages/TestStagesWithLogicalAccessors.java  | 255 +++++++++
 .../TestCustomizedIdealStateRebalancer.java     |   7 +-
 .../helix/integration/TestHelixConnection.java  |  20 +-
 .../integration/TestLocalContainerProvider.java |  20 +-
 .../TestUserDefRebalancerCompatibility.java     |   3 +-
 .../mbeans/TestClusterStatusMonitor.java        |  20 +-
 .../helix/examples/LogicalModelExample.java     |  66 +--
 .../tools/UpdateProvisionerConfig.java          |   1 +
 .../provisioning/yarn/AppMasterLauncher.java    |  22 +-
 30 files changed, 432 insertions(+), 2013 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 6b92275..ddf809a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -53,8 +53,6 @@ import org.apache.helix.controller.provisioner.ContainerId;
 import org.apache.helix.controller.provisioner.ContainerSpec;
 import org.apache.helix.controller.provisioner.ContainerState;
 import org.apache.helix.controller.provisioner.ProvisionerConfig;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 import org.apache.helix.controller.stages.ClusterDataCache;
@@ -64,7 +62,6 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -292,34 +289,8 @@ public class ClusterAccessor {
       resourceConfigMap = _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
     }
 
-    // check if external view and resource assignment reads are required
-    boolean extraReadsRequired = false;
-    for (String resourceName : idealStateMap.keySet()) {
-      if (extraReadsRequired) {
-        break;
-      }
-      // a rebalancer can be user defined if it has that mode set, or has a different rebalancer
-      // class
-      IdealState idealState = idealStateMap.get(resourceName);
-      extraReadsRequired =
-          extraReadsRequired || (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED);
-      RebalancerRef ref = idealState.getRebalancerRef();
-      if (ref != null) {
-        extraReadsRequired =
-            extraReadsRequired
-                || !PartitionedRebalancerConfig.isBuiltinRebalancer(ref.getRebalancerClass());
-      }
-    }
-    for (String resourceName : resourceConfigMap.keySet()) {
-      if (extraReadsRequired) {
-        break;
-      }
-      extraReadsRequired =
-          extraReadsRequired || resourceConfigMap.get(resourceName).hasRebalancerConfig();
-    }
-
     // now read external view and resource assignments if needed
-    if (!useCache || extraReadsRequired) {
+    if (!useCache) {
       externalViewMap = _accessor.getChildValuesMap(_keyBuilder.externalViews());
       resourceAssignmentMap = _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
       _cache.setAssignmentWritePolicy(true);
@@ -439,8 +410,8 @@ public class ClusterAccessor {
    * @return true if resource added, false if there was an error
    */
   public boolean addResource(ResourceConfig resource) {
-    if (resource == null || resource.getRebalancerConfig() == null) {
-      LOG.error("Resource not fully defined with a rebalancer config");
+    if (resource == null || resource.getIdealState() == null) {
+      LOG.error("Resource not fully defined with an ideal state");
       return false;
     }
 
@@ -448,8 +419,8 @@ public class ClusterAccessor {
       LOG.error("Cluster: " + _clusterId + " structure is not valid");
       return false;
     }
-    RebalancerConfig config = resource.getRebalancerConfig();
-    StateModelDefId stateModelDefId = config.getStateModelDefId();
+    IdealState idealState = resource.getIdealState();
+    StateModelDefId stateModelDefId = idealState.getStateModelDefId();
     if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
       LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
       return false;
@@ -467,13 +438,8 @@ public class ClusterAccessor {
       return false;
     }
 
-    // Create an IdealState from a RebalancerConfig (if the resource supports it)
-    IdealState idealState =
-        PartitionedRebalancerConfig.rebalancerConfigToIdealState(resource.getRebalancerConfig(), 0,
-            false);
-    if (idealState != null) {
-      _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
-    }
+    // Persist the ideal state
+    _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
 
     // Add resource user config
     boolean persistConfig = false;
@@ -482,11 +448,10 @@ public class ClusterAccessor {
       configuration.addNamespacedConfig(resource.getUserConfig());
       persistConfig = true;
     }
-    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
-    if (idealState == null
-        && (partitionedConfig == null || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED)) {
+    RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+    if (rebalancerConfig != null) {
       // only persist if this is not easily convertible to an ideal state
-      configuration.addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
+      configuration.addNamespacedConfig(new RebalancerConfigHolder(rebalancerConfig)
           .toNamespacedConfig());
       persistConfig = true;
     }
@@ -796,29 +761,10 @@ public class ClusterAccessor {
     RebalancerConfig rebalancerConfig = null;
     if (resourceConfiguration != null) {
       userConfig = resourceConfiguration.getUserConfig();
-    } else {
-      userConfig = new UserConfig(Scope.resource(resourceId));
-    }
-    if (idealState != null) {
-      if (resourceConfiguration != null
-          && idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
-        // prefer rebalancer config for user_defined data rebalancing
-        rebalancerConfig =
-            resourceConfiguration.getRebalancerConfig(PartitionedRebalancerConfig.class);
-      }
-      if (rebalancerConfig == null) {
-        // prefer ideal state for non-user_defined data rebalancing
-        rebalancerConfig = PartitionedRebalancerConfig.from(idealState);
-      }
-      idealState.updateUserConfig(userConfig);
-    } else if (resourceConfiguration != null) {
       rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class);
-    }
-    if (rebalancerConfig == null) {
-      rebalancerConfig = new PartitionedRebalancerConfig();
-    }
-    if (resourceConfiguration != null) {
       provisionerConfig = resourceConfiguration.getProvisionerConfig(ProvisionerConfig.class);
+    } else {
+      userConfig = new UserConfig(Scope.resource(resourceId));
     }
     ResourceConfig resourceConfig =
         new ResourceConfig.Builder(resourceId).idealState(idealState)

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
deleted file mode 100644
index d6aa10e..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
+++ /dev/null
@@ -1,256 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.serializer.DefaultStringSerializer;
-import org.apache.helix.controller.serializer.StringSerializer;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Abstract RebalancerConfig that functions for generic subunits. Use a subclass that more
- * concretely defines the subunits.
- */
-public abstract class BasicRebalancerConfig implements RebalancerConfig {
-  private ResourceId _resourceId;
-  private StateModelDefId _stateModelDefId;
-  private StateModelFactoryId _stateModelFactoryId;
-  private String _participantGroupTag;
-  private Class<? extends StringSerializer> _serializer;
-  private RebalancerRef _rebalancerRef;
-
-  /**
-   * Instantiate a basic rebalancer config
-   */
-  public BasicRebalancerConfig() {
-    _serializer = DefaultStringSerializer.class;
-  }
-
-  @Override
-  public ResourceId getResourceId() {
-    return _resourceId;
-  }
-
-  /**
-   * Set the resource to rebalance
-   * @param resourceId resource id
-   */
-  public void setResourceId(ResourceId resourceId) {
-    _resourceId = resourceId;
-  }
-
-  @Override
-  public StateModelDefId getStateModelDefId() {
-    return _stateModelDefId;
-  }
-
-  /**
-   * Set the state model definition that the resource follows
-   * @param stateModelDefId state model definition id
-   */
-  public void setStateModelDefId(StateModelDefId stateModelDefId) {
-    _stateModelDefId = stateModelDefId;
-  }
-
-  @Override
-  public StateModelFactoryId getStateModelFactoryId() {
-    return _stateModelFactoryId;
-  }
-
-  /**
-   * Set the state model factory that the resource uses
-   * @param stateModelFactoryId state model factory id
-   */
-  public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-    _stateModelFactoryId = stateModelFactoryId;
-  }
-
-  @Override
-  public String getParticipantGroupTag() {
-    return _participantGroupTag;
-  }
-
-  /**
-   * Set a tag that participants must have in order to serve this resource
-   * @param participantGroupTag string group tag
-   */
-  public void setParticipantGroupTag(String participantGroupTag) {
-    _participantGroupTag = participantGroupTag;
-  }
-
-  /**
-   * Get the serializer. If none is provided, {@link DefaultStringSerializer} is used
-   */
-  @Override
-  public Class<? extends StringSerializer> getSerializerClass() {
-    return _serializer;
-  }
-
-  /**
-   * Set the class that can serialize this config
-   * @param serializer serializer class that implements StringSerializer
-   */
-  public void setSerializerClass(Class<? extends StringSerializer> serializer) {
-    _serializer = serializer;
-  }
-
-  @Override
-  @JsonIgnore
-  public Set<? extends PartitionId> getSubUnitIdSet() {
-    return getSubUnitMap().keySet();
-  }
-
-  @Override
-  @JsonIgnore
-  public Partition getSubUnit(PartitionId subUnitId) {
-    return getSubUnitMap().get(subUnitId);
-  }
-
-  @Override
-  public RebalancerRef getRebalancerRef() {
-    return _rebalancerRef;
-  }
-
-  /**
-   * Set the reference to the class used to rebalance this resource
-   * @param rebalancerRef RebalancerRef instance
-   */
-  public void setRebalancerRef(RebalancerRef rebalancerRef) {
-    _rebalancerRef = rebalancerRef;
-  }
-
-  /**
-   * Safely cast a RebalancerConfig into a subtype
-   * @param config RebalancerConfig object
-   * @param clazz the target class
-   * @return An instance of clazz, or null if the conversion is not possible
-   */
-  public static <T extends RebalancerConfig> T convert(RebalancerConfig config, Class<T> clazz) {
-    try {
-      return clazz.cast(config);
-    } catch (ClassCastException e) {
-      return null;
-    }
-  }
-
-  /**
-   * Abstract builder for the base rebalancer config
-   */
-  public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
-    private final ResourceId _resourceId;
-    private StateModelDefId _stateModelDefId;
-    private StateModelFactoryId _stateModelFactoryId;
-    private String _participantGroupTag;
-    private Class<? extends StringSerializer> _serializerClass;
-    private RebalancerRef _rebalancerRef;
-
-    /**
-     * Instantiate with a resource id
-     * @param resourceId resource id
-     */
-    public AbstractBuilder(ResourceId resourceId) {
-      _resourceId = resourceId;
-      _serializerClass = DefaultStringSerializer.class;
-    }
-
-    /**
-     * Set the state model definition that the resource should follow
-     * @param stateModelDefId state model definition id
-     * @return Builder
-     */
-    public T stateModelDefId(StateModelDefId stateModelDefId) {
-      _stateModelDefId = stateModelDefId;
-      return self();
-    }
-
-    /**
-     * Set the state model factory that the resource should use
-     * @param stateModelFactoryId state model factory id
-     * @return Builder
-     */
-    public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
-      _stateModelFactoryId = stateModelFactoryId;
-      return self();
-    }
-
-    /**
-     * Set the tag that all participants require in order to serve this resource
-     * @param participantGroupTag the tag
-     * @return Builder
-     */
-    public T participantGroupTag(String participantGroupTag) {
-      _participantGroupTag = participantGroupTag;
-      return self();
-    }
-
-    /**
-     * Set the serializer class for this rebalancer config
-     * @param serializerClass class that implements StringSerializer
-     * @return Builder
-     */
-    public T serializerClass(Class<? extends StringSerializer> serializerClass) {
-      _serializerClass = serializerClass;
-      return self();
-    }
-
-    /**
-     * Specify a custom class to use for rebalancing
-     * @param rebalancerRef RebalancerRef instance
-     * @return Builder
-     */
-    public T rebalancerRef(RebalancerRef rebalancerRef) {
-      _rebalancerRef = rebalancerRef;
-      return self();
-    }
-
-    /**
-     * Update an existing context with base fields
-     * @param config derived config
-     */
-    protected final void update(BasicRebalancerConfig config) {
-      config.setResourceId(_resourceId);
-      config.setStateModelDefId(_stateModelDefId);
-      config.setStateModelFactoryId(_stateModelFactoryId);
-      config.setParticipantGroupTag(_participantGroupTag);
-      config.setSerializerClass(_serializerClass);
-      config.setRebalancerRef(_rebalancerRef);
-    }
-
-    /**
-     * Get a typed reference to "this" class. Final derived classes should simply return the this
-     * reference.
-     * @return this for the most specific type
-     */
-    protected abstract T self();
-
-    /**
-     * Get the rebalancer config from the built fields
-     * @return RebalancerConfig
-     */
-    public abstract RebalancerConfig build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
deleted file mode 100644
index a44b230..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerConfig for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
- * corresponds to {@link CustomRebalancer}
- */
-public class CustomRebalancerConfig extends PartitionedRebalancerConfig {
-  private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
-  /**
-   * Instantiate a CustomRebalancerConfig
-   */
-  public CustomRebalancerConfig() {
-    if (getClass().equals(CustomRebalancerConfig.class)) {
-      // only mark this as customized mode if this specifc config is used
-      setRebalanceMode(RebalanceMode.CUSTOMIZED);
-    } else {
-      setRebalanceMode(RebalanceMode.USER_DEFINED);
-    }
-    setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
-    _preferenceMaps = Maps.newHashMap();
-  }
-
-  /**
-   * Get the preference maps of the partitions and replicas of the resource
-   * @return map of partition to participant and state
-   */
-  public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
-    return _preferenceMaps;
-  }
-
-  /**
-   * Set the preference maps of the partitions and replicas of the resource
-   * @param preferenceMaps map of partition to participant and state
-   */
-  public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
-    _preferenceMaps = preferenceMaps;
-  }
-
-  /**
-   * Get the preference map of a partition
-   * @param partitionId the partition to look up
-   * @return map of participant to state
-   */
-  @JsonIgnore
-  public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
-    return _preferenceMaps.get(partitionId);
-  }
-
-  /**
-   * Generate preference maps based on a default cluster setup
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @Override
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // compute default upper bounds
-    Map<State, String> upperBounds = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
-    }
-
-    // determine the current mapping
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
-
-    // determine the preference maps
-    LinkedHashMap<State, Integer> stateCounts =
-        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
-            getReplicaCount());
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
-    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
-            getMaxPartitionsPerParticipant(), placementScheme);
-    Map<String, Map<String, String>> rawPreferenceMaps =
-        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
-            .getMapFields();
-    Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
-        Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
-    setPreferenceMaps(preferenceMaps);
-  }
-
-  /**
-   * Build a CustomRebalancerConfig. By default, it corresponds to {@link CustomRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
-    private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
-    /**
-     * Instantiate for a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
-      super.rebalanceMode(RebalanceMode.CUSTOMIZED);
-      _preferenceMaps = Maps.newHashMap();
-    }
-
-    /**
-     * Add a preference map for a partition
-     * @param partitionId partition to set
-     * @param preferenceList map of participant id to state indicating where replicas are served
-     * @return Builder
-     */
-    public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
-      _preferenceMaps.put(partitionId, preferenceMap);
-      return self();
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public CustomRebalancerConfig build() {
-      CustomRebalancerConfig config = new CustomRebalancerConfig();
-      super.update(config);
-      config.setPreferenceMaps(_preferenceMaps);
-      return config;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
deleted file mode 100644
index 16bb4cb..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.model.IdealState.RebalanceMode;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerConfig for FULL_AUTO rebalancing mode. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
-public class FullAutoRebalancerConfig extends PartitionedRebalancerConfig {
-  public FullAutoRebalancerConfig() {
-    if (getClass().equals(FullAutoRebalancerConfig.class)) {
-      // only mark this as full auto mode if this specifc config is used
-      setRebalanceMode(RebalanceMode.FULL_AUTO);
-    } else {
-      setRebalanceMode(RebalanceMode.USER_DEFINED);
-    }
-    setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
-  }
-
-  /**
-   * Builder for a full auto rebalancer config. By default, it corresponds to
-   * {@link FullAutoRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
-      super.rebalanceMode(RebalanceMode.FULL_AUTO);
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public FullAutoRebalancerConfig build() {
-      FullAutoRebalancerConfig config = new FullAutoRebalancerConfig();
-      super.update(config);
-      return config;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
deleted file mode 100644
index 934a9c2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
+++ /dev/null
@@ -1,523 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
-import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.task.TaskRebalancer;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerConfig for a resource whose subunits are partitions. In addition, these partitions can
- * be replicated.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PartitionedRebalancerConfig extends BasicRebalancerConfig implements
-    ReplicatedRebalancerConfig {
-  private Map<PartitionId, Partition> _partitionMap;
-  private boolean _anyLiveParticipant;
-  private int _replicaCount;
-  private int _maxPartitionsPerParticipant;
-  private RebalanceMode _rebalanceMode;
-
-  @JsonIgnore
-  private static final Set<Class<? extends RebalancerConfig>> BUILTIN_CONFIG_CLASSES = Sets
-      .newHashSet();
-  @JsonIgnore
-  private static final Set<Class<? extends HelixRebalancer>> BUILTIN_REBALANCER_CLASSES = Sets
-      .newHashSet();
-  static {
-    BUILTIN_CONFIG_CLASSES.add(PartitionedRebalancerConfig.class);
-    BUILTIN_CONFIG_CLASSES.add(FullAutoRebalancerConfig.class);
-    BUILTIN_CONFIG_CLASSES.add(SemiAutoRebalancerConfig.class);
-    BUILTIN_CONFIG_CLASSES.add(CustomRebalancerConfig.class);
-    BUILTIN_REBALANCER_CLASSES.add(FullAutoRebalancer.class);
-    BUILTIN_REBALANCER_CLASSES.add(SemiAutoRebalancer.class);
-    BUILTIN_REBALANCER_CLASSES.add(CustomRebalancer.class);
-    BUILTIN_REBALANCER_CLASSES.add(TaskRebalancer.class);
-  }
-
-  /**
-   * Instantiate a PartitionedRebalancerConfig
-   */
-  public PartitionedRebalancerConfig() {
-    _partitionMap = Collections.emptyMap();
-    _replicaCount = 1;
-    _anyLiveParticipant = false;
-    _maxPartitionsPerParticipant = Integer.MAX_VALUE;
-    _rebalanceMode = RebalanceMode.USER_DEFINED;
-  }
-
-  /**
-   * Get a map from partition id to partition
-   * @return partition map (mutable)
-   */
-  public Map<PartitionId, Partition> getPartitionMap() {
-    return _partitionMap;
-  }
-
-  /**
-   * Set a map of partition id to partition
-   * @param partitionMap partition map
-   */
-  public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
-    _partitionMap = Maps.newHashMap(partitionMap);
-  }
-
-  /**
-   * Get the set of partitions for this resource
-   * @return set of partition ids
-   */
-  @JsonIgnore
-  public Set<PartitionId> getPartitionSet() {
-    return _partitionMap.keySet();
-  }
-
-  /**
-   * Get a partition
-   * @param partitionId id of the partition to get
-   * @return Partition object, or null if not present
-   */
-  @JsonIgnore
-  public Partition getPartition(PartitionId partitionId) {
-    return _partitionMap.get(partitionId);
-  }
-
-  @Override
-  public boolean anyLiveParticipant() {
-    return _anyLiveParticipant;
-  }
-
-  /**
-   * Indicate if this resource should be assigned to any live participant
-   * @param anyLiveParticipant true if any live participant expected, false otherwise
-   */
-  public void setAnyLiveParticipant(boolean anyLiveParticipant) {
-    _anyLiveParticipant = anyLiveParticipant;
-  }
-
-  @Override
-  public int getReplicaCount() {
-    return _replicaCount;
-  }
-
-  /**
-   * Set the number of replicas that each partition should have
-   * @param replicaCount
-   */
-  public void setReplicaCount(int replicaCount) {
-    _replicaCount = replicaCount;
-  }
-
-  /**
-   * Get the maximum number of partitions that a participant can serve
-   * @return maximum number of partitions per participant
-   */
-  public int getMaxPartitionsPerParticipant() {
-    return _maxPartitionsPerParticipant;
-  }
-
-  /**
-   * Set the maximum number of partitions that a participant can serve
-   * @param maxPartitionsPerParticipant maximum number of partitions per participant
-   */
-  public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
-    _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
-  }
-
-  /**
-   * Set the rebalancer mode of the partitioned resource
-   * @param rebalanceMode {@link RebalanceMode} enum value
-   */
-  public void setRebalanceMode(RebalanceMode rebalanceMode) {
-    _rebalanceMode = rebalanceMode;
-  }
-
-  /**
-   * Get the rebalancer mode of the resource
-   * @return RebalanceMode
-   */
-  public RebalanceMode getRebalanceMode() {
-    return _rebalanceMode;
-  }
-
-  @Override
-  @JsonIgnore
-  public Map<PartitionId, Partition> getSubUnitMap() {
-    return getPartitionMap();
-  }
-
-  /**
-   * Generate a default configuration given the state model and a participant.
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // the base config does not understand enough to know do to anything
-  }
-
-  /**
-   * Safely get a {@link PartitionedRebalancerConfig} from a {@link RebalancerConfig}
-   * @param config the base config
-   * @return a {@link PartitionedRebalancerConfig}, or null if the conversion is not possible
-   */
-  public static PartitionedRebalancerConfig from(RebalancerConfig config) {
-    try {
-      return PartitionedRebalancerConfig.class.cast(config);
-    } catch (ClassCastException e) {
-      return null;
-    }
-  }
-
-  /**
-   * Check if the given class is compatible with an {@link IdealState}
-   * @param clazz the PartitionedRebalancerConfig subclass
-   * @return true if IdealState can be used to describe this config, false otherwise
-   */
-  public static boolean isBuiltinConfig(Class<? extends RebalancerConfig> clazz) {
-    return BUILTIN_CONFIG_CLASSES.contains(clazz);
-  }
-
-  /**
-   * Check if the given class is a built-in rebalancer class
-   * @param clazz the HelixRebalancer subclass
-   * @return true if the rebalancer class is built in, false otherwise
-   */
-  public static boolean isBuiltinRebalancer(Class<? extends HelixRebalancer> clazz) {
-    return BUILTIN_REBALANCER_CLASSES.contains(clazz);
-  }
-
-  /**
-   * Convert a physically-stored IdealState into a rebalancer config for a partitioned resource
-   * @param idealState populated IdealState
-   * @return PartitionedRebalancerConfig
-   */
-  public static PartitionedRebalancerConfig from(IdealState idealState) {
-    PartitionedRebalancerConfig config;
-    RebalanceMode mode = idealState.getRebalanceMode();
-    if (mode == RebalanceMode.USER_DEFINED) {
-      Class<? extends RebalancerConfig> configClass = idealState.getRebalancerConfigClass();
-      if (configClass.equals(FullAutoRebalancerConfig.class)) {
-        mode = RebalanceMode.FULL_AUTO;
-      } else if (configClass.equals(SemiAutoRebalancerConfig.class)) {
-        mode = RebalanceMode.SEMI_AUTO;
-      } else if (configClass.equals(CustomRebalancerConfig.class)) {
-        mode = RebalanceMode.CUSTOMIZED;
-      }
-    }
-    switch (mode) {
-    case FULL_AUTO:
-      FullAutoRebalancerConfig.Builder fullAutoBuilder =
-          new FullAutoRebalancerConfig.Builder(idealState.getResourceId());
-      populateConfig(fullAutoBuilder, idealState);
-      config = fullAutoBuilder.build();
-      break;
-    case SEMI_AUTO:
-      SemiAutoRebalancerConfig.Builder semiAutoBuilder =
-          new SemiAutoRebalancerConfig.Builder(idealState.getResourceId());
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
-      }
-      populateConfig(semiAutoBuilder, idealState);
-      config = semiAutoBuilder.build();
-      break;
-    case CUSTOMIZED:
-      CustomRebalancerConfig.Builder customBuilder =
-          new CustomRebalancerConfig.Builder(idealState.getResourceId());
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
-      }
-      populateConfig(customBuilder, idealState);
-      config = customBuilder.build();
-      break;
-    default:
-      Builder baseBuilder = new Builder(idealState.getResourceId());
-      populateConfig(baseBuilder, idealState);
-      config = baseBuilder.build();
-      break;
-    }
-    return config;
-  }
-
-  /**
-   * Update a builder subclass with all the fields of the ideal state
-   * @param builder builder that extends AbstractBuilder
-   * @param idealState populated IdealState
-   */
-  private static <T extends AbstractBuilder<T>> void populateConfig(T builder, IdealState idealState) {
-    String replicas = idealState.getReplicas();
-    int replicaCount = 0;
-    boolean anyLiveParticipant = false;
-    if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
-      anyLiveParticipant = true;
-    } else {
-      replicaCount = Integer.parseInt(replicas);
-    }
-    if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
-      // backwards compatibility: partition sets were based on pref lists/maps previously
-      builder.addPartitions(idealState.getNumPartitions());
-    } else {
-      for (PartitionId partitionId : idealState.getPartitionIdSet()) {
-        builder.addPartition(new Partition(partitionId));
-      }
-    }
-    builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
-        .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
-        .participantGroupTag(idealState.getInstanceGroupTag())
-        .stateModelDefId(idealState.getStateModelDefId())
-        .stateModelFactoryId(idealState.getStateModelFactoryId())
-        .rebalanceMode(idealState.getRebalanceMode());
-    RebalancerRef rebalancerRef = idealState.getRebalancerRef();
-    if (rebalancerRef != null) {
-      builder.rebalancerRef(rebalancerRef);
-    }
-  }
-
-  /**
-   * Get an ideal state from a rebalancer config if the resource is partitioned
-   * @param config RebalancerConfig instance
-   * @param bucketSize bucket size to use
-   * @param batchMessageMode true if batch messaging allowed, false otherwise
-   * @return IdealState, or null
-   */
-  public static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
-      boolean batchMessageMode) {
-    PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
-    if (partitionedConfig != null) {
-      if (!PartitionedRebalancerConfig.isBuiltinConfig(partitionedConfig.getClass())) {
-        // don't proceed if this resource cannot be described by an ideal state
-        return null;
-      }
-      IdealState idealState = new IdealState(partitionedConfig.getResourceId());
-      idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
-
-      RebalancerRef ref = partitionedConfig.getRebalancerRef();
-      if (ref != null) {
-        idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
-      }
-      String replicas = null;
-      if (partitionedConfig.anyLiveParticipant()) {
-        replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
-      } else {
-        replicas = Integer.toString(partitionedConfig.getReplicaCount());
-      }
-      idealState.setReplicas(replicas);
-      idealState.setNumPartitions(partitionedConfig.getPartitionSet().size());
-      idealState.setInstanceGroupTag(partitionedConfig.getParticipantGroupTag());
-      idealState.setMaxPartitionsPerInstance(partitionedConfig.getMaxPartitionsPerParticipant());
-      idealState.setStateModelDefId(partitionedConfig.getStateModelDefId());
-      idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
-      idealState.setBucketSize(bucketSize);
-      idealState.setBatchMessageMode(batchMessageMode);
-      idealState.setRebalancerConfigClass(config.getClass());
-      if (SemiAutoRebalancerConfig.class.equals(config.getClass())) {
-        SemiAutoRebalancerConfig semiAutoConfig =
-            BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
-        for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
-          idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
-        }
-      } else if (CustomRebalancerConfig.class.equals(config.getClass())) {
-        CustomRebalancerConfig customConfig =
-            BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
-        for (PartitionId partitionId : customConfig.getPartitionSet()) {
-          idealState
-              .setParticipantStateMap(partitionId, customConfig.getPreferenceMap(partitionId));
-        }
-      } else {
-        for (PartitionId partitionId : partitionedConfig.getPartitionSet()) {
-          List<ParticipantId> preferenceList = Collections.emptyList();
-          idealState.setPreferenceList(partitionId, preferenceList);
-          Map<ParticipantId, State> participantStateMap = Collections.emptyMap();
-          idealState.setParticipantStateMap(partitionId, participantStateMap);
-        }
-      }
-      return idealState;
-    }
-    return null;
-  }
-
-  /**
-   * Builder for a basic data rebalancer config
-   */
-  public static final class Builder extends AbstractBuilder<Builder> {
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public PartitionedRebalancerConfig build() {
-      PartitionedRebalancerConfig config = new PartitionedRebalancerConfig();
-      super.update(config);
-      return config;
-    }
-  }
-
-  /**
-   * Abstract builder for a generic partitioned resource rebalancer config
-   */
-  public static abstract class AbstractBuilder<T extends BasicRebalancerConfig.AbstractBuilder<T>>
-      extends BasicRebalancerConfig.AbstractBuilder<T> {
-    private final ResourceId _resourceId;
-    private final Map<PartitionId, Partition> _partitionMap;
-    private RebalanceMode _rebalanceMode;
-    private boolean _anyLiveParticipant;
-    private int _replicaCount;
-    private int _maxPartitionsPerParticipant;
-
-    /**
-     * Instantiate with a resource
-     * @param resourceId resource id
-     */
-    public AbstractBuilder(ResourceId resourceId) {
-      super(resourceId);
-      _resourceId = resourceId;
-      _partitionMap = Maps.newHashMap();
-      _rebalanceMode = RebalanceMode.USER_DEFINED;
-      _anyLiveParticipant = false;
-      _replicaCount = 1;
-      _maxPartitionsPerParticipant = Integer.MAX_VALUE;
-    }
-
-    /**
-     * Set the rebalance mode for a partitioned rebalancer config
-     * @param rebalanceMode {@link RebalanceMode} enum value
-     * @return Builder
-     */
-    public T rebalanceMode(RebalanceMode rebalanceMode) {
-      _rebalanceMode = rebalanceMode;
-      return self();
-    }
-
-    /**
-     * Add a partition that the resource serves
-     * @param partition fully-qualified partition
-     * @return Builder
-     */
-    public T addPartition(Partition partition) {
-      _partitionMap.put(partition.getId(), partition);
-      return self();
-    }
-
-    /**
-     * Add a collection of partitions
-     * @param partitions any collection of Partition objects
-     * @return Builder
-     */
-    public T addPartitions(Collection<Partition> partitions) {
-      for (Partition partition : partitions) {
-        addPartition(partition);
-      }
-      return self();
-    }
-
-    /**
-     * Add a specified number of partitions with a default naming scheme, namely
-     * resourceId_partitionNumber where partitionNumber starts at 0
-     * @param partitionCount number of partitions to add
-     * @return Builder
-     */
-    public T addPartitions(int partitionCount) {
-      for (int i = 0; i < partitionCount; i++) {
-        addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
-      }
-      return self();
-    }
-
-    /**
-     * Set whether any live participant should be used in rebalancing
-     * @param anyLiveParticipant true if any live participant can be used, false otherwise
-     * @return Builder
-     */
-    public T anyLiveParticipant(boolean anyLiveParticipant) {
-      _anyLiveParticipant = anyLiveParticipant;
-      return self();
-    }
-
-    /**
-     * Set the number of replicas
-     * @param replicaCount number of replicas
-     * @return Builder
-     */
-    public T replicaCount(int replicaCount) {
-      _replicaCount = replicaCount;
-      return self();
-    }
-
-    /**
-     * Set the maximum number of partitions to assign to any participant
-     * @param maxPartitionsPerParticipant the maximum
-     * @return Builder
-     */
-    public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
-      _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
-      return self();
-    }
-
-    /**
-     * Update a PartitionedRebalancerConfig with fields from this builder level
-     * @param config PartitionedRebalancerConfig
-     */
-    protected final void update(PartitionedRebalancerConfig config) {
-      super.update(config);
-      // enforce at least one partition
-      if (_partitionMap.isEmpty()) {
-        addPartitions(1);
-      }
-      config.setRebalanceMode(_rebalanceMode);
-      config.setPartitionMap(_partitionMap);
-      config.setAnyLiveParticipant(_anyLiveParticipant);
-      config.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
-      config.setReplicaCount(_replicaCount);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
index 3f8c9d1..b725c9e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
@@ -1,14 +1,6 @@
 package org.apache.helix.controller.rebalancer.config;
 
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.controller.serializer.StringSerializer;
 
 /*
@@ -31,31 +23,10 @@ import org.apache.helix.controller.serializer.StringSerializer;
  */
 
 /**
- * Defines the state available to a rebalancer. The most common use case is to use a
- * {@link PartitionedRebalancerConfig} or a subclass and set up a resource with it. A rebalancer
- * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
- * how the configuration should be serialized.
+ * Defines the state available to a rebalancer. This is fully optional, but it allows persistence of
+ * arbitrary fields that would be useful for a user-defined rebalancer
  */
 public interface RebalancerConfig {
-  /**
-   * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
-   * resource, e.g. a subtask of a task
-   * @return map of (subunit id, subunit) pairs
-   */
-  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
-
-  /**
-   * Get the subunits of the resource (e.g. partitions)
-   * @return set of subunit ids
-   */
-  public Set<? extends PartitionId> getSubUnitIdSet();
-
-  /**
-   * Get a specific subunit
-   * @param subUnitId the id of the subunit
-   * @return SubUnit
-   */
-  public Partition getSubUnit(PartitionId subUnitId);
 
   /**
    * Get the resource to rebalance
@@ -64,32 +35,8 @@ public interface RebalancerConfig {
   public ResourceId getResourceId();
 
   /**
-   * Get the state model definition that the resource follows
-   * @return state model definition id
-   */
-  public StateModelDefId getStateModelDefId();
-
-  /**
-   * Get the state model factory of this resource
-   * @return state model factory id
-   */
-  public StateModelFactoryId getStateModelFactoryId();
-
-  /**
-   * Get the tag, if any, that participants must have in order to serve this resource
-   * @return participant group tag, or null
-   */
-  public String getParticipantGroupTag();
-
-  /**
    * Get the serializer for this config
    * @return StringSerializer class object
    */
   public Class<? extends StringSerializer> getSerializerClass();
-
-  /**
-   * Get a reference to the class used to rebalance this resource
-   * @return RebalancerRef
-   */
-  public RebalancerRef getRebalancerRef();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
index d6ddb50..19970c4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
@@ -119,21 +119,6 @@ public final class RebalancerConfigHolder {
   }
 
   /**
-   * Get a rebalancer class instance
-   * @return Rebalancer
-   */
-  public HelixRebalancer getRebalancer() {
-    // cache the rebalancer to avoid loading and instantiating it excessively
-    if (_rebalancer == null) {
-      if (_config == null || _config.getRebalancerRef() == null) {
-        return null;
-      }
-      _rebalancer = _config.getRebalancerRef().getRebalancer();
-    }
-    return _rebalancer;
-  }
-
-  /**
    * Get the instantiated RebalancerConfig
    * @param configClass specific class of the RebalancerConfig
    * @return RebalancerConfig subclass instance, or null if conversion is not possible

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
deleted file mode 100644
index 3118b2a..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Methods specifying a rebalancer config that allows replicas. For instance, a rebalancer config
- * with partitions may accept state model definitions that support multiple replicas per partition,
- * and it's possible that the policy is that each live participant in the system should have a
- * replica.
- */
-public interface ReplicatedRebalancerConfig extends RebalancerConfig {
-  /**
-   * Check if this resource should be assigned to any live participant
-   * @return true if any live participant expected, false otherwise
-   */
-  public boolean anyLiveParticipant();
-
-  /**
-   * Get the number of replicas that each resource subunit should have
-   * @return replica count
-   */
-  public int getReplicaCount();
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
deleted file mode 100644
index 60e30f4..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ /dev/null
@@ -1,183 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerConfig for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
- * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
- */
-public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig {
-  @JsonProperty("preferenceLists")
-  private Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-  /**
-   * Instantiate a SemiAutoRebalancerConfig
-   */
-  public SemiAutoRebalancerConfig() {
-    if (getClass().equals(SemiAutoRebalancerConfig.class)) {
-      // only mark this as semi auto mode if this specifc config is used
-      setRebalanceMode(RebalanceMode.SEMI_AUTO);
-    } else {
-      setRebalanceMode(RebalanceMode.USER_DEFINED);
-    }
-    setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-    _preferenceLists = Maps.newHashMap();
-  }
-
-  /**
-   * Get the preference lists of all partitions of the resource
-   * @return map of partition id to list of participant ids
-   */
-  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
-    return _preferenceLists;
-  }
-
-  /**
-   * Set the preference lists of all partitions of the resource
-   * @param preferenceLists
-   */
-  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
-    _preferenceLists = preferenceLists;
-  }
-
-  /**
-   * Get the preference list of a partition
-   * @param partitionId the partition to look up
-   * @return list of participant ids
-   */
-  @JsonIgnore
-  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
-    return _preferenceLists.get(partitionId);
-  }
-
-  /**
-   * Generate preference lists based on a default cluster setup
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @Override
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // compute default upper bounds
-    Map<State, String> upperBounds = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
-    }
-
-    // determine the current mapping
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
-    for (PartitionId partitionId : getPartitionSet()) {
-      List<ParticipantId> preferenceList = getPreferenceList(partitionId);
-      if (preferenceList != null && !preferenceList.isEmpty()) {
-        Set<ParticipantId> disabledParticipants = Collections.emptySet();
-        Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
-        Map<ParticipantId, State> initialMap =
-            ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
-                stateModelDef, preferenceList, emptyCurrentState, disabledParticipants, true);
-        currentMapping.put(partitionId, initialMap);
-      }
-    }
-
-    // determine the preference
-    LinkedHashMap<State, Integer> stateCounts =
-        ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
-            getReplicaCount());
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
-    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
-            getMaxPartitionsPerParticipant(), placementScheme);
-    Map<String, List<String>> rawPreferenceLists =
-        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
-            .getListFields();
-    Map<PartitionId, List<ParticipantId>> preferenceLists =
-        Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
-    setPreferenceLists(preferenceLists);
-  }
-
-  /**
-   * Build a SemiAutoRebalancerConfig. By default, it corresponds to {@link SemiAutoRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
-    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-    /**
-     * Instantiate for a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-      super.rebalanceMode(RebalanceMode.SEMI_AUTO);
-      _preferenceLists = Maps.newHashMap();
-    }
-
-    /**
-     * Add a preference list for a partition
-     * @param partitionId partition to set
-     * @param preferenceList ordered list of participants who can serve the partition
-     * @return Builder
-     */
-    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
-      _preferenceLists.put(partitionId, preferenceList);
-      return self();
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public SemiAutoRebalancerConfig build() {
-      SemiAutoRebalancerConfig config = new SemiAutoRebalancerConfig();
-      super.update(config);
-      config.setPreferenceLists(_preferenceLists);
-      return config;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 6f34953..f6f5d61 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -25,7 +25,6 @@ import java.util.Set;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ParticipantId;
@@ -193,51 +192,40 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       }
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
-      StateModelDefinition stateModelDef =
-          stateModelDefs.get(rebalancerConfig.getStateModelDefId());
+      IdealState idealState = resourceConfig.getIdealState();
+      StateModelDefinition stateModelDef = stateModelDefs.get(idealState.getStateModelDefId());
       ResourceAssignment resourceAssignment = null;
-      if (rebalancerConfig != null) {
-        // use a cached rebalancer if possible
-        RebalancerRef ref = rebalancerConfig.getRebalancerRef();
-        HelixRebalancer rebalancer = null;
-        if (_rebalancerMap.containsKey(resourceId)) {
-          HelixRebalancer candidateRebalancer = _rebalancerMap.get(resourceId);
-          if (ref != null && candidateRebalancer.getClass().equals(ref.toString())) {
-            rebalancer = candidateRebalancer;
-          }
+      // use a cached rebalancer if possible
+      RebalancerRef ref = idealState.getRebalancerRef();
+      HelixRebalancer rebalancer = null;
+      if (_rebalancerMap.containsKey(resourceId)) {
+        HelixRebalancer candidateRebalancer = _rebalancerMap.get(resourceId);
+        if (ref != null && candidateRebalancer.getClass().equals(ref.toString())) {
+          rebalancer = candidateRebalancer;
         }
+      }
 
-        // otherwise instantiate a new one
-        if (rebalancer == null) {
-          if (ref != null) {
-            rebalancer = ref.getRebalancer();
-          }
-          HelixManager manager = event.getAttribute("helixmanager");
-          ControllerContextProvider provider =
-              event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
-          if (rebalancer == null) {
-            rebalancer = new FallbackRebalancer();
-          }
-          rebalancer.init(manager, provider);
-          _rebalancerMap.put(resourceId, rebalancer);
+      // otherwise instantiate a new one
+      if (rebalancer == null) {
+        if (ref != null) {
+          rebalancer = ref.getRebalancer();
         }
-        ResourceAssignment currentAssignment = null;
-        IdealState idealState;
-        Resource resourceSnapshot = cluster.getResource(resourceId);
-        if (resourceSnapshot != null) {
-          currentAssignment = resourceSnapshot.getResourceAssignment();
-          idealState = resourceSnapshot.getIdealState();
-        } else {
-          idealState = new IdealState(resourceId);
-        }
-        try {
-
-          resourceAssignment =
-              rebalancer.computeResourceMapping(idealState, rebalancerConfig, currentAssignment,
-                  cluster, currentStateOutput);
-        } catch (Exception e) {
-          LOG.error("Rebalancer for resource " + resourceId + " failed.", e);
+        HelixManager manager = event.getAttribute("helixmanager");
+        ControllerContextProvider provider =
+            event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+        if (rebalancer == null) {
+          rebalancer = new FallbackRebalancer();
         }
+        rebalancer.init(manager, provider);
+        _rebalancerMap.put(resourceId, rebalancer);
+      }
+      ResourceAssignment currentAssignment = null;
+      try {
+        resourceAssignment =
+            rebalancer.computeResourceMapping(idealState, rebalancerConfig, currentAssignment,
+                cluster, currentStateOutput);
+      } catch (Exception e) {
+        LOG.error("Rebalancer for resource " + resourceId + " failed.", e);
       }
       if (resourceAssignment == null) {
         resourceAssignment =

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index deabb56..810a675 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -44,7 +44,6 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -93,7 +92,6 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       // if resource ideal state has bucket size, set it
       // otherwise resource has been dropped, use bucket size from current state instead
       ResourceConfig resource = resourceMap.get(resourceId);
-      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
       SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
 
       if (resource.getIdealState().getBucketSize() > 0) {
@@ -146,10 +144,8 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         // partitions are finished (COMPLETED or ERROR), update the status update of the original
         // scheduler
         // message, and then remove the partitions from the ideal state
-        if (rebalancerConfig != null
-            && rebalancerConfig.getStateModelDefId() != null
-            && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
-                StateModelDefId.SchedulerTaskQueue)) {
+        if (idealState != null && idealState.getStateModelDefId() != null
+            && idealState.getStateModelDefId().equalsIgnoreCase(StateModelDefId.SchedulerTaskQueue)) {
           updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index 61da673..4ba249f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -39,7 +39,7 @@ import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
@@ -75,12 +75,10 @@ public class MessageGenerationStage extends AbstractBaseStage {
     for (ResourceId resourceId : resourceMap.keySet()) {
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       int bucketSize = 0;
-      if (resourceConfig.getIdealState() != null) {
-        bucketSize = resourceConfig.getIdealState().getBucketSize();
-      }
+      bucketSize = resourceConfig.getIdealState().getBucketSize();
 
-      RebalancerConfig rebalancerCfg = resourceConfig.getRebalancerConfig();
-      StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCfg.getStateModelDefId());
+      IdealState idealState = resourceConfig.getIdealState();
+      StateModelDefinition stateModelDef = stateModelDefMap.get(idealState.getStateModelDefId());
 
       ResourceAssignment resourceAssignment =
           bestPossibleStateOutput.getResourceAssignment(resourceId);
@@ -134,15 +132,14 @@ public class MessageGenerationStage extends AbstractBaseStage {
             SessionId sessionId =
                 SessionId.from(cluster.getLiveParticipantMap().get(participantId).getLiveInstance()
                     .getSessionId());
-            RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
             Message message =
                 createMessage(manager, resourceId, subUnitId, participantId, currentState,
                     nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
-                    rebalancerConfig.getStateModelFactoryId(), bucketSize);
+                    idealState.getStateModelFactoryId(), bucketSize);
 
             // TODO refactor get/set timeout/inner-message
-            if (rebalancerConfig != null
-                && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+            if (idealState != null
+                && idealState.getStateModelDefId().equalsIgnoreCase(
                     StateModelDefId.SchedulerTaskQueue)) {
               if (resourceConfig.getSubUnitSet().size() > 0) {
                 // TODO refactor it -- we need a way to read in scheduler tasks a priori

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 2408e29..0e4a694 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.Resource;
@@ -37,9 +38,7 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.ReplicatedRebalancerConfig;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -108,7 +107,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
     for (ResourceId resourceId : resourceMap.keySet()) {
       ResourceConfig resource = resourceMap.get(resourceId);
       StateModelDefinition stateModelDef =
-          stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
+          stateModelDefMap.get(resource.getIdealState().getStateModelDefId());
 
       if (stateModelDef == null) {
         LOG.info("resource: "
@@ -124,7 +123,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
       // if configResource == null, the resource has been dropped
       Map<State, Bounds> stateConstraints =
           computeStateConstraints(stateModelDef,
-              configResource == null ? null : configResource.getRebalancerConfig(), cluster);
+              configResource == null ? null : configResource.getIdealState(), cluster);
 
       // TODO fix it
       for (PartitionId partitionId : bestPossibleStateOutput.getResourceAssignment(resourceId)
@@ -270,10 +269,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
    * @return
    */
   private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
-      RebalancerConfig rebalancerConfig, Cluster cluster) {
-    ReplicatedRebalancerConfig config =
-        (rebalancerConfig != null) ? BasicRebalancerConfig.convert(rebalancerConfig,
-            ReplicatedRebalancerConfig.class) : null;
+      IdealState idealState, Cluster cluster) {
     Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
 
     List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
@@ -285,11 +281,12 @@ public class MessageSelectionStage extends AbstractBaseStage {
       } else if ("R".equals(numInstancesPerState)) {
         // idealState is null when resource has been dropped,
         // R can't be evaluated and ignore state constraints
-        if (config != null) {
-          if (config.anyLiveParticipant()) {
+        if (idealState != null) {
+          String replicas = idealState.getReplicas();
+          if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
             max = cluster.getLiveParticipantMap().size();
           } else {
-            max = config.getReplicaCount();
+            max = Integer.parseInt(replicas);
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 1036b35..ccd8ae6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -19,22 +19,23 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
-import org.apache.helix.api.Partition;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
 import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
 import org.apache.log4j.Logger;
 
 /**
@@ -85,14 +86,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
    * @throws StageException
    */
   Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
-    Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
-        new HashMap<ResourceId, ResourceConfig.Builder>();
-
-    Map<ResourceId, PartitionedRebalancerConfig.Builder> rebCtxBuilderMap =
-        new HashMap<ResourceId, PartitionedRebalancerConfig.Builder>();
-
-    Map<ResourceId, Integer> bucketSizeMap = new HashMap<ResourceId, Integer>();
-    Map<ResourceId, Boolean> batchModeMap = new HashMap<ResourceId, Boolean>();
+    Map<ResourceId, IdealState> idealStateMap = new HashMap<ResourceId, IdealState>();
 
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
       for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
@@ -111,35 +105,28 @@ public class ResourceComputationStage extends AbstractBaseStage {
               + currentState.getResourceId());
         }
 
-        if (!resCfgBuilderMap.containsKey(resourceId)) {
-          PartitionedRebalancerConfig.Builder rebCtxBuilder =
-              new PartitionedRebalancerConfig.Builder(resourceId);
-          rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
-          rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
-              .getStateModelFactoryName()));
-          rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
-
-          ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
-          resCfgBuilderMap.put(resourceId, resCfgBuilder);
-          bucketSizeMap.put(resourceId, currentState.getBucketSize());
-          batchModeMap.put(resourceId, currentState.getBatchMessageMode());
+        if (!idealStateMap.containsKey(resourceId)) {
+          IdealState idealState = new IdealState(resourceId);
+          idealState.setStateModelDefId(currentState.getStateModelDefId());
+          idealState.setStateModelFactoryName(currentState.getStateModelFactoryName());
+          idealState.setBucketSize(currentState.getBucketSize());
+          idealState.setBatchMessageMode(currentState.getBatchMessageMode());
+          idealStateMap.put(resourceId, idealState);
         }
 
-        PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+        IdealState idealState = idealStateMap.get(resourceId);
         for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
-          rebCtxBuilder.addPartition(new Partition(partitionId));
+          idealState.setParticipantStateMap(partitionId, new HashMap<ParticipantId, State>());
+          idealState.setPreferenceList(partitionId, new ArrayList<ParticipantId>());
         }
       }
     }
 
     Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
-    for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
-      ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
-      PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
-      RebalancerConfig rebalancerConfig = rebCtxBuilder.build();
-      resCfgBuilder.rebalancerConfig(rebalancerConfig);
-      resCfgBuilder.idealState(PartitionedRebalancerConfig.rebalancerConfigToIdealState(
-          rebalancerConfig, bucketSizeMap.get(resourceId), batchModeMap.get(resourceId)));
+    for (ResourceId resourceId : idealStateMap.keySet()) {
+      ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+      IdealState idealState = idealStateMap.get(resourceId);
+      resCfgBuilder.idealState(idealState);
       resCfgMap.put(resourceId, resCfgBuilder.build());
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 88ec610..b0347ef 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -40,14 +40,11 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
 import org.apache.helix.controller.rebalancer.HelixRebalancer;
 import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
-import org.apache.helix.util.HelixUtil;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Enums;
@@ -217,47 +214,26 @@ public class IdealState extends HelixProperty {
    * @return RebalancerRef
    */
   public RebalancerRef getRebalancerRef() {
+    RebalancerRef ref = null;
     String className = getRebalancerClassName();
     if (className != null) {
-      return RebalancerRef.from(getRebalancerClassName());
-    }
-    return null;
-  }
-
-  /**
-   * Set the RebalancerConfig implementation class for this resource
-   * @param clazz the class object
-   */
-  public void setRebalancerConfigClass(Class<? extends RebalancerConfig> clazz) {
-    String className = clazz.getName();
-    _record.setSimpleField(IdealStateProperty.REBALANCER_CONFIG_NAME.toString(), className);
-  }
-
-  /**
-   * Get the class representing the rebalancer config of this resource
-   * @return The rebalancer config class
-   */
-  public Class<? extends RebalancerConfig> getRebalancerConfigClass() {
-    // try to extract the class from the persisted data
-    String className = _record.getSimpleField(IdealStateProperty.REBALANCER_CONFIG_NAME.toString());
-    if (className != null) {
-      try {
-        return HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerConfig.class);
-      } catch (ClassNotFoundException e) {
-        logger.error(className + " is not a valid class");
+      ref = RebalancerRef.from(getRebalancerClassName());
+    } else {
+      switch (getRebalanceMode()) {
+      case FULL_AUTO:
+        ref = RebalancerRef.from(FullAutoRebalancer.class);
+        break;
+      case SEMI_AUTO:
+        ref = RebalancerRef.from(SemiAutoRebalancer.class);
+        break;
+      case CUSTOMIZED:
+        ref = RebalancerRef.from(CustomRebalancer.class);
+        break;
+      default:
+        break;
       }
     }
-    // the fallback is to use the mode
-    switch (getRebalanceMode()) {
-    case FULL_AUTO:
-      return FullAutoRebalancerConfig.class;
-    case SEMI_AUTO:
-      return SemiAutoRebalancerConfig.class;
-    case CUSTOMIZED:
-      return CustomRebalancerConfig.class;
-    default:
-      return PartitionedRebalancerConfig.class;
-    }
+    return ref;
   }
 
   /**
@@ -333,13 +309,7 @@ public class IdealState extends HelixProperty {
     case CUSTOMIZED:
       return _record.getMapFields().keySet();
     case USER_DEFINED:
-      Class<? extends RebalancerConfig> configClass = getRebalancerConfigClass();
-      if (configClass.equals(SemiAutoRebalancerConfig.class)
-          || configClass.equals(FullAutoRebalancerConfig.class)) {
-        return _record.getListFields().keySet();
-      } else {
-        return _record.getMapFields().keySet();
-      }
+      return _record.getMapFields().keySet();
     default:
       logger.error("Invalid ideal state mode:" + getResourceName());
       return Collections.emptySet();
@@ -414,18 +384,8 @@ public class IdealState extends HelixProperty {
    * @return set of instance names
    */
   public Set<String> getInstanceSet(String partitionName) {
-    boolean useListFields = false;
     RebalanceMode rebalanceMode = getRebalanceMode();
-    if (rebalanceMode == RebalanceMode.USER_DEFINED) {
-      Class<? extends RebalancerConfig> configClass = getRebalancerConfigClass();
-      if (configClass.equals(SemiAutoRebalancerConfig.class)
-          || configClass.equals(FullAutoRebalancerConfig.class)) {
-        // override: if the user defined rebalancer expects auto-type inputs, use the list fields
-        useListFields = true;
-      }
-    }
-    if (useListFields || rebalanceMode == RebalanceMode.SEMI_AUTO
-        || rebalanceMode == RebalanceMode.FULL_AUTO) {
+    if (rebalanceMode == RebalanceMode.SEMI_AUTO || rebalanceMode == RebalanceMode.FULL_AUTO) {
       // get instances from list fields
       List<String> prefList = _record.getListField(partitionName);
       if (prefList != null) {
@@ -825,7 +785,6 @@ public class IdealState extends HelixProperty {
   }
 
   /**
-   * <<<<<<< HEAD
    * Get the non-Helix simple fields from this property and add them to a UserConfig
    * @param userConfig the user config to update
    */