You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/07 07:40:40 UTC

[3/3] git commit: [HELIX-109] adding config classes

[HELIX-109] adding config classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5972a44e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5972a44e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5972a44e

Branch: refs/heads/helix-logical-model
Commit: 5972a44e726dcb0d9ea3671eac5caf45fc72c0fc
Parents: c07569d
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Sep 6 22:39:57 2013 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Sep 6 22:39:57 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/Cluster.java |  54 ++++--
 .../org/apache/helix/api/ClusterAccessor.java   |  63 +++++-
 .../org/apache/helix/api/ClusterConfig.java     | 180 +++++++++++++++++
 .../java/org/apache/helix/api/Participant.java  | 157 ++-------------
 .../org/apache/helix/api/ParticipantConfig.java | 194 +++++++++++++++++++
 .../org/apache/helix/api/RebalancerConfig.java  |  64 +++++-
 .../org/apache/helix/api/RebalancerRef.java     |   5 +-
 .../java/org/apache/helix/api/Resource.java     | 172 +++++-----------
 .../org/apache/helix/api/ResourceConfig.java    | 173 +++++++++++++++++
 .../controller/GenericHelixController.java      |  14 +-
 .../rebalancer/NewAutoRebalancer.java           |  42 ++--
 .../rebalancer/NewCustomRebalancer.java         |  13 +-
 .../rebalancer/NewSemiAutoRebalancer.java       |  13 +-
 .../util/NewConstraintBasedAssignment.java      |   5 +-
 .../stages/NewBestPossibleStateCalcStage.java   |   7 +-
 .../stages/NewBestPossibleStateOutput.java      |   9 +
 .../stages/NewCurrentStateComputationStage.java |  12 +-
 .../stages/NewExternalViewComputeStage.java     |  12 +-
 .../stages/NewMessageGenerationStage.java       |  22 ++-
 .../stages/NewMessageSelectionStage.java        |  14 +-
 .../stages/NewMessageThrottleStage.java         |  12 +-
 .../stages/NewResourceComputationStage.java     |  16 +-
 .../stages/NewTaskAssignmentStage.java          |  18 +-
 .../apache/helix/model/ClusterConstraints.java  |   8 +
 .../java/org/apache/helix/model/IdealState.java |  18 +-
 .../apache/helix/model/ResourceAssignment.java  |   8 +
 .../helix/tools/ClusterStateVerifier.java       |  84 +++++---
 .../org/apache/helix/api/TestNewStages.java     |  12 +-
 .../helix/controller/stages/BaseStageTest.java  |  67 ++++---
 .../TestBestPossibleCalcStageCompatibility.java |  73 ++++---
 .../stages/TestBestPossibleStateCalcStage.java  |  35 ++--
 .../stages/TestCompatibilityCheckStage.java     |  13 +-
 .../TestCurrentStateComputationStage.java       |  52 ++---
 .../stages/TestMessageThrottleStage.java        |  35 ++--
 .../stages/TestMsgSelectionStage.java           |  96 ++++++---
 .../stages/TestRebalancePipeline.java           |  76 ++++----
 .../stages/TestResourceComputationStage.java    |  79 ++++----
 .../helix/integration/TestAutoRebalance.java    |   6 +-
 .../TestCustomizedIdealStateRebalancer.java     |  59 +++---
 39 files changed, 1303 insertions(+), 689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index e890fb4..ce2318a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -25,13 +25,14 @@ import java.util.Map;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 /**
  * Represent a logical helix cluster
  */
 public class Cluster {
-  private final ClusterId _id;
 
   /**
    * map of resource-id to resource
@@ -60,9 +61,7 @@ public class Cluster {
 
   private final ControllerId _leaderId;
 
-  private final ClusterConfig _config = null;
-
-  private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+  private final ClusterConfig _config;
 
   /**
    * construct a cluster
@@ -74,9 +73,26 @@ public class Cluster {
    */
   public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
-      ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap) {
-
-    _id = id;
+      ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+
+    // build the config
+    // Guava's transform and "copy" functions really return views so the maps all reflect each other
+    Map<ResourceId, ResourceConfig> resourceConfigMap =
+        Maps.transformValues(resourceMap, new Function<Resource, ResourceConfig>() {
+          @Override
+          public ResourceConfig apply(Resource resource) {
+            return resource.getConfig();
+          }
+        });
+    Map<ParticipantId, ParticipantConfig> participantConfigMap =
+        Maps.transformValues(participantMap, new Function<Participant, ParticipantConfig>() {
+          @Override
+          public ParticipantConfig apply(Participant participant) {
+            return participant.getConfig();
+          }
+        });
+    _config =
+        new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap, isPaused);
 
     _resourceMap = ImmutableMap.copyOf(resourceMap);
 
@@ -94,8 +110,6 @@ public class Cluster {
 
     _leaderId = leaderId;
 
-    _constraintMap = ImmutableMap.copyOf(constraintMap);
-
     // TODO impl this when we persist controllers and spectators on zookeeper
     _controllerMap = ImmutableMap.copyOf(controllerMap);
     _spectatorMap = Collections.emptyMap();
@@ -106,7 +120,7 @@ public class Cluster {
    * @return cluster id
    */
   public ClusterId getId() {
-    return _id;
+    return _config.getId();
   }
 
   /**
@@ -167,17 +181,27 @@ public class Cluster {
   }
 
   /**
-   * @return
+   * Get all the constraints on the cluster
+   * @return map of constraint type to constraints
    */
   public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
-    return _constraintMap;
+    return _config.getConstraintMap();
   }
 
   /**
-   * @param type
-   * @return
+   * Get a cluster constraint
+   * @param type the type of constraint to query
+   * @return cluster constraints, or null if none
    */
   public ClusterConstraints getConstraint(ConstraintType type) {
-    return _constraintMap.get(type);
+    return _config.getConstraintMap().get(type);
+  }
+
+  /**
+   * Check the paused status of the cluster
+   * @return true if paused, false otherwise
+   */
+  public boolean isPaused() {
+    return _config.isPaused();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 04d5831..a9fcf79 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -24,12 +24,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -76,7 +78,7 @@ public class ClusterAccessor {
    */
   public void dropCluster() {
     LOG.info("Dropping cluster: " + _clusterId);
-    List<String> liveInstanceNames =_accessor.getChildNames(_keyBuilder.liveInstances());
+    List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
     if (liveInstanceNames.size() > 0) {
       throw new HelixException("Can't drop cluster: " + _clusterId
           + " because there are running participant: " + liveInstanceNames
@@ -97,6 +99,7 @@ public class ClusterAccessor {
    * @return cluster
    */
   public Cluster readCluster() {
+    // TODO many of these should live in resource, participant, etc accessors
     /**
      * map of instance-id to instance-config
      */
@@ -147,13 +150,20 @@ public class ClusterAccessor {
     Map<String, ClusterConstraints> constraintMap =
         _accessor.getChildValuesMap(_keyBuilder.constraints());
 
+    /**
+     * Map of resource id to external view
+     */
+    Map<String, ExternalView> externalViewMap =
+        _accessor.getChildValuesMap(_keyBuilder.externalViews());
+
     Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
     for (String resourceName : idealStateMap.keySet()) {
       IdealState idealState = idealStateMap.get(resourceName);
-
       // TODO pass resource assignment
       ResourceId resourceId = Id.resource(resourceName);
-      resourceMap.put(resourceId, new Resource(resourceId, idealState, null));
+      resourceMap.put(resourceId,
+          new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
+              liveInstanceMap.size()));
     }
 
     Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
@@ -182,8 +192,11 @@ public class ClusterAccessor {
           constraintMap.get(constraintType));
     }
 
+    PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+    boolean isPaused = pauseSignal != null;
+
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap);
+        clusterConstraintMap, isPaused);
   }
 
   /**
@@ -204,7 +217,7 @@ public class ClusterAccessor {
    * add a resource to cluster
    * @param resource
    */
-  public void addResourceToCluster(Resource resource) {
+  public void addResourceToCluster(ResourceConfig resource) {
     StateModelDefId stateModelDefId = resource.getRebalancerConfig().getStateModelDefId();
     if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
       throw new HelixException("State model: " + stateModelDefId + " not found in cluster: "
@@ -217,8 +230,42 @@ public class ClusterAccessor {
           + ", because resource ideal state already exists in cluster: " + _clusterId);
     }
 
-    // TODO convert rebalancerConfig to idealState
-    _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), null);
+    // Create an IdealState from a RebalancerConfig
+    RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+    IdealState idealState = new IdealState(resourceId);
+    idealState.setRebalanceMode(rebalancerConfig.getRebalancerMode());
+    idealState.setMaxPartitionsPerInstance(rebalancerConfig.getMaxPartitionsPerParticipant());
+    if (rebalancerConfig.canAssignAnyLiveParticipant()) {
+      idealState.setReplicas(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString());
+    } else {
+      idealState.setReplicas(Integer.toString(rebalancerConfig.getReplicaCount()));
+    }
+    idealState.setStateModelDefId(rebalancerConfig.getStateModelDefId());
+    for (PartitionId partitionId : resource.getPartitionSet()) {
+      List<ParticipantId> preferenceList = rebalancerConfig.getPreferenceList(partitionId);
+      Map<ParticipantId, State> preferenceMap = rebalancerConfig.getPreferenceMap(partitionId);
+      if (preferenceList != null) {
+        idealState.setPreferenceList(partitionId, preferenceList);
+      }
+      if (preferenceMap != null) {
+        idealState.setParticipantStateMap(partitionId, preferenceMap);
+      }
+    }
+    idealState.setBucketSize(rebalancerConfig.getBucketSize());
+    idealState.setBatchMessageMode(rebalancerConfig.getBatchMessageMode());
+    String groupTag = rebalancerConfig.getParticipantGroupTag();
+    if (groupTag != null) {
+      idealState.setInstanceGroupTag(groupTag);
+    }
+    RebalancerRef rebalancerRef = rebalancerConfig.getRebalancerRef();
+    if (rebalancerRef != null) {
+      idealState.setRebalancerRef(rebalancerRef);
+    }
+    StateModelFactoryId stateModelFactoryId = rebalancerConfig.getStateModelFactoryId();
+    if (stateModelFactoryId != null) {
+      idealState.setStateModelFactoryId(stateModelFactoryId);
+    }
+    _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
   }
 
   /**
@@ -244,7 +291,7 @@ public class ClusterAccessor {
    * add a participant to cluster
    * @param participant
    */
-  public void addParticipantToCluster(Participant participant) {
+  public void addParticipantToCluster(ParticipantConfig participant) {
     if (!isClusterStructureValid()) {
       throw new HelixException("Cluster: " + _clusterId + " structure is not valid");
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index 8a2f629..1585aae 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -1,5 +1,14 @@
 package org.apache.helix.api;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+
+import com.google.common.collect.ImmutableMap;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,6 +28,177 @@ package org.apache.helix.api;
  * under the License.
  */
 
+/**
+ * Configuration properties of a cluster
+ */
 public class ClusterConfig {
+  private final ClusterId _id;
+  private final Map<ResourceId, ResourceConfig> _resourceMap;
+  private final Map<ParticipantId, ParticipantConfig> _participantMap;
+  private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+  private final boolean _isPaused;
+
+  /**
+   * Initialize a cluster configuration. Also see ClusterConfig.Builder
+   * @param id
+   * @param resourceMap
+   * @param participantMap
+   * @param constraintMap
+   */
+  public ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
+      Map<ParticipantId, ParticipantConfig> participantMap,
+      Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+    _id = id;
+    _resourceMap = ImmutableMap.copyOf(resourceMap);
+    _participantMap = ImmutableMap.copyOf(participantMap);
+    _constraintMap = ImmutableMap.copyOf(constraintMap);
+    _isPaused = isPaused;
+  }
+
+  /**
+   * Get cluster id
+   * @return cluster id
+   */
+  public ClusterId getId() {
+    return _id;
+  }
+
+  /**
+   * Get resources in the cluster
+   * @return a map of resource id to resource configuration, or empty map if none
+   */
+  public Map<ResourceId, ResourceConfig> getResourceMap() {
+    return _resourceMap;
+  }
+
+  /**
+   * Get all the constraints on the cluster
+   * @return map of constraint type to constraints
+   */
+  public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
+    return _constraintMap;
+  }
+
+  /**
+   * Get participants of the cluster
+   * @return a map of participant id to participant configuration, or empty map if none
+   */
+  public Map<ParticipantId, ParticipantConfig> getParticipantMap() {
+    return _participantMap;
+  }
+
+  /**
+   * Check the paused status of the cluster
+   * @return true if paused, false otherwise
+   */
+  public boolean isPaused() {
+    return _isPaused;
+  }
+
+  /**
+   * Assembles a cluster configuration
+   */
+  public static class Builder {
+    private final ClusterId _id;
+    private final Map<ResourceId, ResourceConfig> _resourceMap;
+    private final Map<ParticipantId, ParticipantConfig> _participantMap;
+    private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+    private boolean _isPaused;
+
+    /**
+     * Initialize builder for a cluster
+     * @param id cluster id
+     */
+    public Builder(ClusterId id) {
+      _id = id;
+      _resourceMap = new HashMap<ResourceId, ResourceConfig>();
+      _participantMap = new HashMap<ParticipantId, ParticipantConfig>();
+      _constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
+      _isPaused = false;
+    }
+
+    /**
+     * Add a resource to the cluster
+     * @param resource resource configuration
+     * @return Builder
+     */
+    public Builder addResource(ResourceConfig resource) {
+      _resourceMap.put(resource.getId(), resource);
+      return this;
+    }
+
+    /**
+     * Add multiple resources to the cluster
+     * @param resources resource configurations
+     * @return Builder
+     */
+    public Builder addResources(Collection<ResourceConfig> resources) {
+      for (ResourceConfig resource : resources) {
+        addResource(resource);
+      }
+      return this;
+    }
+
+    /**
+     * Add a participant to the cluster
+     * @param participant participant configuration
+     * @return Builder
+     */
+    public Builder addParticipant(ParticipantConfig participant) {
+      _participantMap.put(participant.getId(), participant);
+      return this;
+    }
+
+    /**
+     * Add multiple participants to the cluster
+     * @param participants participant configurations
+     * @return Builder
+     */
+    public Builder addParticipants(Collection<ParticipantConfig> participants) {
+      for (ParticipantConfig participant : participants) {
+        addParticipant(participant);
+      }
+      return this;
+    }
+
+    /**
+     * Add a constraint to the cluster
+     * @param constraint cluster constraint of a specific type
+     * @return Builder
+     */
+    public Builder addConstraint(ClusterConstraints constraint) {
+      _constraintMap.put(constraint.getType(), constraint);
+      return this;
+    }
+
+    /**
+     * Add multiple constraints to the cluster
+     * @param constraints cluster constraints of multiple distinct types
+     * @return Builder
+     */
+    public Builder addConstraints(Collection<ClusterConstraints> constraints) {
+      for (ClusterConstraints constraint : constraints) {
+        addConstraint(constraint);
+      }
+      return this;
+    }
+
+    /**
+     * Set the paused status of the cluster
+     * @param isPaused true if paused, false otherwise
+     * @return Builder
+     */
+    public Builder setPausedStatus(boolean isPaused) {
+      _isPaused = isPaused;
+      return this;
+    }
 
+    /**
+     * Create the cluster configuration
+     * @return ClusterConfig
+     */
+    public ClusterConfig build() {
+      return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _isPaused);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 10ceebd..0c0cd12 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -19,8 +19,6 @@ package org.apache.helix.api;
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -28,22 +26,12 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 
 /**
  * A cluster participant
  */
 public class Participant {
-  private final ParticipantId _id;
-  private final String _hostName;
-  private final int _port;
-  private final boolean _isEnabled;
-
-  /**
-   * set of disabled partition id's
-   */
-  private final Set<PartitionId> _disabledPartitionIdSet;
-  private final Set<String> _tags;
+  private final ParticipantConfig _config;
 
   private final RunningInstance _runningInstance;
 
@@ -64,12 +52,7 @@ public class Participant {
   public Participant(ParticipantId id, String hostName, int port, boolean isEnabled,
       Set<PartitionId> disabledPartitionIdSet, Set<String> tags, RunningInstance runningInstance,
       Map<ResourceId, CurrentState> currentStateMap, Map<MessageId, Message> messageMap) {
-    _id = id;
-    _hostName = hostName;
-    _port = port;
-    _isEnabled = isEnabled;
-    _disabledPartitionIdSet = ImmutableSet.copyOf(disabledPartitionIdSet);
-    _tags = ImmutableSet.copyOf(tags);
+    _config = new ParticipantConfig(id, hostName, port, isEnabled, disabledPartitionIdSet, tags);
     _runningInstance = runningInstance;
     _currentStateMap = ImmutableMap.copyOf(currentStateMap);
     _messageMap = ImmutableMap.copyOf(messageMap);
@@ -80,7 +63,7 @@ public class Participant {
    * @return host name, or null if not applicable
    */
   public String getHostName() {
-    return _hostName;
+    return _config.getHostName();
   }
 
   /**
@@ -88,7 +71,7 @@ public class Participant {
    * @return port number, or -1 if not applicable
    */
   public int getPort() {
-    return _port;
+    return _config.getPort();
   }
 
   /**
@@ -96,7 +79,7 @@ public class Participant {
    * @return true if enabled or false otherwise
    */
   public boolean isEnabled() {
-    return _isEnabled;
+    return _config.isEnabled();
   }
 
   /**
@@ -120,7 +103,7 @@ public class Participant {
    * @return set of disabled partition id's, or empty set if none
    */
   public Set<PartitionId> getDisablePartitionIds() {
-    return _disabledPartitionIdSet;
+    return _config.getDisablePartitionIds();
   }
 
   /**
@@ -128,7 +111,7 @@ public class Participant {
    * @return set of tags
    */
   public Set<String> getTags() {
-    return _tags;
+    return _config.getTags();
   }
 
   /**
@@ -137,7 +120,7 @@ public class Participant {
    * @return true if tagged, false otherwise
    */
   public boolean hasTag(String tag) {
-    return _tags.contains(tag);
+    return _config.hasTag(tag);
   }
 
   /**
@@ -156,125 +139,19 @@ public class Participant {
     return _currentStateMap;
   }
 
+  /**
+   * Get the participant id
+   * @return ParticipantId
+   */
   public ParticipantId getId() {
-    return _id;
+    return _config.getId();
   }
 
   /**
-   * Assemble a participant
+   * Get the participant configuration
+   * @return ParticipantConfig that backs this participant
    */
-  public static class Builder {
-    private final ParticipantId _id;
-    private final Set<PartitionId> _disabledPartitions;
-    private final Set<String> _tags;
-    private final Map<ResourceId, CurrentState> _currentStateMap;
-    private final Map<MessageId, Message> _messageMap;
-    private String _hostName;
-    private int _port;
-    private boolean _isEnabled;
-    private RunningInstance _runningInstance;
-
-    /**
-     * Build a participant with a given id
-     * @param id participant id
-     */
-    public Builder(ParticipantId id) {
-      _id = id;
-      _disabledPartitions = new HashSet<PartitionId>();
-      _tags = new HashSet<String>();
-      _currentStateMap = new HashMap<ResourceId, CurrentState>();
-      _messageMap = new HashMap<MessageId, Message>();
-      _isEnabled = true;
-    }
-
-    /**
-     * Set the participant host name
-     * @param hostName reachable host when live
-     * @return Builder
-     */
-    public Builder hostName(String hostName) {
-      _hostName = hostName;
-      return this;
-    }
-
-    /**
-     * Set the participant port
-     * @param port port number
-     * @return Builder
-     */
-    public Builder port(int port) {
-      _port = port;
-      return this;
-    }
-
-    /**
-     * Set whether or not the participant is enabled
-     * @param isEnabled true if enabled, false otherwise
-     * @return Builder
-     */
-    public Builder enabled(boolean isEnabled) {
-      _isEnabled = isEnabled;
-      return this;
-    }
-
-    /**
-     * Add a partition to disable for this participant
-     * @param partitionId the partition to disable
-     * @return Builder
-     */
-    public Builder addDisabledPartition(PartitionId partitionId) {
-      _disabledPartitions.add(partitionId);
-      return this;
-    }
-
-    /**
-     * Add an arbitrary tag for this participant
-     * @param tag the tag to add
-     * @return Builder
-     */
-    public Builder addTag(String tag) {
-      _tags.add(tag);
-      return this;
-    }
-
-    /**
-     * Add live properties to participants that are running
-     * @param runningInstance live participant properties
-     * @return Builder
-     */
-    public Builder runningInstance(RunningInstance runningInstance) {
-      _runningInstance = runningInstance;
-      return this;
-    }
-
-    /**
-     * Add a resource current state for this participant
-     * @param resourceId the resource the current state corresponds to
-     * @param currentState the current state
-     * @return Builder
-     */
-    public Builder addCurrentState(ResourceId resourceId, CurrentState currentState) {
-      _currentStateMap.put(resourceId, currentState);
-      return this;
-    }
-
-    /**
-     * Add a message for the participant
-     * @param message message to add
-     * @return Builder
-     */
-    public Builder addMessage(Message message) {
-      _messageMap.put(new MessageId(message.getId()), message);
-      return this;
-    }
-
-    /**
-     * Assemble the participant
-     * @return instantiated Participant
-     */
-    public Participant build() {
-      return new Participant(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags,
-          _runningInstance, _currentStateMap, _messageMap);
-    }
+  public ParticipantConfig getConfig() {
+    return _config;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
new file mode 100644
index 0000000..3c77a40
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
@@ -0,0 +1,194 @@
+package org.apache.helix.api;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties of a Helix participant
+ */
+public class ParticipantConfig {
+  private final ParticipantId _id;
+  private final String _hostName;
+  private final int _port;
+  private final boolean _isEnabled;
+  private final Set<PartitionId> _disabledPartitions;
+  private final Set<String> _tags;
+
+  /**
+   * Initialize a participant configuration. Also see ParticipantConfig.Builder
+   * @param id participant id
+   * @param hostName host where participant can be reached
+   * @param port port to use to contact participant
+   * @param isEnabled true if enabled, false if disabled
+   * @param disabledPartitions set of partitions, if any to disable on this participant
+   * @param tags tags to set for the participant
+   */
+  public ParticipantConfig(ParticipantId id, String hostName, int port, boolean isEnabled,
+      Set<PartitionId> disabledPartitions, Set<String> tags) {
+    _id = id;
+    _hostName = hostName;
+    _port = port;
+    _isEnabled = isEnabled;
+    _disabledPartitions = ImmutableSet.copyOf(disabledPartitions);
+    _tags = ImmutableSet.copyOf(tags);
+  }
+
+  /**
+   * Get the host name of the participant
+   * @return host name, or null if not applicable
+   */
+  public String getHostName() {
+    return _hostName;
+  }
+
+  /**
+   * Get the port of the participant
+   * @return port number, or -1 if not applicable
+   */
+  public int getPort() {
+    return _port;
+  }
+
+  /**
+   * Get if the participant is enabled
+   * @return true if enabled or false otherwise
+   */
+  public boolean isEnabled() {
+    return _isEnabled;
+  }
+
+  /**
+   * Get disabled partition id's
+   * @return set of disabled partition id's, or empty set if none
+   */
+  public Set<PartitionId> getDisablePartitionIds() {
+    return _disabledPartitions;
+  }
+
+  /**
+   * Get tags
+   * @return set of tags
+   */
+  public Set<String> getTags() {
+    return _tags;
+  }
+
+  /**
+   * Check if participant has a tag
+   * @param tag tag to check
+   * @return true if tagged, false otherwise
+   */
+  public boolean hasTag(String tag) {
+    return _tags.contains(tag);
+  }
+
+  /**
+   * Get the participant id
+   * @return ParticipantId
+   */
+  public ParticipantId getId() {
+    return _id;
+  }
+
+  /**
+   * Assemble a participant configuration
+   */
+  public static class Builder {
+    private final ParticipantId _id;
+    private String _hostName;
+    private int _port;
+    private boolean _isEnabled;
+    private final Set<PartitionId> _disabledPartitions;
+    private final Set<String> _tags;
+
+    /**
+     * Build a participant with a given id
+     * @param id participant id
+     */
+    public Builder(ParticipantId id) {
+      _id = id;
+      _disabledPartitions = new HashSet<PartitionId>();
+      _tags = new HashSet<String>();
+      _isEnabled = true;
+    }
+
+    /**
+     * Set the participant host name
+     * @param hostName reachable host when live
+     * @return Builder
+     */
+    public Builder hostName(String hostName) {
+      _hostName = hostName;
+      return this;
+    }
+
+    /**
+     * Set the participant port
+     * @param port port number
+     * @return Builder
+     */
+    public Builder port(int port) {
+      _port = port;
+      return this;
+    }
+
+    /**
+     * Set whether or not the participant is enabled
+     * @param isEnabled true if enabled, false otherwise
+     * @return Builder
+     */
+    public Builder enabled(boolean isEnabled) {
+      _isEnabled = isEnabled;
+      return this;
+    }
+
+    /**
+     * Add a partition to disable for this participant
+     * @param partitionId the partition to disable
+     * @return Builder
+     */
+    public Builder addDisabledPartition(PartitionId partitionId) {
+      _disabledPartitions.add(partitionId);
+      return this;
+    }
+
+    /**
+     * Add an arbitrary tag for this participant
+     * @param tag the tag to add
+     * @return Builder
+     */
+    public Builder addTag(String tag) {
+      _tags.add(tag);
+      return this;
+    }
+
+    /**
+     * Assemble the participant configuration
+     * @return instantiated ParticipantConfig
+     */
+    public ParticipantConfig build() {
+      return new ParticipantConfig(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 4ac254d..257354d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,9 +19,11 @@ package org.apache.helix.api;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.HelixConstants;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
@@ -40,6 +42,7 @@ public class RebalancerConfig {
   private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
   private final ResourceAssignment _resourceAssignment;
   private final int _replicaCount;
+  private final boolean _anyLiveParticipant;
   private final String _participantGroupTag;
   private final int _maxPartitionsPerParticipant;
   private final int _bucketSize;
@@ -51,11 +54,19 @@ public class RebalancerConfig {
    * @param idealState the physical ideal state
    * @param resourceAssignment last mapping of a resource
    */
-  public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment) {
+  public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment,
+      int liveParticipantCount) {
     _rebalancerMode = idealState.getRebalanceMode();
     _rebalancerRef = idealState.getRebalancerRef();
     _stateModelDefId = idealState.getStateModelDefId();
-    _replicaCount = Integer.parseInt(idealState.getReplicas());
+    String replicaCount = idealState.getReplicas();
+    if (replicaCount.equals(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())) {
+      _replicaCount = liveParticipantCount;
+      _anyLiveParticipant = true;
+    } else {
+      _replicaCount = Integer.parseInt(idealState.getReplicas());
+      _anyLiveParticipant = false;
+    }
     _participantGroupTag = idealState.getInstanceGroupTag();
     _maxPartitionsPerParticipant = idealState.getMaxPartitionsPerInstance();
     _bucketSize = idealState.getBucketSize();
@@ -68,10 +79,14 @@ public class RebalancerConfig {
     ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>> preferenceMaps =
         new ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>>();
     for (PartitionId partitionId : idealState.getPartitionSet()) {
-      preferenceLists.put(partitionId,
-          ImmutableList.copyOf(idealState.getPreferenceList(partitionId)));
-      preferenceMaps.put(partitionId,
-          ImmutableMap.copyOf(idealState.getParticipantStateMap(partitionId)));
+      List<ParticipantId> preferenceList = idealState.getPreferenceList(partitionId);
+      if (preferenceList != null) {
+        preferenceLists.put(partitionId, ImmutableList.copyOf(preferenceList));
+      }
+      Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
+      if (preferenceMap != null) {
+        preferenceMaps.put(partitionId, ImmutableMap.copyOf(preferenceMap));
+      }
     }
     _preferenceLists = preferenceLists.build();
     _preferenceMaps = preferenceMaps.build();
@@ -118,7 +133,10 @@ public class RebalancerConfig {
    * @return the ordered preference list (early entries are more preferred)
    */
   public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
-    return _preferenceLists.get(partitionId);
+    if (_preferenceLists.containsKey(partitionId)) {
+      return _preferenceLists.get(partitionId);
+    }
+    return Collections.emptyList();
   }
 
   /**
@@ -127,7 +145,10 @@ public class RebalancerConfig {
    * @return a mapping of participant to state for each replica
    */
   public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
-    return _preferenceMaps.get(partitionId);
+    if (_preferenceMaps.containsKey(partitionId)) {
+      return _preferenceMaps.get(partitionId);
+    }
+    return Collections.emptyMap();
   }
 
   /**
@@ -179,10 +200,19 @@ public class RebalancerConfig {
   }
 
   /**
+   * Check if replicas can be assigned to any live participant
+   * @return true if they can, false if they cannot
+   */
+  public boolean canAssignAnyLiveParticipant() {
+    return _anyLiveParticipant;
+  }
+
+  /**
    * Assembles a RebalancerConfig
    */
   public static class Builder {
     private final IdealState _idealState;
+    private boolean _anyLiveParticipant;
     private ResourceAssignment _resourceAssignment;
 
     /**
@@ -191,6 +221,7 @@ public class RebalancerConfig {
      */
     public Builder(ResourceId resourceId) {
       _idealState = new IdealState(resourceId);
+      _anyLiveParticipant = false;
     }
 
     /**
@@ -283,11 +314,26 @@ public class RebalancerConfig {
     }
 
     /**
+     * Set whether any live participant should be used in rebalancing
+     * @param useAnyParticipant true if any live participant can be used, false otherwise
+     * @return Builder
+     */
+    public Builder anyLiveParticipant(boolean useAnyParticipant) {
+      _anyLiveParticipant = useAnyParticipant;
+      return this;
+    }
+
+    /**
      * Assemble a RebalancerConfig
      * @return a fully defined rebalancer configuration
      */
     public RebalancerConfig build() {
-      return new RebalancerConfig(_idealState, _resourceAssignment);
+      // NOTE(review): the RebalancerConfig constructor only uses this count when
+      // getReplicas() equals the ANY_LIVEINSTANCE token, in which case parseInt below
+      // would presumably throw -- confirm the intended wiring for the any-live case
+      int liveParticipantCount =
+          _anyLiveParticipant ? Integer.parseInt(_idealState.getReplicas()) : -1;
+      return new RebalancerConfig(_idealState, _resourceAssignment, liveParticipantCount);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
index 5f22898..5c7ce56 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
@@ -62,9 +62,12 @@ public class RebalancerRef {
   /**
    * Get a rebalancer class reference
    * @param rebalancerClassName name of the class
-   * @return RebalancerRef
+   * @return RebalancerRef or null if name is null
    */
   public static RebalancerRef from(String rebalancerClassName) {
+    if (rebalancerClassName == null) {
+      return null;
+    }
     return new RebalancerRef(rebalancerClassName);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 0c8b730..8445617 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -20,7 +20,6 @@ package org.apache.helix.api;
  */
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -29,41 +28,33 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceAssignment;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
 /**
  * Represent a resource entity in helix cluster
  */
 public class Resource {
-  private final ResourceId _id;
-  private final RebalancerConfig _rebalancerConfig;
-  private final SchedulerTaskConfig _schedulerTaskConfig;
-
-  private final Map<PartitionId, Partition> _partitionMap;
-
+  private final ResourceConfig _config;
   private final ExternalView _externalView;
 
   /**
    * Construct a resource
-   * @param idealState
+   * @param id resource id
+   * @param idealState ideal state of the resource
    * @param currentStateMap map of participant-id to current state
+   * @param liveParticipantCount number of live participants in the system
    */
-  public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment) {
-    _id = id;
-    _rebalancerConfig = new RebalancerConfig(idealState, resourceAssignment);
-
+  public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment,
+      ExternalView externalView, int liveParticipantCount) {
     Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
-    Map<PartitionId, Map<String, String>> schedulerTaskConfig =
+    Map<PartitionId, Map<String, String>> schedulerTaskConfigMap =
         new HashMap<PartitionId, Map<String, String>>();
     Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
     for (PartitionId partitionId : idealState.getPartitionSet()) {
       partitionMap.put(partitionId, new Partition(partitionId));
 
       // TODO refactor it
-      Map<String, String> taskConfigMap = idealState.getRecord().getMapField(partitionId.stringify());
+      Map<String, String> taskConfigMap = idealState.getInstanceStateMap(partitionId.stringify());
       if (taskConfigMap != null) {
-        schedulerTaskConfig.put(partitionId, taskConfigMap);
+        schedulerTaskConfigMap.put(partitionId, taskConfigMap);
       }
 
       // TODO refactor it
@@ -78,53 +69,38 @@ public class Resource {
         }
       }
     }
-    _partitionMap = ImmutableMap.copyOf(partitionMap);
-    _schedulerTaskConfig = new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfig);
+    SchedulerTaskConfig schedulerTaskConfig =
+        new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfigMap);
+    RebalancerConfig rebalancerConfig =
+        new RebalancerConfig(idealState, resourceAssignment, liveParticipantCount);
 
-    _externalView = null;
-  }
-
-  /**
-   * Construct a Resource
-   * @param id resource identifier
-   * @param partitionSet disjoint partitions of the resource
-   * @param externalView external view of the resource
-   * @param pendingExternalView pending external view based on unprocessed messages
-   * @param rebalancerConfig configuration properties for rebalancing this resource
-   */
-  public Resource(ResourceId id, Map<PartitionId, Partition> partitionMap,
-      ExternalView externalView,
-      RebalancerConfig rebalancerConfig, SchedulerTaskConfig schedulerTaskConfig) {
-    _id = id;
-    _partitionMap = ImmutableMap.copyOf(partitionMap);
+    _config = new ResourceConfig(id, partitionMap, schedulerTaskConfig, rebalancerConfig);
     _externalView = externalView;
-    _rebalancerConfig = rebalancerConfig;
-    _schedulerTaskConfig = schedulerTaskConfig;
   }
 
   /**
-   * Get the set of partitions of the resource
-   * @return set of partitions or empty set if none
+   * Get the partitions of the resource
+   * @return map of partition id to partition or empty map if none
    */
   public Map<PartitionId, Partition> getPartitionMap() {
-    return _partitionMap;
+    return _config.getPartitionMap();
   }
 
   /**
-   * @param partitionId
-   * @return
+   * Get a partition that the resource contains
+   * @param partitionId the partition id to look up
+   * @return Partition or null if none is present with the given id
    */
   public Partition getPartition(PartitionId partitionId) {
-    return _partitionMap.get(partitionId);
+    return _config.getPartition(partitionId);
   }
 
   /**
-   * @return
+   * Get the set of partition ids that the resource contains
+   * @return partition id set, or empty if none
    */
-  public Set<Partition> getPartitionSet() {
-    Set<Partition> partitionSet = new HashSet<Partition>();
-    partitionSet.addAll(_partitionMap.values());
-    return ImmutableSet.copyOf(partitionSet);
+  public Set<PartitionId> getPartitionSet() {
+    return _config.getPartitionSet();
   }
 
   /**
@@ -135,95 +111,35 @@ public class Resource {
     return _externalView;
   }
 
+  /**
+   * Get the resource properties configuring rebalancing
+   * @return RebalancerConfig properties
+   */
   public RebalancerConfig getRebalancerConfig() {
-    return _rebalancerConfig;
+    return _config.getRebalancerConfig();
   }
 
+  /**
+   * Get the resource id
+   * @return ResourceId
+   */
   public ResourceId getId() {
-    return _id;
+    return _config.getId();
   }
 
+  /**
+   * Get the properties configuring scheduler tasks
+   * @return SchedulerTaskConfig properties
+   */
   public SchedulerTaskConfig getSchedulerTaskConfig() {
-    return _schedulerTaskConfig;
+    return _config.getSchedulerTaskConfig();
   }
 
   /**
-   * Assembles a Resource
+   * Get the configuration of this resource
+   * @return ResourceConfig that backs this Resource
    */
-  public static class Builder {
-    private final ResourceId _id;
-    private final Map<PartitionId, Partition> _partitionMap;
-    private ExternalView _externalView;
-    private RebalancerConfig _rebalancerConfig;
-    private SchedulerTaskConfig _schedulerTaskConfig;
-
-    /**
-     * Build a Resource with an id
-     * @param id resource id
-     */
-    public Builder(ResourceId id) {
-      _id = id;
-      _partitionMap = new HashMap<PartitionId, Partition>();
-    }
-
-    /**
-     * Add a partition that the resource serves
-     * @param partition fully-qualified partition
-     * @return Builder
-     */
-    public Builder addPartition(Partition partition) {
-      _partitionMap.put(partition.getId(), partition);
-      return this;
-    }
-
-    /**
-     * Add a set of partitions
-     * @param partitions
-     * @return Builder
-     */
-    public Builder addPartitions(Set<Partition> partitions) {
-      for (Partition partition : partitions) {
-        addPartition(partition);
-      }
-      return this;
-    }
-
-    /**
-     * Set the external view of this resource
-     * @param extView currently served replica placement and state
-     * @return Builder
-     */
-    public Builder externalView(ExternalView extView) {
-      _externalView = extView;
-      return this;
-    }
-
-    /**
-     * Set the rebalancer configuration
-     * @param rebalancerConfig properties of interest for rebalancing
-     * @return Builder
-     */
-    public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
-      _rebalancerConfig = rebalancerConfig;
-      return this;
-    }
-
-    /**
-     * @param schedulerTaskConfig
-     * @return
-     */
-    public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
-      _schedulerTaskConfig = schedulerTaskConfig;
-      return this;
-    }
-
-    /**
-     * Create a Resource object
-     * @return instantiated Resource
-     */
-    public Resource build() {
-      return new Resource(_id, _partitionMap, _externalView, _rebalancerConfig,
-          _schedulerTaskConfig);
-    }
+  public ResourceConfig getConfig() {
+    return _config;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
new file mode 100644
index 0000000..5170c40
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
@@ -0,0 +1,173 @@
+package org.apache.helix.api;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Full configuration of a Helix resource. Typically used to add or modify resources on a cluster
+ */
+public class ResourceConfig {
+  private final ResourceId _id;
+  private final Map<PartitionId, Partition> _partitionMap;
+  private final RebalancerConfig _rebalancerConfig;
+  private final SchedulerTaskConfig _schedulerTaskConfig;
+
+  /**
+   * Instantiate a configuration. Consider using ResourceConfig.Builder
+   * @param id resource id
+   * @param partitionMap map of partition identifiers to partition objects
+   * @param schedulerTaskConfig configuration for scheduler tasks associated with the resource
+   * @param rebalancerConfig configuration for rebalancing the resource
+   */
+  public ResourceConfig(ResourceId id, Map<PartitionId, Partition> partitionMap,
+      SchedulerTaskConfig schedulerTaskConfig, RebalancerConfig rebalancerConfig) {
+    _id = id;
+    _partitionMap = ImmutableMap.copyOf(partitionMap);
+    _schedulerTaskConfig = schedulerTaskConfig;
+    _rebalancerConfig = rebalancerConfig;
+  }
+
+  /**
+   * Get the partitions of the resource
+   * @return map of partition id to partition or empty map if none
+   */
+  public Map<PartitionId, Partition> getPartitionMap() {
+    return _partitionMap;
+  }
+
+  /**
+   * Get a partition that the resource contains
+   * @param partitionId the partition id to look up
+   * @return Partition or null if none is present with the given id
+   */
+  public Partition getPartition(PartitionId partitionId) {
+    return _partitionMap.get(partitionId);
+  }
+
+  /**
+   * Get the set of partition ids that the resource contains
+   * @return partition id set, or empty if none
+   */
+  public Set<PartitionId> getPartitionSet() {
+    // The backing map is already an immutable copy, so no intermediate HashSet is needed
+    return ImmutableSet.copyOf(_partitionMap.keySet());
+  }
+
+  /**
+   * Get the resource properties configuring rebalancing
+   * @return RebalancerConfig properties
+   */
+  public RebalancerConfig getRebalancerConfig() {
+    return _rebalancerConfig;
+  }
+
+  /**
+   * Get the resource id
+   * @return ResourceId
+   */
+  public ResourceId getId() {
+    return _id;
+  }
+
+  /**
+   * Get the properties configuring scheduler tasks
+   * @return SchedulerTaskConfig properties
+   */
+  public SchedulerTaskConfig getSchedulerTaskConfig() {
+    return _schedulerTaskConfig;
+  }
+
+  /**
+   * Assembles a ResourceConfig
+   */
+  public static class Builder {
+    private final ResourceId _id;
+    private final Map<PartitionId, Partition> _partitionMap;
+    private RebalancerConfig _rebalancerConfig;
+    private SchedulerTaskConfig _schedulerTaskConfig;
+
+    /**
+     * Build a ResourceConfig with an id
+     * @param id resource id
+     */
+    public Builder(ResourceId id) {
+      _id = id;
+      _partitionMap = new HashMap<PartitionId, Partition>();
+    }
+
+    /**
+     * Add a partition that the resource serves
+     * @param partition fully-qualified partition
+     * @return Builder
+     */
+    public Builder addPartition(Partition partition) {
+      _partitionMap.put(partition.getId(), partition);
+      return this;
+    }
+
+    /**
+     * Add a collection of partitions
+     * @param partitions the partitions to add, each stored under its own id
+     * @return Builder
+     */
+    public Builder addPartitions(Collection<Partition> partitions) {
+      for (Partition partition : partitions) {
+        addPartition(partition);
+      }
+      return this;
+    }
+
+    /**
+     * Set the rebalancer configuration
+     * @param rebalancerConfig properties of interest for rebalancing
+     * @return Builder
+     */
+    public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+      _rebalancerConfig = rebalancerConfig;
+      return this;
+    }
+
+    /**
+     * Set the scheduler task configuration
+     * @param schedulerTaskConfig properties of interest for scheduler tasks
+     * @return Builder
+     */
+    public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
+      _schedulerTaskConfig = schedulerTaskConfig;
+      return this;
+    }
+
+    /**
+     * Create a ResourceConfig object
+     * @return instantiated ResourceConfig
+     */
+    public ResourceConfig build() {
+      return new ResourceConfig(_id, _partitionMap, _schedulerTaskConfig, _rebalancerConfig);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6570dc4..4df2201 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -44,14 +44,7 @@ import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CompatibilityCheckStage;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ExternalViewComputeStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-import org.apache.helix.controller.stages.MessageSelectionStage;
-import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.NewCompatibilityCheckStage;
 import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
@@ -62,10 +55,6 @@ import org.apache.helix.controller.stages.NewMessageThrottleStage;
 import org.apache.helix.controller.stages.NewReadClusterDataStage;
 import org.apache.helix.controller.stages.NewResourceComputationStage;
 import org.apache.helix.controller.stages.NewTaskAssignmentStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.RebalanceIdealStateStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -205,8 +194,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       registry.register("idealStateChange", dataRefresh, rebalancePipeline);
       registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
       registry.register("configChange", dataRefresh, rebalancePipeline);
-      registry.register("liveInstanceChange", dataRefresh, rebalancePipeline,
-          externalViewPipeline);
+      registry.register("liveInstanceChange", dataRefresh, rebalancePipeline, externalViewPipeline);
 
       registry.register("messageChange", dataRefresh, rebalancePipeline);
       registry.register("externalView", dataRefresh);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 8821082..a28166c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -29,9 +29,9 @@ import java.util.Set;
 
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
@@ -46,6 +46,7 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.collect.Lists;
 
@@ -69,12 +70,17 @@ public class NewAutoRebalancer implements NewRebalancer {
   public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
       StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
     // Compute a preference list based on the current ideal state
-    List<Partition> partitions = new ArrayList<Partition>(resource.getPartitionSet());
+    List<PartitionId> partitions = new ArrayList<PartitionId>(resource.getPartitionSet());
     List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
     RebalancerConfig config = resource.getRebalancerConfig();
     Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
     Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
-    int replicas = config.getReplicaCount();
+    int replicas = -1;
+    if (config.canAssignAnyLiveParticipant()) {
+      replicas = liveParticipants.size();
+    } else {
+      replicas = config.getReplicaCount();
+    }
 
     LinkedHashMap<String, Integer> stateCountMap =
         ConstraintBasedAssignment.stateCount(stateModelDef, liveParticipants.size(), replicas);
@@ -129,17 +135,25 @@ public class NewAutoRebalancer implements NewRebalancer {
       LOG.debug("Processing resource:" + resource.getId());
     }
     ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
-    for (Partition partition : partitions) {
+    for (PartitionId partition : partitions) {
       Set<ParticipantId> disabledParticipantsForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition.getId());
+          NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
       List<ParticipantId> preferenceList =
-          NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+          Lists.transform(newMapping.getListField(partition.stringify()),
+              new Function<String, ParticipantId>() {
+                @Override
+                public ParticipantId apply(String participantName) {
+                  return Id.participant(participantName);
+                }
+              });
+      preferenceList =
+          NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
       Map<ParticipantId, State> bestStateForPartition =
           NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
               stateModelDef, preferenceList,
-              currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId()),
+              currentStateOutput.getCurrentStateMap(resource.getId(), partition),
               disabledParticipantsForPartition);
-      partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }
@@ -149,23 +163,23 @@ public class NewAutoRebalancer implements NewRebalancer {
     Map<PartitionId, Map<ParticipantId, State>> map =
         new HashMap<PartitionId, Map<ParticipantId, State>>();
 
-    for (Partition partition : resource.getPartitionSet()) {
+    for (PartitionId partition : resource.getPartitionSet()) {
       Map<ParticipantId, State> curStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
-      map.put(partition.getId(), new HashMap<ParticipantId, State>());
+          currentStateOutput.getCurrentStateMap(resource.getId(), partition);
+      map.put(partition, new HashMap<ParticipantId, State>());
       for (ParticipantId node : curStateMap.keySet()) {
         State state = curStateMap.get(node);
         if (stateCountMap.containsKey(state)) {
-          map.get(partition.getId()).put(node, state);
+          map.get(partition).put(node, state);
         }
       }
 
       Map<ParticipantId, State> pendingStateMap =
-          currentStateOutput.getPendingStateMap(resource.getId(), partition.getId());
+          currentStateOutput.getPendingStateMap(resource.getId(), partition);
       for (ParticipantId node : pendingStateMap.keySet()) {
         State state = pendingStateMap.get(node);
         if (stateCountMap.containsKey(state)) {
-          map.get(partition.getId()).put(node, state);
+          map.get(partition).put(node, state);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
index 8d000f5..0dd346b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
@@ -27,7 +27,7 @@ import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
@@ -58,17 +58,16 @@ public class NewCustomRebalancer implements NewRebalancer {
     }
     ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
     RebalancerConfig config = resource.getRebalancerConfig();
-    for (Partition partition : resource.getPartitionSet()) {
+    for (PartitionId partition : resource.getPartitionSet()) {
       Map<ParticipantId, State> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+          currentStateOutput.getCurrentStateMap(resource.getId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition.getId());
+              partition);
       Map<ParticipantId, State> bestStateForPartition =
           computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
-              config.getPreferenceMap(partition.getId()), currentStateMap,
-              disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+              config.getPreferenceMap(partition), currentStateMap, disabledInstancesForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index 27bb513..0b1611c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -25,7 +25,7 @@ import java.util.Set;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
@@ -56,19 +56,20 @@ public class NewSemiAutoRebalancer implements NewRebalancer {
     }
     ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
     RebalancerConfig config = resource.getRebalancerConfig();
-    for (Partition partition : resource.getPartitionSet()) {
+    for (PartitionId partition : resource.getPartitionSet()) {
       Map<ParticipantId, State> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+          currentStateOutput.getCurrentStateMap(resource.getId(), partition);
       Set<ParticipantId> disabledInstancesForPartition =
           NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition.getId());
+              partition);
       List<ParticipantId> preferenceList =
-          NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+          NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
+              config.getPreferenceList(partition));
       Map<ParticipantId, State> bestStateForPartition =
           NewConstraintBasedAssignment.computeAutoBestStateForPartition(
               cluster.getLiveParticipantMap(), stateModelDef, preferenceList, currentStateMap,
               disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 224853b..07856ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -34,7 +34,6 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.State;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -76,9 +75,7 @@ public class NewConstraintBasedAssignment {
    * @return list with most preferred participants first
    */
   public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
-      RebalancerConfig config) {
-    List<ParticipantId> prefList = config.getPreferenceList(partitionId);
-
+      List<ParticipantId> prefList) {
     if (prefList != null && prefList.size() == 1
         && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
       prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index bc14297..52a5af2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -23,11 +23,11 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Id;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -58,7 +58,8 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
 
     NewCurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
 
     if (currentStateOutput == null || resourceMap == null || cluster == null) {
@@ -106,7 +107,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
 
   // TODO check this
   private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
-      Map<ResourceId, Resource> resourceMap, NewCurrentStateOutput currentStateOutput) {
+      Map<ResourceId, ResourceConfig> resourceMap, NewCurrentStateOutput currentStateOutput) {
     NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
     Map<StateModelDefId, StateModelDefinition> stateModelDefs =
         event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
index d5ee850..8c5005d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -2,6 +2,7 @@ package org.apache.helix.controller.stages;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.model.ResourceAssignment;
@@ -31,4 +32,12 @@ public class NewBestPossibleStateOutput {
   public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
     return _resourceAssignmentMap.get(resourceId);
   }
+
+  /**
+   * Get the ids of all resources that have an entry in this output's assignment map
+   * @return key set of the resource assignment map, returned directly (not a defensive copy)
+   */
+  public Set<ResourceId> getAssignedResources() {
+    return _resourceAssignmentMap.keySet();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
index 0f8c33e..e0cd22f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -28,7 +28,7 @@ import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.SessionId;
 import org.apache.helix.api.State;
@@ -48,7 +48,8 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     Cluster cluster = event.getAttribute("ClusterDataCache");
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
 
     if (cluster == null || resourceMap == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -72,7 +73,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
         }
 
         ResourceId resourceId = message.getResourceId();
-        Resource resource = resourceMap.get(resourceId);
+        ResourceConfig resource = resourceMap.get(resourceId);
         if (resource == null) {
           continue;
         }
@@ -92,8 +93,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
             for (PartitionId partitionId : partitionNames) {
               Partition partition = resource.getPartition(partitionId);
               if (partition != null) {
-                currentStateOutput.setPendingState(resourceId, partitionId,
-                    participantId,
+                currentStateOutput.setPendingState(resourceId, partitionId, participantId,
                     message.getToState());
               } else {
                 // log
@@ -113,7 +113,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
 
         ResourceId resourceId = curState.getResourceId();
         StateModelDefId stateModelDefId = curState.getStateModelDefId();
-        Resource resource = resourceMap.get(resourceId);
+        ResourceConfig resource = resourceMap.get(resourceId);
         if (resource == null) {
           continue;
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 1a99346..68169ae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
@@ -40,7 +39,7 @@ import org.apache.helix.api.Id;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -50,7 +49,6 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
 import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
@@ -64,7 +62,8 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
     log.info("START ExternalViewComputeStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
 
     if (manager == null || resourceMap == null || cluster == null) {
@@ -89,7 +88,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
       // view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
       // if resource ideal state has bucket size, set it
       // otherwise resource has been dropped, use bucket size from current state instead
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
       if (rebalancerConfig.getBucketSize() > 0) {
         view.setBucketSize(rebalancerConfig.getBucketSize());
@@ -139,8 +138,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
         // message, and then remove the partitions from the ideal state
         if (rebalancerConfig != null
             && rebalancerConfig.getStateModelDefId().stringify()
-                .equalsIgnoreCase(
-                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+                .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
           // TODO fix it
           // updateScheduledTaskStatus(view, manager, idealState);
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index 0a72dc0..0ae72cf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -30,10 +30,10 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Id;
 import org.apache.helix.api.MessageId;
 import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.SessionId;
 import org.apache.helix.api.State;
@@ -42,12 +42,10 @@ import org.apache.helix.api.StateModelFactoryId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
@@ -63,7 +61,8 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
         event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     NewCurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
     NewBestPossibleStateOutput bestPossibleStateOutput =
@@ -77,7 +76,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
     NewMessageOutput output = new NewMessageOutput();
 
     for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       int bucketSize = resource.getRebalancerConfig().getBucketSize();
 
       StateModelDefinition stateModelDef =
@@ -145,10 +144,13 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
             if (rebalancerConfig != null
                 && rebalancerConfig.getStateModelDefId().stringify()
                     .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-              if (resource.getPartitionSet().size() > 0) {
-                // TODO refactor it
-                message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
-                    resource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+              if (resource.getPartitionMap().size() > 0) {
+                // TODO refactor it -- we need a way to read in scheduler tasks a priori
+                Resource activeResource = cluster.getResource(resourceId);
+                if (activeResource != null) {
+                  message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+                      activeResource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+                }
               }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 9745c64..ed06c5f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -28,10 +28,12 @@ import java.util.TreeMap;
 
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantConfig;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.RebalancerConfig;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
@@ -91,11 +93,11 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
         event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
     NewCurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    NewMessageOutput messageGenOutput =
-        event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+    NewMessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
     if (cluster == null || resourceMap == null || currentStateOutput == null
         || messageGenOutput == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -105,7 +107,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
     NewMessageOutput output = new NewMessageOutput();
 
     for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       StateModelDefinition stateModelDef =
           stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
 
@@ -119,8 +121,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
         List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
         List<Message> selectedMessages =
             selectMessages(cluster.getLiveParticipantMap(),
-            currentStateOutput.getCurrentStateMap(resourceId, partitionId),
-            currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
+                currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+                currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
                 stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
         output.setMessages(resourceId, partitionId, selectedMessages);
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
index 5bea5b4..cfbd45c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -30,17 +30,16 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
-import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message;
 import org.apache.log4j.Logger;
 
 public class NewMessageThrottleStage extends AbstractBaseStage {
@@ -120,7 +119,8 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
     Cluster cluster = event.getAttribute("ClusterDataCache");
     NewMessageOutput msgSelectionOutput =
         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
 
     if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
       throw new StageException("Missing attributes in event: " + event
@@ -145,7 +145,7 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
     // go through all new messages, throttle if necessary
     // assume messages should be sorted by state transition priority in messageSelection stage
     for (ResourceId resourceId : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceId);
+      ResourceConfig resource = resourceMap.get(resourceId);
       // TODO fix it
       for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
         List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);