Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/16 01:51:44 UTC

[7/9] [HELIX-209] Shuffling around rebalancer code to allow for compatibility

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
deleted file mode 100644
index a90b77a..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Reference to a class that extends {@link Rebalancer}. It loads the class automatically.
- */
-public class RebalancerRef {
-  private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
-
-  @JsonProperty("rebalancerClassName")
-  private final String _rebalancerClassName;
-
-  @JsonCreator
-  private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
-    _rebalancerClassName = rebalancerClassName;
-  }
-
-  /**
-   * Get an instantiated Rebalancer
-   * @return Rebalancer or null if instantiation failed
-   */
-  @JsonIgnore
-  public Rebalancer getRebalancer() {
-    try {
-      return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
-    } catch (Exception e) {
-      LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
-    }
-    return null;
-  }
-
-  @Override
-  public String toString() {
-    return _rebalancerClassName;
-  }
-
-  @Override
-  public boolean equals(Object that) {
-    if (that instanceof RebalancerRef) {
-      return this.toString().equals(((RebalancerRef) that).toString());
-    } else if (that instanceof String) {
-      return this.toString().equals(that);
-    }
-    return false;
-  }
-
-  /**
-   * Get a rebalancer class reference
-   * @param rebalancerClassName name of the class
-   * @return RebalancerRef or null if name is null
-   */
-  public static RebalancerRef from(String rebalancerClassName) {
-    if (rebalancerClassName == null) {
-      return null;
-    }
-    return new RebalancerRef(rebalancerClassName);
-  }
-
-  /**
-   * Get a RebalancerRef from a class object
-   * @param rebalancerClass class that implements Rebalancer
-   * @return RebalancerRef
-   */
-  public static RebalancerRef from(Class<? extends Rebalancer> rebalancerClass) {
-    if (rebalancerClass == null) {
-      return null;
-    }
-    return RebalancerRef.from(rebalancerClass.getName());
-  }
-}

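For context, the RebalancerRef removed above wraps a rebalancer class name and instantiates it reflectively. A minimal usage sketch against the pre-move API (not part of this patch; SemiAutoRebalancer is used here only as a stand-in for any Rebalancer implementation):

import org.apache.helix.controller.rebalancer.context.Rebalancer;
import org.apache.helix.controller.rebalancer.context.RebalancerRef;
import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancer;

public class RebalancerRefSketch {
  public static void main(String[] args) {
    // Building from a class literal is equivalent to from("...SemiAutoRebalancer");
    // only the class name is stored, which is what gets serialized as JSON.
    RebalancerRef ref = RebalancerRef.from(SemiAutoRebalancer.class);

    // getRebalancer() loads and instantiates the class, returning null on failure,
    // so callers must null-check before invoking init()/computeResourceMapping().
    Rebalancer rebalancer = ref.getRebalancer();
    if (rebalancer == null) {
      System.err.println("Could not instantiate " + ref);
    }
  }
}
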
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
deleted file mode 100644
index 525931d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
- * with partitions may accept state model definitions that support multiple replicas per partition,
- * and it's possible that the policy is that each live participant in the system should have a
- * replica.
- */
-public interface ReplicatedRebalancerContext extends RebalancerContext {
-  /**
-   * Check if this resource should be assigned to any live participant
-   * @return true if any live participant expected, false otherwise
-   */
-  public boolean anyLiveParticipant();
-
-  /**
-   * Get the number of replicas that each resource subunit should have
-   * @return replica count
-   */
-  public int getReplicaCount();
-}

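The two methods above drive replica placement decisions. A hypothetical helper (not part of this patch) showing how a caller might resolve an effective replica count from the interface:

import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;

public class ReplicaCountHelper {
  /**
   * Resolve the effective replica count: contexts flagged as "any live participant"
   * get one replica per live node, otherwise the configured count is used.
   */
  public static int effectiveReplicaCount(ReplicatedRebalancerContext context, int liveNodes) {
    return context.anyLiveParticipant() ? liveNodes : context.getReplicaCount();
  }
}
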
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
deleted file mode 100644
index 3f0dd13..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ResourceCurrentState;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Rebalancer for the SEMI_AUTO mode. It expects a RebalancerConfig that understands the preferred
- * locations of each partition replica
- */
-public class SemiAutoRebalancer implements Rebalancer {
-  private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
-
-  @Override
-  public void init(HelixManager helixManager) {
-    // do nothing
-  }
-
-  @Override
-  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
-      Cluster cluster, ResourceCurrentState currentState) {
-    SemiAutoRebalancerContext config =
-        rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
-    StateModelDefinition stateModelDef =
-        cluster.getStateModelMap().get(config.getStateModelDefId());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing resource:" + config.getResourceId());
-    }
-    ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
-    for (PartitionId partition : config.getPartitionSet()) {
-      Map<ParticipantId, State> currentStateMap =
-          currentState.getCurrentStateMap(config.getResourceId(), partition);
-      Set<ParticipantId> disabledInstancesForPartition =
-          NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
-              partition);
-      List<ParticipantId> preferenceList =
-          NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
-              config.getPreferenceList(partition));
-      Map<State, String> upperBounds =
-          NewConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
-              cluster.getConfig());
-      Map<ParticipantId, State> bestStateForPartition =
-          NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
-              .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
-              disabledInstancesForPartition);
-      partitionMapping.addReplicaMap(partition, bestStateForPartition);
-    }
-    return partitionMapping;
-  }
-
-}

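SemiAutoRebalancer above is one implementation of the old Rebalancer contract (init plus computeResourceMapping). Below is a hedged sketch of a user-defined implementation against that same contract; it assumes getResourceId(), getStateModelDefId() and getPartitionSet() are reachable through PartitionedRebalancerContext, as they are through SemiAutoRebalancerContext elsewhere in this patch, and it simply places every live participant in the state model's initial state.

import java.util.Map;

import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
import org.apache.helix.controller.rebalancer.context.Rebalancer;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;

import com.google.common.collect.Maps;

public class EveryNodeInitialStateRebalancer implements Rebalancer {
  @Override
  public void init(HelixManager helixManager) {
    // no controller-side state needed for this sketch
  }

  @Override
  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
      Cluster cluster, ResourceCurrentState currentState) {
    PartitionedRebalancerContext context =
        rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
    StateModelDefinition stateModelDef =
        cluster.getStateModelMap().get(context.getStateModelDefId());
    ResourceAssignment assignment = new ResourceAssignment(context.getResourceId());
    for (PartitionId partitionId : context.getPartitionSet()) {
      // place every live participant in the state model's initial state
      Map<ParticipantId, State> replicaMap = Maps.newHashMap();
      for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
        replicaMap.put(participantId, stateModelDef.getTypedInitialState());
      }
      assignment.addReplicaMap(partitionId, replicaMap);
    }
    return assignment;
  }
}
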
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
deleted file mode 100644
index 72b3bc7..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
+++ /dev/null
@@ -1,176 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
- * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
- */
-public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
-  @JsonProperty("preferenceLists")
-  private Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-  /**
-   * Instantiate a SemiAutoRebalancerContext
-   */
-  public SemiAutoRebalancerContext() {
-    super(RebalanceMode.SEMI_AUTO);
-    setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-    _preferenceLists = Maps.newHashMap();
-  }
-
-  /**
-   * Get the preference lists of all partitions of the resource
-   * @return map of partition id to list of participant ids
-   */
-  public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
-    return _preferenceLists;
-  }
-
-  /**
-   * Set the preference lists of all partitions of the resource
-   * @param preferenceLists
-   */
-  public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
-    _preferenceLists = preferenceLists;
-  }
-
-  /**
-   * Get the preference list of a partition
-   * @param partitionId the partition to look up
-   * @return list of participant ids
-   */
-  @JsonIgnore
-  public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
-    return _preferenceLists.get(partitionId);
-  }
-
-  /**
-   * Generate preference lists based on a default cluster setup
-   * @param stateModelDef the state model definition to follow
-   * @param participantSet the set of participant ids to configure for
-   */
-  @Override
-  @JsonIgnore
-  public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
-      Set<ParticipantId> participantSet) {
-    // compute default upper bounds
-    Map<State, String> upperBounds = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
-    }
-
-    // determine the current mapping
-    Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
-    for (PartitionId partitionId : getPartitionSet()) {
-      List<ParticipantId> preferenceList = getPreferenceList(partitionId);
-      if (preferenceList != null && !preferenceList.isEmpty()) {
-        Set<ParticipantId> disabledParticipants = Collections.emptySet();
-        Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
-        Map<ParticipantId, State> initialMap =
-            NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
-                participantSet, stateModelDef, preferenceList, emptyCurrentState,
-                disabledParticipants);
-        currentMapping.put(partitionId, initialMap);
-      }
-    }
-
-    // determine the preference
-    LinkedHashMap<State, Integer> stateCounts =
-        NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
-            getReplicaCount());
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
-    List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
-            getMaxPartitionsPerParticipant(), placementScheme);
-    Map<String, List<String>> rawPreferenceLists =
-        strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
-            .getListFields();
-    Map<PartitionId, List<ParticipantId>> preferenceLists =
-        Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
-    setPreferenceLists(preferenceLists);
-  }
-
-  /**
-   * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
-   */
-  public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
-    private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
-    /**
-     * Instantiate for a resource
-     * @param resourceId resource id
-     */
-    public Builder(ResourceId resourceId) {
-      super(resourceId);
-      super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
-      _preferenceLists = Maps.newHashMap();
-    }
-
-    /**
-     * Add a preference list for a partition
-     * @param partitionId partition to set
-     * @param preferenceList ordered list of participants who can serve the partition
-     * @return Builder
-     */
-    public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
-      _preferenceLists.put(partitionId, preferenceList);
-      return self();
-    }
-
-    @Override
-    protected Builder self() {
-      return this;
-    }
-
-    @Override
-    public SemiAutoRebalancerContext build() {
-      SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
-      super.update(context);
-      context.setPreferenceLists(_preferenceLists);
-      return context;
-    }
-  }
-}

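A short sketch (not part of the patch) of building a SEMI_AUTO context with the Builder removed above. ParticipantId.from(...) is assumed to follow the same from(String) factory pattern as ResourceId and PartitionId elsewhere in this diff; the resource and node names are made up.

import java.util.Arrays;

import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;

public class SemiAutoContextSketch {
  public static void main(String[] args) {
    PartitionId partition = PartitionId.from("MyDB_0");
    SemiAutoRebalancerContext context =
        new SemiAutoRebalancerContext.Builder(ResourceId.from("MyDB"))
            .preferenceList(partition,
                Arrays.asList(ParticipantId.from("node_1"), ParticipantId.from("node_2")))
            .build();
    // the first entry of each preference list is the most preferred placement
    System.out.println(context.getPreferenceList(partition));
  }
}
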
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 323be34..ce52a19 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -39,7 +39,9 @@ import org.apache.log4j.Logger;
 
 /**
  * Collection of functions that will compute the best possible states given the live instances and
- * an ideal state.
+ * an ideal state.<br/>
+ * <br/>
+ * Deprecated. Use {@link org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment} instead.
  */
 @Deprecated
 public class ConstraintBasedAssignment {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
deleted file mode 100644
index f703073..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ /dev/null
@@ -1,244 +0,0 @@
-package org.apache.helix.controller.rebalancer.util;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Collection of functions that will compute the best possible state based on the participants and
- * the rebalancer configuration of a resource.
- */
-public class NewConstraintBasedAssignment {
-  private static Logger logger = Logger.getLogger(NewConstraintBasedAssignment.class);
-
-  /**
-   * Get a set of disabled participants for a partition
-   * @param participantMap map of all participants
-   * @param partitionId the partition to check
-   * @return a set of all participants that are disabled for the partition
-   */
-  public static Set<ParticipantId> getDisabledParticipants(
-      final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
-    Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
-    Set<ParticipantId> disabledParticipantsForPartition =
-        Sets.filter(participantSet, new Predicate<ParticipantId>() {
-          @Override
-          public boolean apply(ParticipantId participantId) {
-            Participant participant = participantMap.get(participantId);
-            return !participant.isEnabled()
-                || participant.getDisabledPartitionIds().contains(partitionId);
-          }
-        });
-    return disabledParticipantsForPartition;
-  }
-
-  /**
-   * Get an ordered list of participants that can serve a partition
-   * @param cluster cluster snapshot
-   * @param partitionId the partition to look up
-   * @param config rebalancing constraints
-   * @return list with most preferred participants first
-   */
-  public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
-      List<ParticipantId> prefList) {
-    if (prefList != null && prefList.size() == 1
-        && StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
-      prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
-      Collections.sort(prefList);
-    }
-    return prefList;
-  }
-
-  /**
-   * Get a map of state to upper bound constraint given a cluster
-   * @param stateModelDef the state model definition to check
-   * @param resourceId the resource that is constraint
-   * @param cluster the cluster the resource belongs to
-   * @return map of state to upper bound
-   */
-  public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
-      ResourceId resourceId, ClusterConfig cluster) {
-    Map<State, String> stateMap = Maps.newHashMap();
-    for (State state : stateModelDef.getTypedStatesPriorityList()) {
-      String num =
-          cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
-              stateModelDef.getStateModelDefId(), state);
-      stateMap.put(state, num);
-    }
-    return stateMap;
-  }
-
-  /**
-   * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
-   * @param upperBounds map of state to upper bound
-   * @param liveParticipantSet set of live participant ids
-   * @param stateModelDef
-   * @param participantPreferenceList
-   * @param currentStateMap
-   *          : participant->state for each partition
-   * @param disabledParticipantsForPartition
-   * @return
-   */
-  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
-      Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
-      StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
-      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
-    Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
-    // if the resource is deleted, instancePreferenceList will be empty and
-    // we should drop all resources.
-    if (currentStateMap != null) {
-      for (ParticipantId participantId : currentStateMap.keySet()) {
-        if ((participantPreferenceList == null || !participantPreferenceList
-            .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
-          // if dropped and not disabled, transit to DROPPED
-          participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
-        } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
-            participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
-          // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
-        }
-      }
-    }
-
-    // resource is deleted
-    if (participantPreferenceList == null) {
-      return participantStateMap;
-    }
-
-    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
-    boolean assigned[] = new boolean[participantPreferenceList.size()];
-
-    for (State state : statesPriorityList) {
-      String num = upperBounds.get(state);
-      int stateCount = -1;
-      if ("N".equals(num)) {
-        Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
-        liveAndEnabled.removeAll(disabledParticipantsForPartition);
-        stateCount = liveAndEnabled.size();
-      } else if ("R".equals(num)) {
-        stateCount = participantPreferenceList.size();
-      } else {
-        try {
-          stateCount = Integer.parseInt(num);
-        } catch (Exception e) {
-          logger.error("Invalid count for state:" + state + " ,count=" + num);
-        }
-      }
-      if (stateCount > -1) {
-        int count = 0;
-        for (int i = 0; i < participantPreferenceList.size(); i++) {
-          ParticipantId participantId = participantPreferenceList.get(i);
-
-          boolean notInErrorState =
-              currentStateMap == null
-                  || currentStateMap.get(participantId) == null
-                  || !currentStateMap.get(participantId)
-                      .equals(State.from(HelixDefinedState.ERROR));
-
-          if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
-              && !disabledParticipantsForPartition.contains(participantId)) {
-            participantStateMap.put(participantId, state);
-            count = count + 1;
-            assigned[i] = true;
-            if (count == stateCount) {
-              break;
-            }
-          }
-        }
-      }
-    }
-    return participantStateMap;
-  }
-
-  /**
-   * Get the number of replicas that should be in each state for a partition
-   * @param upperBounds map of state to upper bound
-   * @param stateModelDef StateModelDefinition object
-   * @param liveNodesNb number of live nodes
-   * @param total number of replicas
-   * @return state count map: state->count
-   */
-  public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
-      StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
-    LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
-    List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
-
-    int replicas = totalReplicas;
-    for (State state : statesPriorityList) {
-      String num = upperBounds.get(state);
-      if ("N".equals(num)) {
-        stateCountMap.put(state, liveNodesNb);
-      } else if ("R".equals(num)) {
-        // wait until we get the counts for all other states
-        continue;
-      } else {
-        int stateCount = -1;
-        try {
-          stateCount = Integer.parseInt(num);
-        } catch (Exception e) {
-          // LOG.error("Invalid count for state: " + state + ", count: " + num +
-          // ", use -1 instead");
-        }
-
-        if (stateCount > 0) {
-          stateCountMap.put(state, stateCount);
-          replicas -= stateCount;
-        }
-      }
-    }
-
-    // get state count for R
-    for (State state : statesPriorityList) {
-      String num = upperBounds.get(state);
-      if ("R".equals(num)) {
-        stateCountMap.put(state, replicas);
-        // should have at most one state using R
-        break;
-      }
-    }
-    return stateCountMap;
-  }
-}

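In the upper-bound maps used throughout the removed class above, a state's bound can be a number, "N" (one replica per live, enabled participant), or "R" (all remaining replicas). The following self-contained sketch mirrors how stateCount(...) resolves those tokens; it is written without the Helix types, for illustration only.

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StateCountSketch {
  static LinkedHashMap<String, Integer> resolve(List<String> statesByPriority,
      Map<String, String> upperBounds, int liveNodes, int totalReplicas) {
    LinkedHashMap<String, Integer> counts = new LinkedHashMap<String, Integer>();
    int remaining = totalReplicas;
    for (String state : statesByPriority) {
      String bound = upperBounds.get(state);
      if ("N".equals(bound)) {
        counts.put(state, liveNodes); // one replica per live node
      } else if (!"R".equals(bound)) {
        int count = Integer.parseInt(bound); // fixed numeric bound
        if (count > 0) {
          counts.put(state, count);
          remaining -= count;
        }
      }
    }
    // at most one state may use "R"; it receives whatever replicas are left over
    for (String state : statesByPriority) {
      if ("R".equals(upperBounds.get(state))) {
        counts.put(state, remaining);
        break;
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, String> bounds = new HashMap<String, String>();
    bounds.put("MASTER", "1");
    bounds.put("SLAVE", "R");
    // a MASTER-SLAVE model with 3 replicas on 5 live nodes: {MASTER=1, SLAVE=2}
    System.out.println(resolve(Arrays.asList("MASTER", "SLAVE"), bounds, 5, 3));
  }
}
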
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 9699dcb..51301f0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -20,114 +20,123 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.rebalancer.Rebalancer;
+import org.apache.helix.api.rebalancer.RebalancerConfig;
+import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.api.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.AutoRebalancer;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.util.HelixUtil;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
 /**
  * For partition compute best possible (instance,state) pair based on
  * IdealState,StateModel,LiveInstance
  */
-@Deprecated
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
-  private static final Logger logger = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
+  private static final Logger LOG = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
-    logger.info("START BestPossibleStateCalcStage.process()");
+    if (LOG.isInfoEnabled()) {
+      LOG.info("START BestPossibleStateCalcStage.process()");
+    }
 
-    CurrentStateOutput currentStateOutput =
+    ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
 
-    if (currentStateOutput == null || resourceMap == null || cache == null) {
+    if (currentStateOutput == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires CURRENT_STATE|RESOURCES|DataCache");
     }
 
     BestPossibleStateOutput bestPossibleStateOutput =
-        compute(event, resourceMap, currentStateOutput);
+        compute(cluster, event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
 
     long endTime = System.currentTimeMillis();
-    logger.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+    if (LOG.isInfoEnabled()) {
+      LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+    }
   }
 
-  private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
-      CurrentStateOutput currentStateOutput) {
-    // for each ideal state
-    // read the state model def
-    // for each resource
-    // get the preference list
-    // for each instanceName check if its alive then assign a state
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+  /**
+   * Fallback for cases when the resource has been dropped, but current state exists
+   * @param cluster cluster snapshot
+   * @param resourceId the resource for which to generate an assignment
+   * @param currentStateOutput full snapshot of the current state
+   * @param stateModelDef state model the resource follows
+   * @return assignment for the dropped resource
+   */
+  private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
+      ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
+    ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
+    Set<? extends PartitionId> mappedPartitions =
+        currentStateOutput.getCurrentStateMappedPartitions(resourceId);
+    if (mappedPartitions == null) {
+      return partitionMapping;
+    }
+    for (PartitionId partitionId : mappedPartitions) {
+      Set<ParticipantId> disabledParticipantsForPartition =
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partitionId);
+      Map<State, String> upperBounds =
+          ConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId,
+              cluster.getConfig());
+      partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
+          .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
+              stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+              disabledParticipantsForPartition));
+    }
+    return partitionMapping;
+  }
 
+  private BestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
+      Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
     BestPossibleStateOutput output = new BestPossibleStateOutput();
+    Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
 
-    for (String resourceName : resourceMap.keySet()) {
-      logger.debug("Processing resource:" + resourceName);
-
-      Resource resource = resourceMap.get(resourceName);
-      // Ideal state may be gone. In that case we need to get the state model name
-      // from the current state
-      IdealState idealState = cache.getIdealState(resourceName);
-
-      if (idealState == null) {
-        // if ideal state is deleted, use an empty one
-        logger.info("resource:" + resourceName + " does not exist anymore");
-        idealState = new IdealState(resourceName);
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing resource:" + resourceId);
       }
-
-      Rebalancer rebalancer = null;
-      if (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
-          && idealState.getRebalancerRef() != null) {
-        String rebalancerClassName = idealState.getRebalancerRef().toString();
-        logger
-            .info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
-        try {
-          rebalancer =
-              (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
-        } catch (Exception e) {
-          logger.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+      ResourceConfig resourceConfig = resourceMap.get(resourceId);
+      RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
+      ResourceAssignment resourceAssignment = null;
+      if (rebalancerConfig != null) {
+        Rebalancer rebalancer = rebalancerConfig.getRebalancer();
+        if (rebalancer != null) {
+          HelixManager manager = event.getAttribute("helixmanager");
+          rebalancer.init(manager);
+          resourceAssignment =
+              rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
         }
       }
-      if (rebalancer == null) {
-        if (idealState.getRebalanceMode() == RebalanceMode.FULL_AUTO) {
-          rebalancer = new AutoRebalancer();
-        } else if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
-          rebalancer = new SemiAutoRebalancer();
-        } else {
-          rebalancer = new CustomRebalancer();
-        }
+      if (resourceAssignment == null) {
+        RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+        StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
+        resourceAssignment =
+            mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
       }
 
-      HelixManager manager = event.getAttribute("helixmanager");
-      rebalancer.init(manager);
-      ResourceAssignment partitionStateAssignment =
-          rebalancer.computeResourceMapping(resource, idealState, currentStateOutput, cache);
-      for (Partition partition : resource.getPartitions()) {
-        Map<ParticipantId, State> newStateMap =
-            partitionStateAssignment.getReplicaMap(PartitionId.from(partition.getPartitionName()));
-        output.setParticipantStateMap(resourceName, partition, newStateMap);
-      }
+      output.setResourceAssignment(resourceId, resourceAssignment);
     }
+
     return output;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index 362bbb6..afcb6f7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -1,82 +1,49 @@
 package org.apache.helix.controller.stages;
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.model.Partition;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.ResourceAssignment;
 
-@Deprecated
-public class BestPossibleStateOutput {
-  // resource->partition->instance->state
-  Map<String, Map<Partition, Map<String, String>>> _dataMap;
+import com.google.common.collect.Maps;
 
-  public BestPossibleStateOutput() {
-    _dataMap = new HashMap<String, Map<Partition, Map<String, String>>>();
-  }
+public class BestPossibleStateOutput {
 
-  public void setState(String resourceName, Partition resource,
-      Map<String, String> bestInstanceStateMappingForResource) {
-    if (!_dataMap.containsKey(resourceName)) {
-      _dataMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
-    }
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    map.put(resource, bestInstanceStateMappingForResource);
-  }
+  Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
 
-  public void setParticipantStateMap(String resourceName, Partition partition,
-      Map<ParticipantId, State> bestInstanceStateMappingForResource) {
-    Map<String, String> rawStateMap = new HashMap<String, String>();
-    for (ParticipantId participantId : bestInstanceStateMappingForResource.keySet()) {
-      rawStateMap.put(participantId.stringify(),
-          bestInstanceStateMappingForResource.get(participantId).toString());
-    }
-    setState(resourceName, partition, rawStateMap);
+  public BestPossibleStateOutput() {
+    _resourceAssignmentMap = Maps.newHashMap();
   }
 
-  public Map<String, String> getInstanceStateMap(String resourceName, Partition resource) {
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    if (map != null) {
-      return map.get(resource);
-    }
-    return Collections.emptyMap();
+  /**
+   * Set the computed resource assignment for a resource
+   * @param resourceId the resource to set
+   * @param resourceAssignment the computed assignment
+   */
+  public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+    _resourceAssignmentMap.put(resourceId, resourceAssignment);
   }
 
-  public Map<Partition, Map<String, String>> getResourceMap(String resourceName) {
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    if (map != null) {
-      return map;
-    }
-    return Collections.emptyMap();
+  /**
+   * Get the resource assignment computed for a resource
+   * @param resourceId resource to look up
+   * @return ResourceAssignment computed by the best possible state calculation
+   */
+  public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+    return _resourceAssignmentMap.get(resourceId);
   }
 
-  public Map<String, Map<Partition, Map<String, String>>> getStateMap() {
-    return _dataMap;
+  /**
+   * Get all of the resources currently assigned
+   * @return set of assigned resource ids
+   */
+  public Set<ResourceId> getAssignedResources() {
+    return _resourceAssignmentMap.keySet();
   }
 
   @Override
   public String toString() {
-    return _dataMap.toString();
+    return _resourceAssignmentMap.toString();
   }
 }

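With the output now keyed by ResourceId and holding ResourceAssignment objects instead of raw string maps, a downstream consumer reads it roughly as below. This is illustrative only: ParticipantId.from(...) and State.from("MASTER") are assumed to accept raw names, and getReplicaMap(...) is the ResourceAssignment accessor used by the code this patch removes.

import java.util.Map;

import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.ResourceAssignment;

import com.google.common.collect.Maps;

public class BestPossibleOutputSketch {
  public static void main(String[] args) {
    // populate an output the way BestPossibleStateCalcStage does
    BestPossibleStateOutput output = new BestPossibleStateOutput();
    ResourceId resourceId = ResourceId.from("MyDB");
    ResourceAssignment assignment = new ResourceAssignment(resourceId);
    Map<ParticipantId, State> replicaMap = Maps.newHashMap();
    replicaMap.put(ParticipantId.from("node_1"), State.from("MASTER"));
    assignment.addReplicaMap(PartitionId.from("MyDB_0"), replicaMap);
    output.setResourceAssignment(resourceId, assignment);

    // downstream stages iterate the assigned resources rather than nested string maps
    for (ResourceId assigned : output.getAssignedResources()) {
      ResourceAssignment ra = output.getResourceAssignment(assigned);
      System.out.println(assigned + " -> " + ra.getReplicaMap(PartitionId.from("MyDB_0")));
    }
  }
}
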
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 64e881c..532ecb5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -23,10 +23,12 @@ import java.util.Map;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.api.Cluster;
 import org.apache.helix.api.HelixVersion;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
 import org.apache.log4j.Logger;
 
 /**
@@ -38,16 +40,17 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute("helixmanager");
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    if (manager == null || cache == null) {
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    if (manager == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires HelixManager | DataCache");
     }
 
     HelixManagerProperties properties = manager.getProperties();
-    Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
-    for (LiveInstance liveInstance : liveInstanceMap.values()) {
-      HelixVersion version = liveInstance.getTypedHelixVersion();
+    // Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
+    Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+    for (Participant liveParticipant : liveParticipants.values()) {
+      HelixVersion version = liveParticipant.getRunningInstance().getVersion();
       String participantVersion = (version != null) ? version.toString() : null;
       if (!properties.isParticipantCompatible(participantVersion)) {
         String errorMsg =
@@ -55,7 +58,7 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
                 + manager.getInstanceName() + ", controllerVersion: " + properties.getVersion()
                 + ", minimumSupportedParticipantVersion: "
                 + properties.getProperty("minimum_supported_version.participant")
-                + ", participant: " + liveInstance.getInstanceName() + ", participantVersion: "
+                + ", participant: " + liveParticipant.getId() + ", participantVersion: "
                 + participantVersion;
         LOG.error(errorMsg);
         throw new StageException(errorMsg);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 7036512..5730289 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -22,59 +22,68 @@ package org.apache.helix.controller.stages;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 
 /**
  * For each LiveInstances select currentState and message whose sessionId matches
  * sessionId from LiveInstance Get Partition,State for all the resources computed in
  * previous State [ResourceComputationStage]
  */
-@Deprecated
 public class CurrentStateComputationStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
 
-    if (cache == null || resourceMap == null) {
+    if (cluster == null || resourceMap == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires DataCache|RESOURCE");
     }
 
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    ResourceCurrentState currentStateOutput = new ResourceCurrentState();
 
-    for (LiveInstance instance : liveInstances.values()) {
-      String instanceName = instance.getInstanceName();
-      Map<String, Message> instanceMessages = cache.getMessages(instanceName);
-      for (Message message : instanceMessages.values()) {
+    for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+      ParticipantId participantId = liveParticipant.getId();
+
+      // add pending messages
+      Map<MessageId, Message> instanceMsgs = liveParticipant.getMessageMap();
+      for (Message message : instanceMsgs.values()) {
         if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) {
           continue;
         }
-        if (!instance.getTypedSessionId().equals(message.getTypedTgtSessionId())) {
+
+        if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTypedTgtSessionId())) {
           continue;
         }
+
         ResourceId resourceId = message.getResourceId();
-        Resource resource = resourceMap.get(resourceId.stringify());
+        ResourceConfig resource = resourceMap.get(resourceId);
         if (resource == null) {
           continue;
         }
 
         if (!message.getBatchMessageMode()) {
           PartitionId partitionId = message.getPartitionId();
-          Partition partition = resource.getPartition(partitionId.stringify());
+          Partition partition = resource.getSubUnit(partitionId);
           if (partition != null) {
-            currentStateOutput.setPendingState(resourceId.stringify(), partition, instanceName,
-                message.getTypedToState().toString());
+            currentStateOutput.setPendingState(resourceId, partitionId, participantId,
+                message.getTypedToState());
           } else {
             // log
           }
@@ -82,10 +91,10 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           List<PartitionId> partitionNames = message.getPartitionIds();
           if (!partitionNames.isEmpty()) {
             for (PartitionId partitionId : partitionNames) {
-              Partition partition = resource.getPartition(partitionId.stringify());
+              Partition partition = resource.getSubUnit(partitionId);
               if (partition != null) {
-                currentStateOutput.setPendingState(resourceId.stringify(), partition, instanceName,
-                    message.getTypedToState().toString());
+                currentStateOutput.setPendingState(resourceId, partitionId, participantId,
+                    message.getTypedToState());
               } else {
                 // log
               }
@@ -93,43 +102,41 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           }
         }
       }
-    }
-    for (LiveInstance instance : liveInstances.values()) {
-      String instanceName = instance.getInstanceName();
-
-      String clientSessionId = instance.getTypedSessionId().stringify();
-      Map<String, CurrentState> currentStateMap =
-          cache.getCurrentState(instanceName, clientSessionId);
-      for (CurrentState currentState : currentStateMap.values()) {
 
-        if (!instance.getTypedSessionId().equals(currentState.getTypedSessionId())) {
+      // add current state
+      SessionId sessionId = liveParticipant.getRunningInstance().getSessionId();
+      Map<ResourceId, CurrentState> curStateMap = liveParticipant.getCurrentStateMap();
+      for (CurrentState curState : curStateMap.values()) {
+        if (!sessionId.equals(curState.getTypedSessionId())) {
           continue;
         }
-        String resourceName = currentState.getResourceName();
-        String stateModelDefName = currentState.getStateModelDefRef();
-        Resource resource = resourceMap.get(resourceName);
+
+        ResourceId resourceId = curState.getResourceId();
+        StateModelDefId stateModelDefId = curState.getStateModelDefId();
+        ResourceConfig resource = resourceMap.get(resourceId);
         if (resource == null) {
           continue;
         }
-        if (stateModelDefName != null) {
-          currentStateOutput.setResourceStateModelDef(resourceName, stateModelDefName);
+
+        if (stateModelDefId != null) {
+          currentStateOutput.setResourceStateModelDef(resourceId, stateModelDefId);
         }
 
-        currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
+        currentStateOutput.setBucketSize(resourceId, curState.getBucketSize());
 
-        Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
-        for (String partitionName : partitionStateMap.keySet()) {
-          Partition partition = resource.getPartition(partitionName);
+        Map<PartitionId, State> partitionStateMap = curState.getTypedPartitionStateMap();
+        for (PartitionId partitionId : partitionStateMap.keySet()) {
+          Partition partition = resource.getSubUnit(partitionId);
           if (partition != null) {
-            currentStateOutput.setCurrentState(resourceName, partition, instanceName,
-                currentState.getState(partitionName));
-
+            currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
+                curState.getState(partitionId));
           } else {
             // log
           }
         }
       }
     }
+
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 3c3a9d9..5ecbddf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -34,33 +34,39 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.SchedulerTaskConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.rebalancer.RebalancerConfig;
+import org.apache.helix.api.rebalancer.RebalancerContext;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.StatusUpdate;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
-@Deprecated
 public class ExternalViewComputeStage extends AbstractBaseStage {
-  private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
+  private static Logger LOG = Logger.getLogger(ExternalViewComputeStage.class);
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
-    log.info("START ExternalViewComputeStage.process()");
+    LOG.info("START ExternalViewComputeStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
 
-    if (manager == null || resourceMap == null || cache == null) {
+    if (manager == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires ClusterManager|RESOURCES|DataCache");
     }
@@ -68,58 +74,64 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
 
-    CurrentStateOutput currentStateOutput =
+    ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
 
     List<ExternalView> newExtViews = new ArrayList<ExternalView>();
     List<PropertyKey> keys = new ArrayList<PropertyKey>();
 
+    // TODO use external-view accessor
     Map<String, ExternalView> curExtViews =
         dataAccessor.getChildValuesMap(keyBuilder.externalViews());
 
-    for (String resourceName : resourceMap.keySet()) {
-      ExternalView view = new ExternalView(resourceName);
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      ExternalView view = new ExternalView(resourceId.stringify());
       // view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
       // if resource ideal state has bucket size, set it
       // otherwise resource has been dropped, use bucket size from current state instead
-      Resource resource = resourceMap.get(resourceName);
+      ResourceConfig resource = resourceMap.get(resourceId);
+      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+      SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
+
       if (resource.getBucketSize() > 0) {
         view.setBucketSize(resource.getBucketSize());
       } else {
-        view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
+        view.setBucketSize(currentStateOutput.getBucketSize(resourceId));
       }
-
-      for (Partition partition : resource.getPartitions()) {
-        Map<String, String> currentStateMap =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
+      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+        Map<ParticipantId, State> currentStateMap =
+            currentStateOutput.getCurrentStateMap(resourceId, partitionId);
         if (currentStateMap != null && currentStateMap.size() > 0) {
           // Set<String> disabledInstances
           // = cache.getDisabledInstancesForResource(resource.toString());
-          for (String instance : currentStateMap.keySet()) {
+          for (ParticipantId participantId : currentStateMap.keySet()) {
             // if (!disabledInstances.contains(instance))
             // {
-            view.setState(partition.getPartitionName(), instance, currentStateMap.get(instance));
+            view.setState(partitionId.stringify(), participantId.stringify(),
+                currentStateMap.get(participantId).toString());
             // }
           }
         }
       }
+
+      // TODO fix this
       // Update cluster status monitor mbean
-      ClusterStatusMonitor clusterStatusMonitor =
-          (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-      IdealState idealState = cache._idealStateMap.get(view.getResourceName());
-      if (idealState != null) {
-        if (clusterStatusMonitor != null
-            && !idealState.getStateModelDefRef().equalsIgnoreCase(
-                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-          clusterStatusMonitor.onExternalViewChange(view,
-              cache._idealStateMap.get(view.getResourceName()));
-        }
-      }
+      // ClusterStatusMonitor clusterStatusMonitor =
+      // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+      // IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+      // if (idealState != null) {
+      // if (clusterStatusMonitor != null
+      // && !idealState.getStateModelDefRef().equalsIgnoreCase(
+      // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+      // clusterStatusMonitor.onExternalViewChange(view,
+      // cache._idealStateMap.get(view.getResourceName()));
+      // }
+      // }
 
       // compare the new external view with current one, set only on different
-      ExternalView curExtView = curExtViews.get(resourceName);
+      ExternalView curExtView = curExtViews.get(resourceId.stringify());
       if (curExtView == null || !curExtView.getRecord().equals(view.getRecord())) {
-        keys.add(keyBuilder.externalView(resourceName));
+        keys.add(keyBuilder.externalView(resourceId.stringify()));
         newExtViews.add(view);
 
         // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which
@@ -127,10 +139,13 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         // partitions are finished (COMPLETED or ERROR), update the status update of the original
         // scheduler
         // message, and then remove the partitions from the ideal state
-        if (idealState != null
-            && idealState.getStateModelDefRef().equalsIgnoreCase(
-                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-          updateScheduledTaskStatus(view, manager, idealState);
+        RebalancerContext rebalancerContext =
+            (rebalancerConfig != null) ? rebalancerConfig
+                .getRebalancerContext(RebalancerContext.class) : null;
+        if (rebalancerContext != null
+            && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+                StateModelDefId.SchedulerTaskQueue)) {
+          updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
         }
       }
     }
@@ -144,18 +159,21 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
 
     // remove dead external-views
     for (String resourceName : curExtViews.keySet()) {
-      if (!resourceMap.keySet().contains(resourceName)) {
+      if (!resourceMap.containsKey(ResourceId.from(resourceName))) {
         dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
       }
     }
 
     long endTime = System.currentTimeMillis();
-    log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
+    LOG.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
   }
 
-  private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,
-      IdealState taskQueueIdealState) {
+  // TODO fix it
+  private void updateScheduledTaskStatus(ResourceId resourceId, ExternalView ev,
+      HelixManager manager, SchedulerTaskConfig schedulerTaskConfig) {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
     ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
 
     // Place holder for finished partitions
@@ -166,23 +184,21 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     Map<String, Map<String, String>> controllerMsgUpdates =
         new HashMap<String, Map<String, String>>();
 
-    Builder keyBuilder = accessor.keyBuilder();
-
     for (String taskPartitionName : ev.getPartitionSet()) {
       for (String taskState : ev.getStateMap(taskPartitionName).values()) {
         if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
             || taskState.equalsIgnoreCase("COMPLETED")) {
-          log.info(taskPartitionName + " finished as " + taskState);
-          finishedTasks.getListFields().put(taskPartitionName, emptyList);
-          finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
+          LOG.info(taskPartitionName + " finished as " + taskState);
+          finishedTasks.setListField(taskPartitionName, emptyList);
+          finishedTasks.setMapField(taskPartitionName, emptyMap);
 
           // Update original scheduler message status update
-          if (taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null) {
-            String controllerMsgId =
-                taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                    .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+          Message innerMessage =
+              schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
+          if (innerMessage != null) {
+            String controllerMsgId = innerMessage.getControllerMessagId();
             if (controllerMsgId != null) {
-              log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
+              LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
               if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
                 controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
               }
@@ -193,16 +209,16 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       }
     }
     // fill the controllerMsgIdCountMap
-    for (String taskId : taskQueueIdealState.getPartitionSet()) {
-      String controllerMsgId =
-          taskQueueIdealState.getRecord().getMapField(taskId)
-              .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+    for (PartitionId taskId : schedulerTaskConfig.getPartitionSet()) {
+      Message innerMessage = schedulerTaskConfig.getInnerMessage(taskId);
+      String controllerMsgId = innerMessage.getControllerMessagId();
+
       if (controllerMsgId != null) {
-        if (!controllerMsgIdCountMap.containsKey(controllerMsgId)) {
-          controllerMsgIdCountMap.put(controllerMsgId, 0);
+        Integer curCnt = controllerMsgIdCountMap.get(controllerMsgId);
+        if (curCnt == null) {
+          curCnt = 0;
         }
-        controllerMsgIdCountMap.put(controllerMsgId,
-            (controllerMsgIdCountMap.get(controllerMsgId) + 1));
+        controllerMsgIdCountMap.put(controllerMsgId, curCnt + 1);
       }
     }
 
@@ -212,18 +228,16 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
             keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
         StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
         for (String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet()) {
+          Message innerMessage =
+              schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
+
           Map<String, String> result = new HashMap<String, String>();
           result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
           controllerStatusUpdate.getRecord().setMapField(
-              "MessageResult "
-                  + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                      .get(Message.Attributes.TGT_NAME.toString())
-                  + " "
-                  + taskPartitionName
-                  + " "
-                  + taskQueueIdealState.getRecord().getMapField(taskPartitionName)
-                      .get(Message.Attributes.MSG_ID.toString()), result);
+              "MessageResult " + innerMessage.getTgtName() + " " + taskPartitionName + " "
+                  + innerMessage.getMessageId(), result);
         }
+
         // All done for the scheduled tasks that came from controllerMsgId, add summary for it
         if (controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(
             controllerMsgId).intValue()) {
@@ -255,12 +269,12 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
       List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
       deltaList.add(znDelta);
-      IdealState delta = new IdealState(taskQueueIdealState.getResourceName());
+      IdealState delta = new IdealState(resourceId);
       delta.setDeltaList(deltaList);
 
       // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
       keyBuilder = accessor.keyBuilder();
-      accessor.updateProperty(keyBuilder.idealState(taskQueueIdealState.getResourceName()), delta);
+      accessor.updateProperty(keyBuilder.idealState(resourceId.stringify()), delta);
     }
   }
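
Two behaviours of this stage are preserved by the rewrite: an ExternalView is written back only when its record differs from the cached copy, and external views for resources no longer present in the resource map are removed. The sketch below is a hypothetical, self-contained illustration of that compare-before-write idea using plain maps; ExternalViewDiffSketch and its helpers are not part of the Helix API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExternalViewDiffSketch {
  // stand-in for "persist only the views whose records are new or changed"
  static List<String> viewsToWrite(Map<String, Map<String, String>> cached,
      Map<String, Map<String, String>> computed) {
    List<String> toWrite = new ArrayList<String>();
    for (Map.Entry<String, Map<String, String>> e : computed.entrySet()) {
      Map<String, String> cur = cached.get(e.getKey());
      if (cur == null || !cur.equals(e.getValue())) {
        toWrite.add(e.getKey()); // new or changed view: write it
      }
    }
    return toWrite;
  }

  // stand-in for "remove dead external views" for dropped resources
  static List<String> viewsToRemove(Map<String, Map<String, String>> cached,
      Map<String, Map<String, String>> computed) {
    List<String> toRemove = new ArrayList<String>();
    for (String resource : cached.keySet()) {
      if (!computed.containsKey(resource)) {
        toRemove.add(resource); // resource dropped: delete its view
      }
    }
    return toRemove;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> cached = new HashMap<String, Map<String, String>>();
    Map<String, Map<String, String>> computed = new HashMap<String, Map<String, String>>();
    cached.put("OldDB", new HashMap<String, String>());
    computed.put("TestDB", new HashMap<String, String>());
    System.out.println(viewsToWrite(cached, computed));  // [TestDB]
    System.out.println(viewsToRemove(cached, computed)); // [OldDB]
  }
}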
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
deleted file mode 100644
index 3056cd5..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ /dev/null
@@ -1,215 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages
- */
-@Deprecated
-public class MessageGenerationPhase extends AbstractBaseStage {
-  private static Logger logger = Logger.getLogger(MessageGenerationPhase.class);
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    HelixManager manager = event.getAttribute("helixmanager");
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    BestPossibleStateOutput bestPossibleStateOutput =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-    if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
-        || bestPossibleStateOutput == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
-    }
-
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    Map<String, String> sessionIdMap = new HashMap<String, String>();
-
-    for (LiveInstance liveInstance : liveInstances.values()) {
-      sessionIdMap
-          .put(liveInstance.getInstanceName(), liveInstance.getTypedSessionId().stringify());
-    }
-    MessageGenerationOutput output = new MessageGenerationOutput();
-
-    for (String resourceName : resourceMap.keySet()) {
-      Resource resource = resourceMap.get(resourceName);
-      int bucketSize = resource.getBucketSize();
-
-      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
-
-      for (Partition partition : resource.getPartitions()) {
-        Map<String, String> instanceStateMap =
-            bestPossibleStateOutput.getInstanceStateMap(resourceName, partition);
-
-        // we should generate message based on the desired-state priority
-        // so keep generated messages in a temp map keyed by state
-        // desired-state->list of generated-messages
-        Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>();
-
-        for (String instanceName : instanceStateMap.keySet()) {
-          String desiredState = instanceStateMap.get(instanceName);
-
-          String currentState =
-              currentStateOutput.getCurrentState(resourceName, partition, instanceName);
-          if (currentState == null) {
-            currentState = stateModelDef.getInitialState();
-          }
-
-          if (desiredState.equalsIgnoreCase(currentState)) {
-            continue;
-          }
-
-          String pendingState =
-              currentStateOutput.getPendingState(resourceName, partition, instanceName);
-
-          String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
-          if (nextState == null) {
-            logger.error("Unable to find a next state for partition: "
-                + partition.getPartitionName() + " from stateModelDefinition"
-                + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
-            continue;
-          }
-
-          if (pendingState != null) {
-            if (nextState.equalsIgnoreCase(pendingState)) {
-              logger.debug("Message already exists for " + instanceName + " to transit "
-                  + partition.getPartitionName() + " from " + currentState + " to " + nextState);
-            } else if (currentState.equalsIgnoreCase(pendingState)) {
-              logger.info("Message hasn't been removed for " + instanceName + " to transit"
-                  + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
-                  + desiredState);
-            } else {
-              logger.info("IdealState changed before state transition completes for "
-                  + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
-                  + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
-            }
-          } else {
-            Message message =
-                createMessage(manager, resourceName, partition.getPartitionName(), instanceName,
-                    currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(),
-                    resource.getStateModelFactoryname(), bucketSize);
-            IdealState idealState = cache.getIdealState(resourceName);
-            if (idealState != null
-                && idealState.getStateModelDefRef().equalsIgnoreCase(
-                    DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-              if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-                message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
-                    idealState.getRecord().getMapField(partition.getPartitionName()));
-              }
-            }
-            // Set timeout of needed
-            String stateTransition =
-                currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
-            if (idealState != null) {
-              String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
-              if (timeOutStr == null
-                  && idealState.getStateModelDefRef().equalsIgnoreCase(
-                      DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-                // scheduled task queue
-                if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
-                  timeOutStr =
-                      idealState.getRecord().getMapField(partition.getPartitionName())
-                          .get(Message.Attributes.TIMEOUT.toString());
-                }
-              }
-              if (timeOutStr != null) {
-                try {
-                  int timeout = Integer.parseInt(timeOutStr);
-                  if (timeout > 0) {
-                    message.setExecutionTimeout(timeout);
-                  }
-                } catch (Exception e) {
-                  logger.error("", e);
-                }
-              }
-            }
-            message.getRecord().setSimpleField("ClusterEventName", event.getName());
-            // output.addMessage(resourceName, partition, message);
-            if (!messageMap.containsKey(desiredState)) {
-              messageMap.put(desiredState, new ArrayList<Message>());
-            }
-            messageMap.get(desiredState).add(message);
-          }
-        }
-
-        // add generated messages to output according to state priority
-        List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-        for (String state : statesPriorityList) {
-          if (messageMap.containsKey(state)) {
-            for (Message message : messageMap.get(state)) {
-              output.addMessage(resourceName, partition, message);
-            }
-          }
-        }
-
-      } // end of for-each-partition
-    }
-    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
-  }
-
-  private Message createMessage(HelixManager manager, String resourceName, String partitionName,
-      String instanceName, String currentState, String nextState, String sessionId,
-      String stateModelDefName, String stateModelFactoryName, int bucketSize) {
-    MessageId uuid = MessageId.from(UUID.randomUUID().toString());
-    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
-    message.setSrcName(manager.getInstanceName());
-    message.setTgtName(instanceName);
-    message.setMsgState(MessageState.NEW);
-    message.setPartitionId(PartitionId.from(partitionName));
-    message.setResourceId(ResourceId.from(resourceName));
-    message.setFromState(State.from(currentState));
-    message.setToState(State.from(nextState));
-    message.setTgtSessionId(SessionId.from(sessionId));
-    message.setSrcSessionId(SessionId.from(manager.getSessionId()));
-    message.setStateModelDef(StateModelDefId.from(stateModelDefName));
-    message.setStateModelFactoryName(stateModelFactoryName);
-    message.setBucketSize(bucketSize);
-
-    return message;
-  }
-}
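
The deleted MessageGenerationPhase grouped generated state-transition messages by desired state and then emitted them in the order given by the state model's priority list. The sketch below illustrates only that grouping-and-ordering step in isolation; PriorityOrderingSketch, its method, and the sample instance/state names are hypothetical simplifications, not Helix code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PriorityOrderingSketch {
  // group items by their desired state, then emit them following the priority list,
  // mirroring the messageMap/statesPriorityList pattern in the removed stage
  static List<String> orderByStatePriority(Map<String, String> desiredStateByInstance,
      List<String> statesPriorityList) {
    Map<String, List<String>> byState = new HashMap<String, List<String>>();
    for (Map.Entry<String, String> e : desiredStateByInstance.entrySet()) {
      List<String> bucket = byState.get(e.getValue());
      if (bucket == null) {
        bucket = new ArrayList<String>();
        byState.put(e.getValue(), bucket);
      }
      bucket.add(e.getKey());
    }
    List<String> ordered = new ArrayList<String>();
    for (String state : statesPriorityList) {
      if (byState.containsKey(state)) {
        ordered.addAll(byState.get(state)); // e.g. all MASTER transitions before SLAVE ones
      }
    }
    return ordered;
  }

  public static void main(String[] args) {
    Map<String, String> desired = new HashMap<String, String>();
    desired.put("node_1", "SLAVE");
    desired.put("node_2", "MASTER");
    List<String> priority = new ArrayList<String>();
    priority.add("MASTER");
    priority.add("SLAVE");
    System.out.println(orderByStatePriority(desired, priority)); // [node_2, node_1]
  }
}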