You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:25 UTC

[17/53] [abbrv] git commit: [HELIX-238] Logical model accessor implementation

[HELIX-238] Logical model accessor implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/bc657373
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/bc657373
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/bc657373

Branch: refs/heads/master
Commit: bc6573734c5a2db439cc10d6755ff4e629ff0375
Parents: 917af3e
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Sep 24 14:04:36 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:34 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/PropertyKey.java |  21 +++
 .../org/apache/helix/PropertyPathConfig.java    |   6 +
 .../java/org/apache/helix/PropertyType.java     |   1 +
 .../org/apache/helix/api/ClusterAccessor.java   |  92 +++--------
 .../apache/helix/api/ControllerAccessor.java    |  17 +-
 .../java/org/apache/helix/api/Resource.java     |  10 ++
 .../org/apache/helix/api/ResourceAccessor.java  | 155 ++++++++++++++++++-
 .../helix/api/StateModelDefinitionAccessor.java |   3 -
 .../controller/GenericHelixController.java      |   2 +
 .../rebalancer/context/RebalancerContext.java   |   2 +-
 .../stages/NewBestPossibleStateCalcStage.java   |   2 +-
 .../stages/NewBestPossibleStateOutput.java      |   5 +-
 .../stages/PersistAssignmentStage.java          |  45 ++++++
 .../controller/stages/ResourceCurrentState.java |   2 +-
 .../java/org/apache/helix/model/IdealState.java |   4 +-
 .../apache/helix/model/ResourceAssignment.java  |  17 +-
 .../helix/tools/ClusterStateVerifier.java       |   2 +-
 .../src/test/java/org/apache/helix/Mocks.java   |   2 +-
 .../java/org/apache/helix/TestPerfCounters.java |   1 +
 .../stages/TestMessageThrottleStage.java        |   2 -
 .../stages/TestParseInfoFromAlert.java          |  16 +-
 .../stages/TestRebalancePipeline.java           |   2 -
 .../org/apache/helix/model/TestConstraint.java  |   4 +-
 .../apache/helix/examples/NewModelExample.java  |  19 ++-
 24 files changed, 316 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 1733d39..1a6d11d 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -38,6 +38,7 @@ import static org.apache.helix.PropertyType.MESSAGES;
 import static org.apache.helix.PropertyType.MESSAGES_CONTROLLER;
 import static org.apache.helix.PropertyType.PAUSE;
 import static org.apache.helix.PropertyType.PERSISTENTSTATS;
+import static org.apache.helix.PropertyType.RESOURCEASSIGNMENTS;
 import static org.apache.helix.PropertyType.STATEMODELDEFS;
 import static org.apache.helix.PropertyType.STATUSUPDATES;
 import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
@@ -62,6 +63,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.PartitionConfiguration;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.PersistentStats;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.StatusUpdate;
@@ -173,6 +175,25 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with all {@link ResourceAssignment}s on the cluster
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey resourceAssignments() {
+      return new PropertyKey(RESOURCEASSIGNMENTS, ResourceAssignment.class, _clusterName);
+    }
+
+    /**
+     * Get a property key associated with {@link ResourceAssignment} representing the most recent
+     * assignment
+     * @param resourceName name of the resource
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey resourceAssignment(String resourceName) {
+      return new PropertyKey(RESOURCEASSIGNMENTS, ResourceAssignment.class, _clusterName,
+          resourceName);
+    }
+
+    /**
      * Get a property key associated with {@link StateModelDefinition}
      * @return {@link PropertyKey}
      */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
index 7fa4404..60a92f4 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
@@ -30,6 +30,7 @@ import static org.apache.helix.PropertyType.IDEALSTATES;
 import static org.apache.helix.PropertyType.LIVEINSTANCES;
 import static org.apache.helix.PropertyType.MESSAGES;
 import static org.apache.helix.PropertyType.PAUSE;
+import static org.apache.helix.PropertyType.RESOURCEASSIGNMENTS;
 import static org.apache.helix.PropertyType.STATEMODELDEFS;
 import static org.apache.helix.PropertyType.STATUSUPDATES;
 
@@ -50,6 +51,7 @@ import org.apache.helix.model.LeaderHistory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.StatusUpdate;
 import org.apache.log4j.Logger;
@@ -78,6 +80,7 @@ public class PropertyPathConfig {
     typeToClassMapping.put(ALERTS, Alerts.class);
     typeToClassMapping.put(ALERT_STATUS, AlertStatus.class);
     typeToClassMapping.put(PAUSE, PauseSignal.class);
+    typeToClassMapping.put(RESOURCEASSIGNMENTS, ResourceAssignment.class);
 
     // @formatter:off
     addEntry(PropertyType.CLUSTER, 1, "/{clusterName}");
@@ -92,6 +95,9 @@ public class PropertyPathConfig {
     addEntry(PropertyType.INSTANCES, 2, "/{clusterName}/INSTANCES/{instanceName}");
     addEntry(PropertyType.IDEALSTATES, 1, "/{clusterName}/IDEALSTATES");
     addEntry(PropertyType.IDEALSTATES, 2, "/{clusterName}/IDEALSTATES/{resourceName}");
+    addEntry(PropertyType.RESOURCEASSIGNMENTS, 1, "/{clusterName}/RESOURCEASSIGNMENTS");
+    addEntry(PropertyType.RESOURCEASSIGNMENTS, 2,
+        "/{clusterName}/RESOURCEASSIGNMENTS/{resourceName}");
     addEntry(PropertyType.EXTERNALVIEW, 1, "/{clusterName}/EXTERNALVIEW");
     addEntry(PropertyType.EXTERNALVIEW, 2, "/{clusterName}/EXTERNALVIEW/{resourceName}");
     addEntry(PropertyType.STATEMODELDEFS, 1, "/{clusterName}/STATEMODELDEFS");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 446f1a2..680dc06 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -41,6 +41,7 @@ public enum PropertyType {
   LIVEINSTANCES(Type.CLUSTER, false, false, false, true, true),
   INSTANCES(Type.CLUSTER, true, false),
   IDEALSTATES(Type.CLUSTER, true, false, false, false, true),
+  RESOURCEASSIGNMENTS(Type.CLUSTER, true, false),
   EXTERNALVIEW(Type.CLUSTER, true, false),
   STATEMODELDEFS(Type.CLUSTER, true, false, false, false, true),
   CONTROLLER(Type.CLUSTER, true, false),

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index fd608e3..a87ce74 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -25,26 +25,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
 import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -127,7 +123,6 @@ public class ClusterAccessor {
    * @return cluster
    */
   public Cluster readCluster() {
-    // TODO many of these should live in resource, participant, etc accessors
     /**
      * map of instance-id to instance-config
      */
@@ -190,40 +185,22 @@ public class ClusterAccessor {
     Map<String, ResourceConfiguration> resourceConfigMap =
         _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
 
+    /**
+     * Map of resource id to resource assignment
+     */
+    Map<String, ResourceAssignment> resourceAssignmentMap =
+        _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+
+    // read all the resources
     Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
     for (String resourceName : idealStateMap.keySet()) {
-      IdealState idealState = idealStateMap.get(resourceName);
-      // TODO pass resource assignment
       ResourceId resourceId = ResourceId.from(resourceName);
-      UserConfig userConfig;
-      if (resourceConfigMap != null && resourceConfigMap.containsKey(resourceName)) {
-        userConfig = UserConfig.from(resourceConfigMap.get(resourceName));
-      } else {
-        userConfig = new UserConfig(Scope.resource(resourceId));
-      }
-      int bucketSize = 0;
-      boolean batchMessageMode = false;
-      RebalancerContext rebalancerContext;
-      if (idealState != null) {
-        rebalancerContext = PartitionedRebalancerContext.from(idealState);
-        bucketSize = idealState.getBucketSize();
-        batchMessageMode = idealState.getBatchMessageMode();
-      } else {
-        ResourceConfiguration resourceConfiguration = resourceConfigMap.get(resourceName);
-        if (resourceConfiguration != null) {
-          bucketSize = resourceConfiguration.getBucketSize();
-          batchMessageMode = resourceConfiguration.getBatchMessageMode();
-          RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
-          rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
-        } else {
-          rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
-        }
-      }
-      resourceMap.put(resourceId,
-          new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
-              rebalancerContext, userConfig, bucketSize, batchMessageMode));
+      resourceMap.put(resourceId, ResourceAccessor.createResource(resourceId,
+          resourceConfigMap.get(resourceName), idealStateMap.get(resourceName),
+          externalViewMap.get(resourceName), resourceAssignmentMap.get(resourceName)));
     }
 
+    // read all the participants
     Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
     for (String participantName : instanceConfigMap.keySet()) {
       InstanceConfig instanceConfig = instanceConfigMap.get(participantName);
@@ -238,6 +215,7 @@ public class ClusterAccessor {
           currentStateMap.get(participantName)));
     }
 
+    // read the controllers
     Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
     ControllerId leaderId = null;
     if (leader != null) {
@@ -245,6 +223,7 @@ public class ClusterAccessor {
       controllerMap.put(leaderId, new Controller(leaderId, leader, true));
     }
 
+    // read the constraints
     Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
         new HashMap<ConstraintType, ClusterConstraints>();
     for (String constraintType : constraintMap.keySet()) {
@@ -252,6 +231,7 @@ public class ClusterAccessor {
           constraintMap.get(constraintType));
     }
 
+    // read the pause status
     PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
     boolean isPaused = pauseSignal != null;
 
@@ -263,10 +243,13 @@ public class ClusterAccessor {
       userConfig = new UserConfig(Scope.cluster(_clusterId));
     }
 
+    // read the state model definitions
     StateModelDefinitionAccessor stateModelDefAccessor =
         new StateModelDefinitionAccessor(_accessor);
     Map<StateModelDefId, StateModelDefinition> stateModelMap =
         stateModelDefAccessor.readStateModelDefinitions();
+
+    // create the cluster snapshot object
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
         clusterConstraintMap, stateModelMap, userConfig, isPaused);
   }
@@ -290,7 +273,6 @@ public class ClusterAccessor {
    * @param resource
    */
   public void addResourceToCluster(ResourceConfig resource) {
-    // TODO: this belongs in ResourceAccessor
     RebalancerContext context =
         resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
     StateModelDefId stateModelDefId = context.getStateModelDefId();
@@ -317,40 +299,10 @@ public class ClusterAccessor {
 
     // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
     RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
-    PartitionedRebalancerContext partitionedContext =
-        rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
-    if (context != null) {
-      IdealState idealState = new IdealState(resourceId);
-      idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
-      idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
-      String replicas = null;
-      if (partitionedContext.anyLiveParticipant()) {
-        replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
-      } else {
-        replicas = Integer.toString(partitionedContext.getReplicaCount());
-      }
-      idealState.setReplicas(replicas);
-      idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
-      idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
-      idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
-      idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
-      idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
-      idealState.setBucketSize(resource.getBucketSize());
-      idealState.setBatchMessageMode(resource.getBatchMessageMode());
-      if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-        SemiAutoRebalancerContext semiAutoContext =
-            rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
-        for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
-          idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
-        }
-      } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
-        CustomRebalancerContext customContext =
-            rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
-        for (PartitionId partitionId : customContext.getPartitionSet()) {
-          idealState.setParticipantStateMap(partitionId,
-              customContext.getPreferenceMap(partitionId));
-        }
-      }
+    IdealState idealState =
+        ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
+            resource.getBatchMessageMode());
+    if (idealState != null) {
       _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
index 153bab6..b835a4c 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
@@ -20,19 +20,28 @@ package org.apache.helix.api;
  */
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.LiveInstance;
 
 public class ControllerAccessor {
   private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
 
   public ControllerAccessor(HelixDataAccessor accessor) {
     _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
   }
 
   /**
-   * create leader
-   * @param controllerId
+   * Read the leader controller if it is live
+   * @return Controller snapshot, or null
    */
-  public void connectLeader(ControllerId controllerId) {
-    // TODO impl this
+  public Controller readLeader() {
+    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+    if (leader != null) {
+      ControllerId leaderId = ControllerId.from(leader.getId());
+      return new Controller(leaderId, leader, true);
+    }
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index b8a6daf..7816888 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -38,6 +38,7 @@ import org.apache.helix.model.ResourceAssignment;
 public class Resource {
   private final ResourceConfig _config;
   private final ExternalView _externalView;
+  private final ResourceAssignment _resourceAssignment;
 
   /**
    * Construct a resource
@@ -73,6 +74,7 @@ public class Resource {
         new ResourceConfig(id, ResourceType.DATA, schedulerTaskConfig, rebalancerConfig,
             userConfig, bucketSize, batchMessageMode);
     _externalView = externalView;
+    _resourceAssignment = resourceAssignment;
   }
 
   /**
@@ -151,6 +153,14 @@ public class Resource {
   }
 
   /**
+   * Get the current resource assignment
+   * @return ResourceAssignment, or null if no current assignment
+   */
+  public ResourceAssignment getResourceAssignment() {
+    return _resourceAssignment;
+  }
+
+  /**
    * Get the resource properties configuring rebalancing
    * @return RebalancerConfig properties
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
index fa36247..bdc44c7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
@@ -19,38 +19,90 @@ package org.apache.helix.api;
  * under the License.
  */
 
+import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
 
 public class ResourceAccessor {
-  private final ClusterId _clusterId;
   private final HelixDataAccessor _accessor;
   private final PropertyKey.Builder _keyBuilder;
 
-  public ResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
-    _clusterId = clusterId;
+  public ResourceAccessor(HelixDataAccessor accessor) {
     _accessor = accessor;
     _keyBuilder = accessor.keyBuilder();
   }
 
   /**
+   * Read a single snapshot of a resource
+   * @param resourceId the resource id to read
+   * @return Resource
+   */
+  public Resource readResource(ResourceId resourceId) {
+    ResourceConfiguration config =
+        _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+    IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
+    ExternalView externalView =
+        _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
+    ResourceAssignment resourceAssignment =
+        _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+    return createResource(resourceId, config, idealState, externalView, resourceAssignment);
+  }
+
+  /**
    * save resource assignment
    * @param resourceId
    * @param resourceAssignment
    */
-  public void setRresourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+  public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+    _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
+        resourceAssignment);
+  }
+
+  /**
+   * get resource assignment
+   * @param resourceId
+   * @return resource assignment or null
+   */
+  public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+    return _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+  }
+
+  /**
+   * Set a resource configuration, which may include user-defined configuration, as well as
+   * rebalancer configuration
+   * @param resourceId
+   * @param configuration
+   */
+  public void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+    _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+    // also set an ideal state if the resource supports it
+    RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+    IdealState idealState =
+        rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
+            configuration.getBatchMessageMode());
+    if (idealState != null) {
+      _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+    }
   }
 
   /**
-   * set ideal-state
+   * Get a resource configuration, which may include user-defined configuration, as well as
+   * rebalancer configuration
    * @param resourceId
-   * @param idealState
+   * @return configuration
    */
-  public void setIdealState(ResourceId resourceId, IdealState idealState) {
-    _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+  public ResourceConfiguration getConfiguration(ResourceId resourceId) {
+    return _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
   }
 
   /**
@@ -70,4 +122,91 @@ public class ResourceAccessor {
     _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
   }
 
+  /**
+   * Get an ideal state from a rebalancer config if the resource is partitioned
+   * @param config RebalancerConfig instance
+   * @param bucketSize bucket size to use
+   * @param batchMessageMode true if batch messaging allowed, false otherwise
+   * @return IdealState, or null
+   */
+  static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
+      boolean batchMessageMode) {
+    PartitionedRebalancerContext partitionedContext =
+        config.getRebalancerContext(PartitionedRebalancerContext.class);
+    if (partitionedContext != null) {
+      IdealState idealState = new IdealState(partitionedContext.getResourceId());
+      idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
+      idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+      String replicas = null;
+      if (partitionedContext.anyLiveParticipant()) {
+        replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
+      } else {
+        replicas = Integer.toString(partitionedContext.getReplicaCount());
+      }
+      idealState.setReplicas(replicas);
+      idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
+      idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
+      idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
+      idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
+      idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+      idealState.setBucketSize(bucketSize);
+      idealState.setBatchMessageMode(batchMessageMode);
+      if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+        SemiAutoRebalancerContext semiAutoContext =
+            config.getRebalancerContext(SemiAutoRebalancerContext.class);
+        for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
+          idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+        }
+      } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+        CustomRebalancerContext customContext =
+            config.getRebalancerContext(CustomRebalancerContext.class);
+        for (PartitionId partitionId : customContext.getPartitionSet()) {
+          idealState.setParticipantStateMap(partitionId,
+              customContext.getPreferenceMap(partitionId));
+        }
+      }
+      return idealState;
+    }
+    return null;
+  }
+
+  /**
+   * Create a resource snapshot instance from the physical model
+   * @param resourceId the resource id
+   * @param resourceConfiguration physical resource configuration
+   * @param idealState ideal state of the resource
+   * @param externalView external view of the resource
+   * @param resourceAssignment current resource assignment
+   * @return Resource
+   */
+  static Resource createResource(ResourceId resourceId,
+      ResourceConfiguration resourceConfiguration, IdealState idealState,
+      ExternalView externalView, ResourceAssignment resourceAssignment) {
+    // use the stored user config if present, otherwise create a new resource-scoped one
+    UserConfig userConfig;
+    if (resourceConfiguration != null) {
+      userConfig = UserConfig.from(resourceConfiguration);
+    } else {
+      userConfig = new UserConfig(Scope.resource(resourceId));
+    }
+    int bucketSize = 0;
+    boolean batchMessageMode = false;
+    RebalancerContext rebalancerContext;
+    if (idealState != null) {
+      rebalancerContext = PartitionedRebalancerContext.from(idealState);
+      bucketSize = idealState.getBucketSize();
+      batchMessageMode = idealState.getBatchMessageMode();
+    } else {
+      if (resourceConfiguration != null) {
+        bucketSize = resourceConfiguration.getBucketSize();
+        batchMessageMode = resourceConfiguration.getBatchMessageMode();
+        RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
+        rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+      } else {
+        rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
+      }
+    }
+    return new Resource(resourceId, idealState, resourceAssignment, externalView,
+        rebalancerContext, userConfig, bucketSize, batchMessageMode);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
index f2bbf6d..988484b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
@@ -25,13 +25,10 @@ import java.util.Map;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
 
 import com.google.common.collect.ImmutableMap;
 
 public class StateModelDefinitionAccessor {
-  private static Logger LOG = Logger.getLogger(StateModelDefinitionAccessor.class);
-
   private final HelixDataAccessor _accessor;
   private final PropertyKey.Builder _keyBuilder;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index ab1f824..3c118ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -55,6 +55,7 @@ import org.apache.helix.controller.stages.NewMessageThrottleStage;
 import org.apache.helix.controller.stages.NewReadClusterDataStage;
 import org.apache.helix.controller.stages.NewResourceComputationStage;
 import org.apache.helix.controller.stages.NewTaskAssignmentStage;
+import org.apache.helix.controller.stages.PersistAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -182,6 +183,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       rebalancePipeline.addStage(new NewResourceComputationStage());
       rebalancePipeline.addStage(new NewCurrentStateComputationStage());
       rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new PersistAssignmentStage());
       rebalancePipeline.addStage(new NewMessageGenerationStage());
       rebalancePipeline.addStage(new NewMessageSelectionStage());
       rebalancePipeline.addStage(new NewMessageThrottleStage());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
index 3aff151..87863b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
@@ -53,7 +53,7 @@ public interface RebalancerContext {
    * @param subUnitId the id of the subunit
    * @return SubUnit
    */
-  public Partition getSubUnit(PartitionId partitionId);
+  public Partition getSubUnit(PartitionId subUnitId);
 
   /**
    * Get the resource to rebalance

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index 6334279..7434c2a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -85,7 +85,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
   private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
       ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
     ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
-    Set<PartitionId> mappedPartitions =
+    Set<? extends PartitionId> mappedPartitions =
         currentStateOutput.getCurrentStateMappedPartitions(resourceId);
     if (mappedPartitions == null) {
       return partitionMapping;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
index fbdea55..aaeeb34 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -1,18 +1,19 @@
 package org.apache.helix.controller.stages;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.api.ResourceId;
 import org.apache.helix.model.ResourceAssignment;
 
+import com.google.common.collect.Maps;
+
 public class NewBestPossibleStateOutput {
 
   Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
 
   public NewBestPossibleStateOutput() {
-    _resourceAssignmentMap = new HashMap<ResourceId, ResourceAssignment>();
+    _resourceAssignmentMap = Maps.newHashMap();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
new file mode 100644
index 0000000..f982847
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -0,0 +1,45 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.ResourceAccessor;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.model.ResourceAssignment;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Persist (via ResourceAccessor) each ResourceAssignment found in the BEST_POSSIBLE_STATE output
+ */
+public class PersistAssignmentStage extends AbstractBaseStage {
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager helixManager = event.getAttribute("helixmanager"); // NOTE(review): no null check
+    HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+    ResourceAccessor resourceAccessor = new ResourceAccessor(accessor);
+    NewBestPossibleStateOutput assignments = // NOTE(review): no null check before use
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    for (ResourceId resourceId : assignments.getAssignedResources()) { // every assigned resource
+      ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);
+      resourceAccessor.setResourceAssignment(resourceId, assignment);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
index db33a4f..e3e8f94 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -225,7 +225,7 @@ public class ResourceCurrentState {
    * @param resourceId resource to look up
    * @return set of mapped partitions, or empty set if there are none
    */
-  public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
+  public Set<? extends PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
     Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
     if (currentStateMap != null) {
       return currentStateMap.keySet();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ff0923d..ba62391 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -747,7 +747,7 @@ public class IdealState extends HelixProperty {
    * @param rawPreferenceLists a map of partition name to a list of participant names
    * @return converted lists as a map
    */
-  public static Map<PartitionId, List<ParticipantId>> preferenceListsFromStringLists(
+  public static Map<? extends PartitionId, List<ParticipantId>> preferenceListsFromStringLists(
       Map<String, List<String>> rawPreferenceLists) {
     if (rawPreferenceLists == null) {
       return Collections.emptyMap();
@@ -812,7 +812,7 @@ public class IdealState extends HelixProperty {
    * @param rawMaps the map of partition name to participant name and state
    * @return converted maps
    */
-  public static Map<PartitionId, Map<ParticipantId, State>> participantStateMapsFromStringMaps(
+  public static Map<? extends PartitionId, Map<ParticipantId, State>> participantStateMapsFromStringMaps(
       Map<String, Map<String, String>> rawMaps) {
     return ResourceAssignment.replicaMapsFromStringMaps(rawMaps);
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 65fa14d..14183f5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.ResourceId;
@@ -63,6 +64,14 @@ public class ResourceAssignment extends HelixProperty {
   }
 
   /**
+   * Instantiate from a record. This supports reading the assignment directly from the backing store
+   * @param record backing record
+   */
+  public ResourceAssignment(ZNRecord record) {
+    super(record);
+  }
+
+  /**
    * Get the resource for which this assignment was created
    * @return resource id
    */
@@ -74,7 +83,7 @@ public class ResourceAssignment extends HelixProperty {
    * Get the currently mapped partitions
    * @return list of Partition objects
    */
-  public List<PartitionId> getMappedPartitions() {
+  public List<? extends PartitionId> getMappedPartitions() {
     ImmutableList.Builder<PartitionId> builder = new ImmutableList.Builder<PartitionId>();
     for (String partitionName : _record.getMapFields().keySet()) {
       builder.add(PartitionId.from(partitionName));
@@ -86,7 +95,7 @@ public class ResourceAssignment extends HelixProperty {
    * Get the entire map of a resource
    * @return map of partition to participant to state
    */
-  public Map<PartitionId, Map<ParticipantId, State>> getResourceMap() {
+  public Map<? extends PartitionId, Map<ParticipantId, State>> getResourceMap() {
     return replicaMapsFromStringMaps(_record.getMapFields());
   }
 
@@ -144,7 +153,7 @@ public class ResourceAssignment extends HelixProperty {
    * @param rawMaps the map of partition name to participant name and state
    * @return converted maps
    */
-  public static Map<PartitionId, Map<ParticipantId, State>> replicaMapsFromStringMaps(
+  public static Map<? extends PartitionId, Map<ParticipantId, State>> replicaMapsFromStringMaps(
       Map<String, Map<String, String>> rawMaps) {
     if (rawMaps == null) {
       return Collections.emptyMap();
@@ -180,7 +189,7 @@ public class ResourceAssignment extends HelixProperty {
    * @return converted maps
    */
   public static Map<String, Map<String, String>> stringMapsFromReplicaMaps(
-      Map<PartitionId, Map<ParticipantId, State>> replicaMaps) {
+      Map<? extends PartitionId, Map<ParticipantId, State>> replicaMaps) {
     if (replicaMaps == null) {
       return Collections.emptyMap();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 7e16de0..d4c1691 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -266,7 +266,7 @@ public class ClusterStateVerifier {
           ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId);
 
           ResourceAssignmentBuilder raBuilder = new ResourceAssignmentBuilder(resourceId);
-          List<PartitionId> mappedPartitions = resourceAssignment.getMappedPartitions();
+          List<? extends PartitionId> mappedPartitions = resourceAssignment.getMappedPartitions();
           for (PartitionId partitionId : mappedPartitions) {
             raBuilder.addAssignments(partitionId, resourceAssignment.getReplicaMap(partitionId));
           }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 1fdf9fd..0558764 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -674,7 +674,7 @@ public class Mocks {
     }
 
     @Override
-    public BaseDataAccessor getBaseDataAccessor() {
+    public BaseDataAccessor<ZNRecord> getBaseDataAccessor() {
       // TODO Auto-generated method stub
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java b/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
index d95bff8..bac6e7a 100644
--- a/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
+++ b/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
@@ -24,6 +24,7 @@ import org.testng.AssertJUnit;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+@SuppressWarnings("deprecation")
 public class TestPerfCounters {
 
   final String INSTANCE_NAME = "instance_123";

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index b0c3e58..9f31609 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -43,12 +43,10 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestMessageThrottleStage extends ZkUnitTestBase {
-  private static final Logger LOG = Logger.getLogger(TestMessageThrottleStage.class.getName());
   final String _className = getShortClassName();
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
index ec5d503..8e7b85a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
@@ -27,27 +27,29 @@ import org.testng.annotations.Test;
 public class TestParseInfoFromAlert extends ZkStandAloneCMTestBase {
   @Test
   public void TestParse() {
-    StatsAggregationStage stage = new StatsAggregationStage();
     String controllerName = CONTROLLER_PREFIX + "_0";
     HelixManager manager = _startCMResultMap.get(controllerName)._manager;
 
     String instanceName =
-        stage.parseInstanceName("localhost_12918.TestStat@DB=123.latency", manager);
+        StatsAggregationStage.parseInstanceName("localhost_12918.TestStat@DB=123.latency", manager);
     Assert.assertTrue(instanceName.equals("localhost_12918"));
 
-    instanceName = stage.parseInstanceName("localhost_12955.TestStat@DB=123.latency", manager);
+    instanceName =
+        StatsAggregationStage.parseInstanceName("localhost_12955.TestStat@DB=123.latency", manager);
     Assert.assertTrue(instanceName == null);
 
-    instanceName = stage.parseInstanceName("localhost_12922.TestStat@DB=123.latency", manager);
+    instanceName =
+        StatsAggregationStage.parseInstanceName("localhost_12922.TestStat@DB=123.latency", manager);
     Assert.assertTrue(instanceName.equals("localhost_12922"));
 
     String resourceName =
-        stage.parseResourceName("localhost_12918.TestStat@DB=TestDB.latency", manager);
+        StatsAggregationStage.parseResourceName("localhost_12918.TestStat@DB=TestDB.latency",
+            manager);
     Assert.assertTrue(resourceName.equals("TestDB"));
 
     String partitionName =
-        stage.parsePartitionName("localhost_12918.TestStat@DB=TestDB;Partition=TestDB_22.latency",
-            manager);
+        StatsAggregationStage.parsePartitionName(
+            "localhost_12918.TestStat@DB=TestDB;Partition=TestDB_22.latency", manager);
     Assert.assertTrue(partitionName.equals("TestDB_22"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index e5674dc..f7d2317 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -42,12 +42,10 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
-import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestRebalancePipeline extends ZkUnitTestBase {
-  private static final Logger LOG = Logger.getLogger(TestRebalancePipeline.class.getName());
   final String _className = getShortClassName();
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
index deea4d1..b1fc1cd 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
@@ -35,12 +35,10 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestConstraint extends ZkUnitTestBase {
-  private static Logger LOG = Logger.getLogger(TestConstraint.class);
 
   @Test
   public void testMsgConstraint() {
@@ -185,7 +183,7 @@ public class TestConstraint extends ZkUnitTestBase {
     ConstraintItem constraint2 = new ConstraintItem(record.getMapField("constraint2"));
 
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     accessor.setProperty(keyBuilder.constraint(ConstraintType.STATE_CONSTRAINT.toString()),

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index ed66ae2..6791bfd 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -26,6 +26,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Transition;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Lists;
@@ -77,11 +78,21 @@ public class NewModelExample {
     userConfig.setIntField("sampleInt", 1);
 
     // fully specify the cluster with a ClusterConfig
-    ClusterConfig cluster =
+    ClusterConfig.Builder clusterBuilder =
         new ClusterConfig.Builder(clusterId).addResource(resource).addParticipant(participant)
-            .addStateModelDefinition(lockUnlock).userConfig(userConfig).build();
+            .addStateModelDefinition(lockUnlock).userConfig(userConfig);
 
-    // set up accessors to work with Zookeeper-persisted data
+    // add a state constraint that is more restrictive than what is in the state model
+    clusterBuilder.addStateUpperBoundConstraint(Scope.cluster(clusterId),
+        lockUnlock.getStateModelDefId(), State.from("LOCKED"), 1);
+
+    // add a transition constraint (this time with a resource scope)
+    clusterBuilder.addTransitionConstraint(Scope.resource(resource.getId()),
+        lockUnlock.getStateModelDefId(), Transition.from("RELEASED-LOCKED"), 1);
+
+    ClusterConfig cluster = clusterBuilder.build();
+
+    // set up accessors to work with ZooKeeper-persisted data
     int timeOutInSec = Integer.parseInt(System.getProperty(ZKHelixAdmin.CONNECTION_TIMEOUT, "30"));
     ZkClient zkClient = new ZkClient(args[0], timeOutInSec * 1000);
     zkClient.setZkSerializer(new ZNRecordSerializer());
@@ -150,7 +161,7 @@ public class NewModelExample {
     StateModelDefinition.Builder stateModelBuilder =
         new StateModelDefinition.Builder(stateModelId).addState(LOCKED, 0).addState(RELEASED, 1)
             .addState(DROPPED, 2).addTransition(RELEASED, LOCKED, 0)
-            .addTransition(LOCKED, RELEASED, 1).upperBound(LOCKED, 1).upperBound(RELEASED, -1)
+            .addTransition(LOCKED, RELEASED, 1).upperBound(LOCKED, 2).upperBound(RELEASED, -1)
             .upperBound(DROPPED, -1).initialState(RELEASED);
     return stateModelBuilder.build();
   }