You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:40 UTC

[11/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/HealthDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/HealthDataCache.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/HealthDataCache.java
deleted file mode 100644
index 1fdc05d..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/HealthDataCache.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.model.AlertStatus;
-import com.linkedin.helix.model.Alerts;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.PersistentStats;
-
-public class HealthDataCache
-{
-  Map<String, LiveInstance> _liveInstanceMap;
-
-  Map<String, Map<String, HealthStat>> _healthStatMap;
-  HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
-  PersistentStats _persistentStats;
-  Alerts _alerts;
-  AlertStatus _alertStatus;
-
-  public boolean refresh(DataAccessor accessor)
-  {
-    _liveInstanceMap = accessor.getChildValuesMap(LiveInstance.class,
-        PropertyType.LIVEINSTANCES);
-
-    Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
-
-    for (String instanceName : _liveInstanceMap.keySet())
-    {
-      // TODO: the znodes for this instance are clearly being read here, so
-      // get the timestamp as well
-
-      hsMap.put(instanceName, accessor.getChildValuesMap(HealthStat.class,
-          PropertyType.HEALTHREPORT, instanceName));
-    }
-    _healthStatMap = Collections.unmodifiableMap(hsMap);
-    _persistentStats = accessor.getProperty(PersistentStats.class,
-        PropertyType.PERSISTENTSTATS);
-    _alerts = accessor.getProperty(Alerts.class, PropertyType.ALERTS);
-    _alertStatus = accessor.getProperty(AlertStatus.class,
-        PropertyType.ALERT_STATUS);
-
-    return true;
-  }
-
-  public HealthStat getGlobalStats()
-  {
-    return _globalStats;
-  }
-
-  public PersistentStats getPersistentStats()
-  {
-    return _persistentStats;
-  }
-
-  public Alerts getAlerts()
-  {
-    return _alerts;
-  }
-
-  public AlertStatus getAlertStatus()
-  {
-    return _alertStatus;
-  }
-
-  public Map<String, HealthStat> getHealthStats(String instanceName)
-  {
-    Map<String, HealthStat> map = _healthStatMap.get(instanceName);
-    if (map != null)
-    {
-      return map;
-    } else
-    {
-      return Collections.emptyMap();
-    }
-  }
-
-  public Map<String, LiveInstance> getLiveInstances()
-  {
-    return _liveInstanceMap;
-  }
-
-  public boolean refresh(HelixDataAccessor accessor)
-  {
-    Builder keyBuilder = accessor.keyBuilder();
-    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-
-    Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
-
-    for (String instanceName : _liveInstanceMap.keySet())
-    {
-      // TODO: the znodes for this instance are clearly being read here, so
-      // get the timestamp as well
-
-      Map<String, HealthStat> childValuesMap = accessor
-          .getChildValuesMap(keyBuilder.healthReports(instanceName));
-      hsMap.put(instanceName, childValuesMap);
-    }
-    _healthStatMap = Collections.unmodifiableMap(hsMap);
-    _persistentStats = accessor.getProperty(keyBuilder.persistantStat());
-    _alerts = accessor.getProperty(keyBuilder.alerts());
-    _alertStatus = accessor.getProperty(keyBuilder.alertStatus());
-
-    return true;
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationOutput.java
deleted file mode 100644
index 61b8319..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationOutput.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-
-public class MessageGenerationOutput
-{
-
-  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
-  public MessageGenerationOutput()
-  {
-    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
-
-  }
-
-  public void addMessage(String resourceName, Partition resource,
-      Message message)
-  {
-    if (!_messagesMap.containsKey(resourceName))
-    {
-      _messagesMap.put(resourceName,
-          new HashMap<Partition, List<Message>>());
-    }
-    if (!_messagesMap.get(resourceName).containsKey(resource))
-    {
-      _messagesMap.get(resourceName).put(resource,
-          new ArrayList<Message>());
-
-    }
-    _messagesMap.get(resourceName).get(resource).add(message);
-
-  }
-
-  public List<Message> getMessages(String resourceName,
-      Partition resource)
-  {
-    Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
-    if (map != null)
-    {
-      return map.get(resource);
-    }
-    return Collections.emptyList();
-
-  }
-  
-  @Override
-  public String toString()
-  {
-    return _messagesMap.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationPhase.java
deleted file mode 100644
index 1ffc14f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationPhase.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.model.StateModelDefinition;
-
-/**
- * Compares the currentState and pendingState with the IdealState and generates messages
- * 
- * @author kgopalak
- * 
- */
-public class MessageGenerationPhase extends AbstractBaseStage
-{
-  private static Logger logger = Logger.getLogger(MessageGenerationPhase.class);
-
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    HelixManager manager = event.getAttribute("helixmanager");
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-    CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE
-        .toString());
-    BestPossibleStateOutput bestPossibleStateOutput = event
-        .getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-    if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
-        || bestPossibleStateOutput == null)
-    {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
-    }
-
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    Map<String, String> sessionIdMap = new HashMap<String, String>();
-
-    for (LiveInstance liveInstance : liveInstances.values())
-    {
-      sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
-    }
-    MessageGenerationOutput output = new MessageGenerationOutput();
-
-    for (String resourceName : resourceMap.keySet())
-    {
-      Resource resource = resourceMap.get(resourceName);
-      int bucketSize = resource.getBucketSize();
-
-      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
-
-      for (Partition partition : resource.getPartitions())
-      {
-        Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(
-            resourceName, partition);
-
-        for (String instanceName : instanceStateMap.keySet())
-        {
-          String desiredState = instanceStateMap.get(instanceName);
-
-          String currentState = currentStateOutput.getCurrentState(resourceName, partition,
-              instanceName);
-          if (currentState == null)
-          {
-            currentState = stateModelDef.getInitialState();
-          }
-
-          if (desiredState.equalsIgnoreCase(currentState))
-          {
-            continue;
-          }
-
-          String pendingState = currentStateOutput.getPendingState(resourceName, partition,
-              instanceName);
-
-          String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
-          if (nextState == null)
-          {
-            logger.error("Unable to find a next state for partition: "
-                + partition.getPartitionName() + " from stateModelDefinition"
-                + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
-            continue;
-          }
-
-          if (pendingState != null)
-          {
-            if (nextState.equalsIgnoreCase(pendingState))
-            {
-              logger.debug("Message already exists for " + instanceName + " to transit "
-                  + partition.getPartitionName() + " from " + currentState + " to " + nextState);
-            } else if (currentState.equalsIgnoreCase(pendingState))
-            {
-              logger.info("Message hasn't been removed for " + instanceName + " to transit"
-                  + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
-                  + desiredState);
-            } else
-            {
-              logger.info("IdealState changed before state transition completes for "
-                  + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
-                  + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
-            }
-          } else
-          {
-            Message message = createMessage(manager, resourceName, partition.getPartitionName(),
-                instanceName, currentState, nextState, sessionIdMap.get(instanceName),
-                stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize);
-            IdealState idealState = cache.getIdealState(resourceName);
-            // Set timeout if needed
-            String stateTransition = currentState + "-" + nextState + "_"
-                + Message.Attributes.TIMEOUT;
-            if (idealState != null
-                && idealState.getRecord().getSimpleField(stateTransition) != null)
-            {
-              try
-              {
-                int timeout = Integer.parseInt(idealState.getRecord().getSimpleField(
-                    stateTransition));
-                if (timeout > 0)
-                {
-                  message.setExecutionTimeout(timeout);
-                }
-              } catch (Exception e)
-              {
-                logger.error("", e);
-              }
-            }
-            message.getRecord().setSimpleField("ClusterEventName", event.getName());
-            output.addMessage(resourceName, partition, message);
-          }
-        }
-      }
-    }
-    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
-  }
-
-  private Message createMessage(HelixManager manager, String resourceName, String partitionName,
-      String instanceName, String currentState, String nextState, String sessionId,
-      String stateModelDefName, String stateModelFactoryName, int bucketSize)
-  {
-    String uuid = UUID.randomUUID().toString();
-    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
-    message.setSrcName(manager.getInstanceName());
-    message.setTgtName(instanceName);
-    message.setMsgState(MessageState.NEW);
-    message.setPartitionName(partitionName);
-    message.setResourceName(resourceName);
-    message.setFromState(currentState);
-    message.setToState(nextState);
-    message.setTgtSessionId(sessionId);
-    message.setSrcSessionId(manager.getSessionId());
-    message.setStateModelDef(stateModelDefName);
-    message.setStateModelFactoryName(stateModelFactoryName);
-    message.setBucketSize(bucketSize);
-
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStage.java
deleted file mode 100644
index f510d75..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStage.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.model.StateModelDefinition;
-
-public class MessageSelectionStage extends AbstractBaseStage
-{
-  private static final Logger LOG = Logger.getLogger(MessageSelectionStage.class);
-
-  static class Bounds
-  {
-    private int upper;
-    private int lower;
-
-    public Bounds(int lower, int upper)
-    {
-      this.lower = lower;
-      this.upper = upper;
-    }
-
-    public void increaseUpperBound()
-    {
-      upper++;
-    }
-
-    public void increaseLowerBound()
-    {
-      lower++;
-    }
-
-    public void decreaseUpperBound()
-    {
-      upper--;
-    }
-
-    public void decreaseLowerBound()
-    {
-      lower--;
-    }
-
-    public int getLowerBound()
-    {
-      return lower;
-    }
-
-    public int getUpperBound()
-    {
-      return upper;
-    }
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    CurrentStateOutput currentStateOutput =
-        event.getAttribute(AttributeName.CURRENT_STATE.toString());
-    MessageGenerationOutput messageGenOutput =
-        event.getAttribute(AttributeName.MESSAGES_ALL.toString());
-    if (cache == null || resourceMap == null || currentStateOutput == null
-        || messageGenOutput == null)
-    {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
-    }
-
-    MessageSelectionStageOutput output = new MessageSelectionStageOutput();
-
-    for (String resourceName : resourceMap.keySet())
-    {
-      Resource resource = resourceMap.get(resourceName);
-      StateModelDefinition stateModelDef =
-          cache.getStateModelDef(resource.getStateModelDefRef());
-
-      Map<String, Integer> stateTransitionPriorities =
-          getStateTransitionPriorityMap(stateModelDef);
-      IdealState idealState = cache.getIdealState(resourceName);
-      Map<String, Bounds> stateConstraints =
-          computeStateConstraints(stateModelDef, idealState, cache);
-
-      for (Partition partition : resource.getPartitions())
-      {
-        List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
-        List<Message> selectedMessages =
-            selectMessages(cache.getLiveInstances(),
-                           currentStateOutput.getCurrentStateMap(resourceName, partition),
-                           currentStateOutput.getPendingStateMap(resourceName, partition),
-                           messages,
-                           stateConstraints,
-                           stateTransitionPriorities,
-                           stateModelDef.getInitialState());
-        output.addMessages(resourceName, partition, selectedMessages);
-      }
-    }
-    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
-  }
-
-  // TODO: This method deserves its own class. The class should not understand Helix but
-  // just be able to solve the problem using the algorithm. I think the method follows
-  // that, but if we don't move it to another class it's quite easy to break that
-  // contract.
-  /**
-   * Greedy message selection algorithm: 1) calculate current-state + pending-state
-   * lower/upper bounds; 2) group messages by state transition, sorted by priority; 3) from
-   * highest priority to lowest, for each message group with the same transition, add
-   * messages one by one, making sure no state constraint is violated, and update the
-   * state lower/upper bounds whenever a new message is selected.
-   *
-   * @param currentStates
-   * @param pendingStates
-   * @param messages
-   * @param stateConstraints
-   *          : STATE -> bound (lower:upper)
-   * @param stateTransitionPriorities
-   *          : FROM_STATE-TO_STATE -> priority
-   * @return selected messages
-   */
-  List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
-                               Map<String, String> currentStates,
-                               Map<String, String> pendingStates,
-                               List<Message> messages,
-                               Map<String, Bounds> stateConstraints,
-                               final Map<String, Integer> stateTransitionPriorities,
-                               String initialState)
-  {
-    if (messages == null || messages.isEmpty())
-    {
-      return Collections.emptyList();
-    }
-
-    List<Message> selectedMessages = new ArrayList<Message>();
-    Map<String, Bounds> bounds = new HashMap<String, Bounds>();
-
-    // count currentState, if no currentState, count as in initialState
-    for (String instance : liveInstances.keySet())
-    {
-      String state = initialState;
-      if (currentStates.containsKey(instance))
-      {
-        state = currentStates.get(instance);
-      }
-
-      if (!bounds.containsKey(state))
-      {
-        bounds.put(state, new Bounds(0, 0));
-      }
-      bounds.get(state).increaseLowerBound();
-      bounds.get(state).increaseUpperBound();
-    }
-
-    // count pendingStates
-    for (String instance : pendingStates.keySet())
-    {
-      String state = pendingStates.get(instance);
-      if (!bounds.containsKey(state))
-      {
-        bounds.put(state, new Bounds(0, 0));
-      }
-      // TODO: add lower bound, need to refactor pendingState to include fromState also
-      bounds.get(state).increaseUpperBound();
-    }
-
-    // group messages based on state transition priority
-    Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
-        new TreeMap<Integer, List<Message>>();
-    for (Message message : messages)
-    {
-      String fromState = message.getFromState();
-      String toState = message.getToState();
-      String transition = fromState + "-" + toState;
-      int priority = Integer.MAX_VALUE;
-
-      if (stateTransitionPriorities.containsKey(transition))
-      {
-        priority = stateTransitionPriorities.get(transition);
-      }
-
-      if (!messagesGroupByStateTransitPriority.containsKey(priority))
-      {
-        messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
-      }
-      messagesGroupByStateTransitPriority.get(priority).add(message);
-    }
-
-    // select messages
-    for (List<Message> messageList : messagesGroupByStateTransitPriority.values())
-    {
-      for (Message message : messageList)
-      {
-        String fromState = message.getFromState();
-        String toState = message.getToState();
-
-        if (!bounds.containsKey(fromState))
-        {
-          LOG.error("Message's fromState is not in currentState. message: " + message);
-          continue;
-        }
-
-        if (!bounds.containsKey(toState))
-        {
-          bounds.put(toState, new Bounds(0, 0));
-        }
-
-        // check lower bound of fromState
-        if (stateConstraints.containsKey(fromState))
-        {
-          int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
-          if (newLowerBound < 0)
-          {
-            LOG.error("Number of currentState in " + fromState
-                + " is less than number of messages transiting from " + fromState);
-            continue;
-          }
-
-          if (newLowerBound < stateConstraints.get(fromState).getLowerBound())
-          {
-            continue;
-          }
-        }
-
-        // check upper bound of toState
-        if (stateConstraints.containsKey(toState))
-        {
-          int newUpperBound = bounds.get(toState).getUpperBound() + 1;
-          if (newUpperBound > stateConstraints.get(toState).getUpperBound())
-          {
-            continue;
-          }
-        }
-
-        selectedMessages.add(message);
-        bounds.get(fromState).increaseLowerBound();
-        bounds.get(toState).increaseUpperBound();
-      }
-    }
-
-    return selectedMessages;
-  }
-
-  /**
-   * TODO: This code is duplicated in multiple places. Can we do it in one place at the
-   * beginning, computing the stateConstraint instance once and reusing it elsewhere?
-   * Each IdealState must have a constraint object associated with it.
-   */
-  private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
-                                                      IdealState idealState,
-                                                      ClusterDataCache cache)
-  {
-    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
-
-    List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
-    for (String state : statePriorityList)
-    {
-      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
-      int max = -1;
-      if ("N".equals(numInstancesPerState))
-      {
-        max = cache.getLiveInstances().size();
-      }
-      else if ("R".equals(numInstancesPerState))
-      {
-        // idealState is null when the resource has been dropped; in that case
-        // R can't be evaluated, so no state constraint is applied
-        if (idealState != null)
-        {
-          max = cache.getReplicas(idealState.getResourceName());
-        }
-      }
-      else
-      {
-        try
-        {
-          max = Integer.parseInt(numInstancesPerState);
-        }
-        catch (Exception e)
-        {
-          // use -1
-        }
-      }
-
-      if (max > -1)
-      {
-        // if state has no constraint, will not put in map
-        stateConstraints.put(state, new Bounds(0, max));
-      }
-    }
-
-    return stateConstraints;
-  }
-
-  // TODO: if state transition priority is not provided then use lexicographical sorting
-  // so that behavior is consistent
-  private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef)
-  {
-    Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
-    List<String> stateTransitionPriorityList =
-        stateModelDef.getStateTransitionPriorityList();
-    for (int i = 0; i < stateTransitionPriorityList.size(); i++)
-    {
-      stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
-    }
-
-    return stateTransitionPriorities;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStageOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStageOutput.java
deleted file mode 100644
index 5e91905..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStageOutput.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-
-public class MessageSelectionStageOutput
-{
-  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
-  public MessageSelectionStageOutput()
-  {
-    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
-  }
-
-  public void addMessages(String resourceName, Partition partition,
-      List<Message> selectedMessages)
-  {
-    if (!_messagesMap.containsKey(resourceName))
-    {
-      _messagesMap.put(resourceName,
-          new HashMap<Partition, List<Message>>());
-    }
-    _messagesMap.get(resourceName).put(partition, selectedMessages);
-
-  }
-
-  public List<Message> getMessages(String resourceName,
-      Partition partition)
-  {
-    Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
-    if (map != null)
-    {
-      return map.get(partition);
-    }
-    return Collections.emptyList();
-
-  }
-
-  @Override
-  public String toString()
-  {
-    return _messagesMap.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStage.java
deleted file mode 100644
index 8f19bcb..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStage.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.ClusterConstraints;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintAttribute;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintItem;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintType;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintValue;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-
-/**
- * Pipeline stage that applies the cluster's message constraints to the messages
- * produced by the message selection stage and publishes the surviving messages
- * under {@code AttributeName.MESSAGES_THROTTLE}.
- */
-public class MessageThrottleStage extends AbstractBaseStage
-{
-  private static final Logger LOG =
-      Logger.getLogger(MessageThrottleStage.class.getName());
-
-  /**
-   * Converts a constraint value string into a numeric limit.
-   *
-   * @param valueStr either the name of a {@link ConstraintValue} or a decimal
-   *          integer
-   * @return the parsed integer when {@code valueStr} is numeric; otherwise
-   *         {@code Integer.MAX_VALUE} (for {@code ANY}, for any other
-   *         {@link ConstraintValue} name, and for unparseable input)
-   */
-  int valueOf(String valueStr)
-  {
-    int limit = Integer.MAX_VALUE;
-
-    try
-    {
-      // Try the symbolic tokens first; a string that is not a token makes
-      // valueOf throw and is handled as a number below.
-      if (ConstraintValue.valueOf(valueStr) != ConstraintValue.ANY)
-      {
-        LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
-            + Integer.MAX_VALUE);
-      }
-    }
-    catch (Exception e)
-    {
-      try
-      {
-        limit = Integer.parseInt(valueStr);
-      }
-      catch (NumberFormatException ne)
-      {
-        LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
-            + Integer.MAX_VALUE);
-      }
-    }
-    return limit;
-  }
-
-  /**
-   * Picks, for every distinct filtered attribute set, the single constraint that
-   * applies. Rules, in order:
-   * <ol>
-   * <li>constraints whose value is {@code ANY} are never selected;</li>
-   * <li>a more specific constraint wins over a less specific one;</li>
-   * <li>among constraints of incomparable specificity, the smaller value wins;</li>
-   * <li>if the values are equal as well, the constraint whose string form comes
-   * first in alphabetic order wins.</li>
-   * </ol>
-   *
-   * @param items candidate constraints
-   * @param attributes attributes the constraints are filtered by
-   * @return the selected constraints
-   */
-  Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
-                                        Map<ConstraintAttribute, String> attributes)
-  {
-    Map<String, ConstraintItem> chosenByKey = new HashMap<String, ConstraintItem>();
-    for (ConstraintItem candidate : items)
-    {
-      // rule 1: CONSTRAINT_VALUE=ANY never throttles anything
-      if (candidate.getConstraintValue().equals(ConstraintValue.ANY.toString()))
-      {
-        continue;
-      }
-
-      String key = candidate.filter(attributes).toString();
-      ConstraintItem current = chosenByKey.get(key);
-      if (current == null)
-      {
-        chosenByKey.put(key, candidate);
-        continue;
-      }
-
-      boolean replace;
-      if (current.match(candidate.getAttributes()))
-      {
-        // rule 2: candidate is more specific than the current choice
-        replace = true;
-      }
-      else if (candidate.match(current.getAttributes()))
-      {
-        // the current choice is the more specific one, keep it
-        replace = false;
-      }
-      else
-      {
-        // incomparable specificity: rule 3 (smaller value), then rule 4
-        // (alphabetic order of the string form)
-        int candidateValue = valueOf(candidate.getConstraintValue());
-        int currentValue = valueOf(current.getConstraintValue());
-        replace = candidateValue < currentValue
-            || (candidateValue == currentValue
-                && candidate.toString().compareTo(current.toString()) < 0);
-      }
-
-      if (replace)
-      {
-        chosenByKey.put(key, candidate);
-      }
-    }
-    return new HashSet<ConstraintItem>(chosenByKey.values());
-  }
-
-  /**
-   * Reads the selected messages from the event, counts the messages already
-   * pending on every live instance against the message constraints, throttles
-   * the newly selected messages, and stores the result in the event.
-   *
-   * @param event event carrying the ClusterDataCache, RESOURCES and
-   *          MESSAGES_SELECTED attributes
-   * @throws StageException if any of the required attributes is missing
-   */
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    MessageSelectionStageOutput selected =
-        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-
-    boolean missingInput = cache == null || resourceMap == null || selected == null;
-    if (missingInput)
-    {
-      throw new StageException("Missing attributes in event: " + event
-          + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
-    }
-
-    MessageThrottleStageOutput output = new MessageThrottleStageOutput();
-    ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
-    Map<String, Integer> remainingByKey = new HashMap<String, Integer>();
-
-    if (constraint != null)
-    {
-      // pending messages consume quota but are never throttled themselves
-      for (String instance : cache.getLiveInstances().keySet())
-      {
-        List<Message> pending = new ArrayList<Message>(cache.getMessages(instance).values());
-        throttle(remainingByKey, constraint, pending, false);
-      }
-    }
-
-    // New messages are throttled when necessary. The selection stage is assumed
-    // to have ordered them by state transition priority.
-    for (Map.Entry<String, Resource> entry : resourceMap.entrySet())
-    {
-      String resourceName = entry.getKey();
-      for (Partition partition : entry.getValue().getPartitions())
-      {
-        List<Message> messages = selected.getMessages(resourceName, partition);
-        boolean hasMessages = messages != null && !messages.isEmpty();
-        if (constraint != null && hasMessages)
-        {
-          messages = throttle(remainingByKey, constraint, messages, true);
-        }
-        output.addMessages(resourceName, partition, messages);
-      }
-    }
-
-    event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
-  }
-
-  /**
-   * Charges every message against the constraints that match it and returns the
-   * messages that stay within quota.
-   *
-   * @param throttleMap remaining quota per filtered constraint key; updated in
-   *          place and seeded from the constraint value on first use
-   * @param constraint constraints to match the messages against
-   * @param messages messages to charge
-   * @param needThrottle when {@code false} the messages are only counted and
-   *          all of them are returned
-   * @return the messages that were not throttled, in their original order
-   */
-  private List<Message> throttle(Map<String, Integer> throttleMap,
-                                 ClusterConstraints constraint,
-                                 List<Message> messages,
-                                 final boolean needThrottle)
-  {
-    List<Message> accepted = new ArrayList<Message>();
-    for (Message message : messages)
-    {
-      Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
-      Set<ConstraintItem> applicable = selectConstraints(constraint.match(msgAttr), msgAttr);
-
-      boolean throttled = false;
-      for (ConstraintItem item : applicable)
-      {
-        String key = item.filter(msgAttr).toString();
-        Integer remaining = throttleMap.get(key);
-        if (remaining == null)
-        {
-          remaining = valueOf(item.getConstraintValue());
-        }
-
-        // quota is consumed even when the message ends up being throttled
-        int left = remaining - 1;
-        throttleMap.put(key, left);
-
-        if (needThrottle && left < 0)
-        {
-          throttled = true;
-
-          if (LOG.isDebugEnabled())
-          {
-            LOG.debug("message: " + message + " is throttled by constraint: " + item);
-          }
-        }
-      }
-
-      if (!throttled)
-      {
-        accepted.add(message);
-      }
-    }
-
-    return accepted;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStageOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStageOutput.java
deleted file mode 100644
index 73a5d81..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStageOutput.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-
-/**
- * Holds the messages that passed the message throttle stage, keyed first by
- * resource name and then by partition.
- */
-public class MessageThrottleStageOutput
-{
-  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
-  public MessageThrottleStageOutput()
-  {
-    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
-  }
-
-  /**
-   * Stores the messages for one partition of a resource, replacing any list
-   * stored earlier for the same resource/partition pair.
-   *
-   * @param resourceName name of the resource the partition belongs to
-   * @param partition partition the messages belong to
-   * @param selectedMessages messages kept for the partition
-   */
-  public void addMessages(String resourceName,
-                          Partition partition,
-                          List<Message> selectedMessages)
-  {
-    Map<Partition, List<Message>> partitionMessages = _messagesMap.get(resourceName);
-    if (partitionMessages == null)
-    {
-      partitionMessages = new HashMap<Partition, List<Message>>();
-      _messagesMap.put(resourceName, partitionMessages);
-    }
-    partitionMessages.put(partition, selectedMessages);
-  }
-
-  /**
-   * Looks up the messages stored for one partition of a resource.
-   *
-   * @param resourceName name of the resource the partition belongs to
-   * @param partition partition to look up
-   * @return the stored messages, or an empty list when nothing (or
-   *         {@code null}) was stored for the resource/partition pair; never
-   *         {@code null}
-   */
-  public List<Message> getMessages(String resourceName, Partition partition)
-  {
-    Map<Partition, List<Message>> partitionMessages = _messagesMap.get(resourceName);
-    if (partitionMessages != null)
-    {
-      List<Message> messages = partitionMessages.get(partition);
-      if (messages != null)
-      {
-        return messages;
-      }
-    }
-    // Unknown resource and unknown partition are reported the same way, so
-    // callers never have to distinguish null from "no messages".
-    return Collections.emptyList();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadClusterDataStage.java
deleted file mode 100644
index fb22100..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadClusterDataStage.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.monitoring.mbeans.ClusterStatusMonitor;
-
-/**
- * Pipeline stage that refreshes the {@link ClusterDataCache} from the data
- * accessor, pushes instance counters to the cluster status monitor when one is
- * attached to the event, and exposes the cache as the "ClusterDataCache" event
- * attribute.
- */
-public class ReadClusterDataStage extends AbstractBaseStage
-{
-  private static final Logger logger = Logger
-      .getLogger(ReadClusterDataStage.class.getName());
-  ClusterDataCache _cache;
-
-  public ReadClusterDataStage()
-  {
-    _cache = new ClusterDataCache();
-  }
-
-  /**
-   * Refreshes the cache and publishes it on the event.
-   *
-   * @param event event carrying the "helixmanager" attribute and, optionally,
-   *          the "clusterStatusMonitor" attribute
-   * @throws StageException if the "helixmanager" attribute is missing
-   */
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    final long startTime = System.currentTimeMillis();
-    logger.info("START ReadClusterDataStage.process()");
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    if (manager == null)
-    {
-      throw new StageException("HelixManager attribute value is null");
-    }
-    _cache.refresh(manager.getHelixDataAccessor());
-
-    ClusterStatusMonitor monitor = event.getAttribute("clusterStatusMonitor");
-    if (monitor != null)
-    {
-      // tally disabled instances and disabled partitions across all configs
-      int disabledInstanceCount = 0;
-      int disabledPartitionCount = 0;
-      for (InstanceConfig instanceConfig : _cache._instanceConfigMap.values())
-      {
-        if (!instanceConfig.getInstanceEnabled())
-        {
-          disabledInstanceCount++;
-        }
-        if (instanceConfig.getDisabledPartitionMap() != null)
-        {
-          disabledPartitionCount += instanceConfig.getDisabledPartitionMap().size();
-        }
-      }
-      monitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
-          _cache._instanceConfigMap.size(), disabledInstanceCount, disabledPartitionCount);
-    }
-
-    event.addAttribute("ClusterDataCache", _cache);
-
-    long elapsed = System.currentTimeMillis() - startTime;
-    logger.info("END ReadClusterDataStage.process(). took: " + elapsed + " ms");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadHealthDataStage.java
deleted file mode 100644
index f6139e7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadHealthDataStage.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-
-/**
- * Pipeline stage that refreshes the {@link HealthDataCache} from the data
- * accessor and exposes it as the "HealthDataCache" event attribute.
- */
-public class ReadHealthDataStage extends AbstractBaseStage
-{
-  private static final Logger LOG = Logger.getLogger(ReadHealthDataStage.class.getName());
-  HealthDataCache _cache;
-
-  public ReadHealthDataStage()
-  {
-    _cache = new HealthDataCache();
-  }
-
-  /**
-   * Refreshes the cache, publishes it on the event and reports how long the
-   * stage took via {@code addLatencyToMonitor}.
-   *
-   * @param event event carrying the "helixmanager" attribute
-   * @throws StageException if the "helixmanager" attribute is missing
-   */
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    final long startTime = System.currentTimeMillis();
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    if (manager == null)
-    {
-      throw new StageException("HelixManager attribute value is null");
-    }
-
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    _cache.refresh(accessor);
-    event.addAttribute("HealthDataCache", _cache);
-
-    addLatencyToMonitor(event, System.currentTimeMillis() - startTime);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ResourceComputationStage.java
deleted file mode 100644
index 7980ac7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ResourceComputationStage.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Resource;
-
-/**
- * This stage computes all the resources in a cluster and publishes them under
- * {@code AttributeName.RESOURCES}. The resources are computed from two sources:
- * <ul>
- * <li>IdealStates, which give all the resources currently active;</li>
- * <li>CurrentStates of the live instances, which help in finding resources
- * that are inactive and need to be dropped.</li>
- * </ul>
- *
- * @author kgopalak
- *
- */
-public class ResourceComputationStage extends AbstractBaseStage
-{
-  private static final Logger LOG = Logger.getLogger(ResourceComputationStage.class);
-
-  /**
-   * Builds the resource map from the cached IdealStates and CurrentStates.
-   *
-   * @param event event carrying the "ClusterDataCache" attribute
-   * @throws StageException if the cache attribute is missing or a current state
-   *           has no state model definition
-   */
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-    if (cache == null)
-    {
-      throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
-    }
-
-    Map<String, IdealState> idealStates = cache.getIdealStates();
-
-    // LinkedHashMap keeps the resources in the order they were discovered.
-    Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
-
-    if (idealStates != null && idealStates.size() > 0)
-    {
-      for (IdealState idealState : idealStates.values())
-      {
-        Set<String> partitionSet = idealState.getPartitionSet();
-        String resourceName = idealState.getResourceName();
-
-        for (String partition : partitionSet)
-        {
-          addPartition(partition, resourceName, resourceMap);
-        }
-
-        // An ideal state without partitions contributes no resource.
-        if (partitionSet.isEmpty())
-        {
-          continue;
-        }
-
-        // The resource-level settings do not depend on the partition, so they
-        // are applied once per ideal state rather than once per partition.
-        Resource resource = resourceMap.get(resourceName);
-        if (resource == null)
-        {
-          // addPartition registered nothing (null resource name or partitions)
-          continue;
-        }
-        resource.setStateModelDefRef(idealState.getStateModelDefRef());
-        resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
-        resource.setBucketSize(idealState.getBucketSize());
-        resource.setGroupMessageMode(idealState.getGroupMessageMode());
-      }
-    }
-
-    // It's important to get partitions from CurrentState as well since the
-    // idealState might be removed.
-    Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
-
-    if (availableInstances != null && availableInstances.size() > 0)
-    {
-      for (LiveInstance instance : availableInstances.values())
-      {
-        String instanceName = instance.getInstanceName();
-        String clientSessionId = instance.getSessionId();
-
-        Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName,
-            clientSessionId);
-        if (currentStateMap == null || currentStateMap.size() == 0)
-        {
-          continue;
-        }
-        for (CurrentState currentState : currentStateMap.values())
-        {
-
-          String resourceName = currentState.getResourceName();
-          Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
-
-          // don't overwrite ideal state settings
-          if (!resourceMap.containsKey(resourceName))
-          {
-            addResource(resourceName, resourceMap);
-            Resource resource = resourceMap.get(resourceName);
-            resource.setStateModelDefRef(currentState.getStateModelDefRef());
-            resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
-            resource.setBucketSize(currentState.getBucketSize());
-            resource.setGroupMessageMode(currentState.getGroupMessageMode());
-          }
-
-          if (currentState.getStateModelDefRef() == null)
-          {
-            LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
-                + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
-                + currentState.getPartitionStateMap().values());
-            throw new StageException("State model def is null for resource:"
-                + currentState.getResourceName());
-          }
-
-          for (String partition : resourceStateMap.keySet())
-          {
-            addPartition(partition, resourceName, resourceMap);
-          }
-        }
-      }
-    }
-
-    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
-  }
-
-  /**
-   * Registers a resource under its name unless it is already present. Does
-   * nothing when either argument is {@code null}.
-   */
-  private void addResource(String resource, Map<String, Resource> resourceMap)
-  {
-    if (resource == null || resourceMap == null)
-    {
-      return;
-    }
-    if (!resourceMap.containsKey(resource))
-    {
-      resourceMap.put(resource, new Resource(resource));
-    }
-  }
-
-  /**
-   * Adds a partition to the named resource, creating the resource first when it
-   * is not yet in the map. Does nothing when any argument is {@code null}.
-   */
-  private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap)
-  {
-    if (resourceName == null || partition == null || resourceMap == null)
-    {
-      return;
-    }
-    if (!resourceMap.containsKey(resourceName))
-    {
-      resourceMap.put(resourceName, new Resource(resourceName));
-    }
-    Resource resource = resourceMap.get(resourceName);
-    resource.addPartition(partition);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/StatsAggregationStage.java
deleted file mode 100644
index 0e8a5ea..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/StatsAggregationStage.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.alerts.AlertParser;
-import com.linkedin.helix.alerts.AlertProcessor;
-import com.linkedin.helix.alerts.AlertValueAndStatus;
-import com.linkedin.helix.alerts.AlertsHolder;
-import com.linkedin.helix.alerts.ExpressionParser;
-import com.linkedin.helix.alerts.StatsHolder;
-import com.linkedin.helix.alerts.Tuple;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageContext;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.healthcheck.StatHealthReportProvider;
-import com.linkedin.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
-import com.linkedin.helix.model.AlertHistory;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.PersistentStats;
-import com.linkedin.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
-
-/**
- * For each live instance, reads the reported health stats from the
- * HealthDataCache and applies them to the StatsHolder, then executes all alerts
- * against the aggregated stats and records the resulting alert status.
- *
- * @author asilbers
- *
- */
-public class StatsAggregationStage extends AbstractBaseStage
-{
-
-  public static final int ALERT_HISTORY_SIZE = 30;
-
-  private static final Logger logger =
-      Logger.getLogger(StatsAggregationStage.class.getName());
-
-  StatsHolder _statsHolder = null;
-  AlertsHolder _alertsHolder = null;
-  Map<String, Map<String, AlertValueAndStatus>> _alertStatus;
-  Map<String, Tuple<String>> _statStatus;
-  ClusterAlertMBeanCollection _alertBeanCollection = new ClusterAlertMBeanCollection();
-  Map<String, String> _alertActionTaken = new HashMap<String, String>();
-
-  public final String PARTICIPANT_STAT_REPORT_NAME = StatHealthReportProvider.REPORT_NAME;
-  public final String ESPRESSO_STAT_REPORT_NAME = "RestQueryStats";
-  public final String REPORT_NAME = "AggStats";
-  // public final String DEFAULT_AGG_TYPE = "decay";
-  // public final String DEFAULT_DECAY_PARAM = "0.1";
-  // public final String DEFAULT_AGG_TYPE = "window";
-  // public final String DEFAULT_DECAY_PARAM = "5";
-
-  public StatHealthReportProvider _aggStatsProvider;
-
-  // public AggregationType _defaultAggType;
-
-  /**
-   * Returns the alert status map most recently assigned by {@code process}
-   * (the result of {@code AlertProcessor.executeAllAlerts}). The field has no
-   * initializer, so this is {@code null} until {@code process} has assigned it.
-   */
-  public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus()
-  {
-    return _alertStatus;
-  }
-
-  /**
-   * Returns the stat map most recently assigned by {@code process} (taken from
-   * {@code _statsHolder.getStatsMap()}). The field has no initializer, so this
-   * is {@code null} until {@code process} has assigned it.
-   */
-  public Map<String, Tuple<String>> getStatStatus()
-  {
-    return _statStatus;
-  }
-
-  /**
-   * Writes the most recent aggregated health report held by
-   * {@code _aggStatsProvider} to the persistent stats property. A failed write
-   * is logged as an error and otherwise ignored.
-   *
-   * @param manager manager whose data accessor is used for the write
-   */
-  public void persistAggStats(HelixManager manager)
-  {
-    Map<String, String> simpleReport = _aggStatsProvider.getRecentHealthReport();
-    Map<String, Map<String, String>> perPartitionReport =
-        _aggStatsProvider.getRecentPartitionHealthReport();
-
-    // Copy whichever parts of the report exist into a record named after the report.
-    ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
-    if (simpleReport != null)
-    {
-      record.setSimpleFields(simpleReport);
-    }
-    if (perPartitionReport != null)
-    {
-      record.setMapFields(perPartitionReport);
-    }
-
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    boolean persisted =
-        accessor.setProperty(accessor.keyBuilder().persistantStat(), new PersistentStats(record));
-    if (!persisted)
-    {
-      logger.error("attempt to persist derived stats failed");
-    }
-  }
-
-  /**
-   * Intentionally empty: this override performs no initialization.
-   */
-  @Override
-  public void init(StageContext context)
-  {
-  }
-
-  /**
-   * Builds the name of the "reporting age" stat for an instance: the instance
-   * name, the stat field delimiter, then the literal "reportingage".
-   *
-   * @param instance instance name
-   * @return the stat name for that instance
-   */
-  public String getAgeStatName(String instance)
-  {
-    return instance + ExpressionParser.statFieldDelim + "reportingage";
-  }
-
-  /**
-   * Reports how old an instance's health report is as a stat named by
-   * {@link #getAgeStatName(String)}.
-   *
-   * The age is {@code (currTime - modifiedTime) / 1000}. The caller in
-   * {@code process} passes {@code System.currentTimeMillis()} as
-   * {@code currTime}, so the reported age is in seconds provided
-   * {@code modifiedTime} is in milliseconds as well (XXX: confirm the unit of
-   * the modified timestamp).
-   *
-   * @param instance live instance whose report age is recorded
-   * @param modifiedTime time the report was last modified
-   * @param currTime current time; also stored as the stat's timestamp
-   */
-  public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime)
-  {
-    final String ageStatName = getAgeStatName(instance.getInstanceName());
-    final long ageSeconds = (currTime - modifiedTime) / 1000;
-
-    Map<String, String> statFields = new HashMap<String, String>();
-    statFields.put(StatsHolder.TIMESTAMP_NAME, String.valueOf(currTime));
-    statFields.put(StatsHolder.VALUE_NAME, String.valueOf(ageSeconds));
-
-    // note that applyStat will only work if alert already added
-    _statsHolder.applyStat(ageStatName, statFields);
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    long startTime = System.currentTimeMillis();
-    // String aggTypeName =
-    // DEFAULT_AGG_TYPE+AggregationType.DELIM+DEFAULT_DECAY_PARAM;
-    // _defaultAggType = AggregationTypeFactory.getAggregationType(aggTypeName);
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    HealthDataCache cache = event.getAttribute("HealthDataCache");
-
-    if (manager == null || cache == null)
-    {
-      throw new StageException("helixmanager|HealthDataCache attribute value is null");
-    }
-    if(_alertsHolder == null)
-    {
-      _statsHolder = new StatsHolder(manager, cache);
-      _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
-    }
-    else
-    {
-      _statsHolder.updateCache(cache);
-      _alertsHolder.updateCache(cache);
-    }
-    if (_statsHolder.getStatsList().size() == 0)
-    {
-      logger.info("stat holder is empty");
-      return;
-    }
-
-    // init agg stats from cache
-    // initAggStats(cache);
-
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-
-    long currTime = System.currentTimeMillis();
-    // for each live node, read node's stats
-    long readInstancesStart = System.currentTimeMillis();
-    for (LiveInstance instance : liveInstances.values())
-    {
-      String instanceName = instance.getInstanceName();
-      logger.debug("instanceName: " + instanceName);
-      // XXX: now have map of HealthStats, so no need to traverse them...verify
-      // correctness
-      Map<String, HealthStat> stats;
-      stats = cache.getHealthStats(instanceName);
-      // find participants stats
-      long modTime = -1;
-      // TODO: get healthreport child node modified time and reportAgeStat based on that
-      boolean reportedAge = false;
-      for (HealthStat participantStat : stats.values())
-      {
-        if (participantStat != null && !reportedAge)
-        {
-          // generate and report stats for how old this node's report is
-          modTime = participantStat.getLastModifiedTimeStamp();
-          reportAgeStat(instance, modTime, currTime);
-          reportedAge = true;
-        }
-        // System.out.println(modTime);
-        // XXX: need to convert participantStat to a better format
-        // need to get instanceName in here
-
-        if (participantStat != null)
-        {
-          // String timestamp = String.valueOf(instance.getModifiedTime()); WANT
-          // REPORT LEVEL TS
-          Map<String, Map<String, String>> statMap =
-              participantStat.getHealthFields(instanceName);
-          for (String key : statMap.keySet())
-          {
-            _statsHolder.applyStat(key, statMap.get(key));
-          }
-        }
-      }
-    }
-    // Call _statsHolder.persistStats() once per pipeline. This will
-    // write the updated persisted stats into zookeeper
-    _statsHolder.persistStats();
-    logger.info("Done processing stats: "
-        + (System.currentTimeMillis() - readInstancesStart));
-    // populate _statStatus
-    _statStatus = _statsHolder.getStatsMap();
-
-    for (String statKey : _statStatus.keySet())
-    {
-      logger.debug("Stat key, value: " + statKey + ": " + _statStatus.get(statKey));
-    }
-
-    long alertExecuteStartTime = System.currentTimeMillis();
-    // execute alerts, populate _alertStatus
-    _alertStatus =
-        AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(),
-                                        _statsHolder.getStatsList());
-    logger.info("done executing alerts: "
-        + (System.currentTimeMillis() - alertExecuteStartTime));
-    for (String originAlertName : _alertStatus.keySet())
-    {
-      _alertBeanCollection.setAlerts(originAlertName,
-                                     _alertStatus.get(originAlertName),
-                                     manager.getClusterName());
-    }
-
-    executeAlertActions(manager);
-    // Write alert fire history to zookeeper
-    updateAlertHistory(manager);
-    long writeAlertStartTime = System.currentTimeMillis();
-    // write out alert status (to zk)
-    _alertsHolder.addAlertStatusSet(_alertStatus);
-    logger.info("done writing alerts: "
-        + (System.currentTimeMillis() - writeAlertStartTime));
-
-    // TODO: access the 2 status variables from somewhere to populate graphs
-
-    long logAlertStartTime = System.currentTimeMillis();
-    // logging alert status
-    for (String alertOuterKey : _alertStatus.keySet())
-    {
-      logger.debug("Alert Outer Key: " + alertOuterKey);
-      Map<String, AlertValueAndStatus> alertInnerMap = _alertStatus.get(alertOuterKey);
-      if (alertInnerMap == null)
-      {
-        logger.debug(alertOuterKey + " has no alerts to report.");
-        continue;
-      }
-      for (String alertInnerKey : alertInnerMap.keySet())
-      {
-        logger.debug("  " + alertInnerKey + " value: "
-            + alertInnerMap.get(alertInnerKey).getValue() + ", status: "
-            + alertInnerMap.get(alertInnerKey).isFired());
-      }
-    }
-
-    logger.info("done logging alerts: "
-        + (System.currentTimeMillis() - logAlertStartTime));
-
-    long processLatency = System.currentTimeMillis() - startTime;
-    addLatencyToMonitor(event, processLatency);
-    logger.info("process end: " + processLatency);
-  }
-
-  /**
-   * Go through the _alertStatus, and call executeAlertAction for those actual alerts that
-   * have been fired
-   */
-
-  void executeAlertActions(HelixManager manager)
-  {
-    // Start from a clean record of the actions taken in this run.
-    _alertActionTaken.clear();
-
-    // Walk the original alert names held in _alertStatus.
-    for (String alertName : _alertStatus.keySet())
-    {
-      Map<String, String> fields = _alertsHolder.getAlertsMap().get(alertName);
-      if (fields == null || !fields.containsKey(AlertParser.ACTION_NAME))
-      {
-        // This alert has no action attached, so there is nothing to execute.
-        continue;
-      }
-
-      String action = fields.get(AlertParser.ACTION_NAME);
-      Map<String, AlertValueAndStatus> results = _alertStatus.get(alertName);
-      if (results == null)
-      {
-        logger.info("Alert "+ alertName + " does not have alert status map");
-        continue;
-      }
-
-      // One original alert expands into several actual alerts; act on each fired one.
-      for (Map.Entry<String, AlertValueAndStatus> result : results.entrySet())
-      {
-        if (!result.getValue().isFired())
-        {
-          continue;
-        }
-        String statName = result.getKey();
-        logger.warn("Alert " + alertName + " action " + action + " is triggered by " + statName);
-        _alertActionTaken.put(statName, action);
-        // TODO: move this functionality into a separate class
-        executeAlertAction(statName, action, manager);
-      }
-    }
-  }
-  /**
-   * Execute the action if an alert is fired, and the alert has an action associated with it.
-   * NOTE: consider unifying this with DefaultParticipantErrorMessageHandler.handleMessage()
-   */
-  void executeAlertAction(String actualStatName, String actionValue, HelixManager manager)
-  {
-    // DISABLE_INSTANCE: disable the live instance named at the start of the stat.
-    if (actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString()))
-    {
-      String instance = parseInstanceName(actualStatName, manager);
-      if (instance != null)
-      {
-        logger.info("Disabling instance " + instance);
-        manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instance, false);
-      }
-      return;
-    }
-
-    // DISABLE_PARTITION: only acts when instance, resource and partition all resolve.
-    if (actionValue.equals(ActionOnError.DISABLE_PARTITION.toString()))
-    {
-      String instance = parseInstanceName(actualStatName, manager);
-      String resource = parseResourceName(actualStatName, manager);
-      String partition = parsePartitionName(actualStatName, manager);
-      if (instance == null || resource == null || partition == null)
-      {
-        return;
-      }
-      logger.info("Disabling partition " + partition + " instanceName " +  instance);
-      manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(), instance,
-          resource, Arrays.asList(partition));
-      return;
-    }
-
-    // DISABLE_RESOURCE: not implemented; the request is only logged.
-    if (actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString()))
-    {
-      String instance = parseInstanceName(actualStatName, manager);
-      String resource = parseResourceName(actualStatName, manager);
-      logger.info("Disabling resource " + resource + " instanceName " +  instance + " not implemented");
-    }
-  }
-
-  /**
-   * Finds the resource that an expanded stat name refers to.
-   *
-   * Reads all ideal states through the manager's data accessor and returns the first
-   * resource name that occurs in the stat name as "=resource." or "=resource;".
-   *
-   * @param actualStatName the expanded stat name to inspect
-   * @param manager used to obtain the data accessor
-   * @return the matching resource name, or null when no ideal state matches
-   */
-  public static String parseResourceName(String actualStatName, HelixManager manager)
-  {
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    Builder keys = dataAccessor.keyBuilder();
-    List<IdealState> candidates = dataAccessor.getChildValues(keys.idealStates());
-    for (IdealState candidate : candidates)
-    {
-      String name = candidate.getResourceName();
-      String dotTerminated = "=" + name + ".";
-      String semicolonTerminated = "=" + name + ";";
-      if (actualStatName.contains(dotTerminated)
-          || actualStatName.contains(semicolonTerminated))
-      {
-        return name;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Extracts the partition name embedded in an expanded stat name.
-   *
-   * The resource is resolved first via parseResourceName. The partition name is the text
-   * that starts right after the '=' of "=resource_" and runs up to the nearest following
-   * '.' or ';'. If neither delimiter follows, the partition name runs to the end of the
-   * stat name (previously this case threw StringIndexOutOfBoundsException).
-   *
-   * @param actualStatName the expanded stat name to inspect
-   * @param manager used to resolve the resource name
-   * @return the partition name (it begins with the resource name), or null when the
-   *         resource cannot be resolved or the stat name holds no "=resource_" segment
-   */
-  public static String parsePartitionName(String actualStatName, HelixManager manager)
-  {
-    String resourceName = parseResourceName(actualStatName, manager);
-    if (resourceName == null)
-    {
-      return null;
-    }
-
-    String partitionKey = "=" + resourceName + "_";
-    int pos = actualStatName.indexOf(partitionKey);
-    if (pos < 0)
-    {
-      return null;
-    }
-
-    int searchFrom = pos + partitionKey.length();
-    int dotPos = actualStatName.indexOf('.', searchFrom);
-    int semicolonPos = actualStatName.indexOf(';', searchFrom);
-
-    // Pick the nearest delimiter that actually exists; indexOf reports -1 for "absent",
-    // which must never be used as a substring bound.
-    int endPos;
-    if (dotPos < 0)
-    {
-      endPos = semicolonPos;
-    }
-    else if (semicolonPos < 0)
-    {
-      endPos = dotPos;
-    }
-    else
-    {
-      endPos = Math.min(dotPos, semicolonPos);
-    }
-    if (endPos < 0)
-    {
-      // No delimiter follows: the partition name is the rest of the stat name.
-      endPos = actualStatName.length();
-    }
-
-    // pos + 1 skips only the leading '=' so the resource prefix stays in the result.
-    return actualStatName.substring(pos + 1, endPos);
-  }
-
-  /**
-   * Finds the live instance that an expanded stat name belongs to.
-   *
-   * Reads all live instances through the manager's data accessor and returns the first
-   * instance name that the stat name starts with.
-   *
-   * @param actualStatName the expanded stat name to inspect
-   * @param manager used to obtain the data accessor
-   * @return the matching instance name, or null when no live instance matches
-   */
-  public static String parseInstanceName(String actualStatName, HelixManager manager)
-  {
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    Builder keys = dataAccessor.keyBuilder();
-    List<LiveInstance> candidates = dataAccessor.getChildValues(keys.liveInstances());
-    for (LiveInstance candidate : candidates)
-    {
-      String name = candidate.getInstanceName();
-      if (!actualStatName.startsWith(name))
-      {
-        continue;
-      }
-      return name;
-    }
-    return null;
-  }
-
-  /**
-   * Appends the most recent alert changes to the persisted alert history.
-   *
-   * Refreshes the alert delta on the MBean collection. When the delta is non-empty, it is
-   * merged with the actions taken in this run and stored as one history entry keyed by
-   * the current timestamp. Before adding, the history is trimmed so that it holds fewer
-   * than ALERT_HISTORY_SIZE entries.
-   *
-   * @param manager supplies the cluster name and the data accessor
-   */
-  void updateAlertHistory(HelixManager manager)
-  {
-    // Write alert fire history to zookeeper
-    _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
-    Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
-    // Update history only when some beans have changed
-    if (delta.size() == 0)
-    {
-      return;
-    }
-
-    delta.putAll(_alertActionTaken);
-    // Use the 24-hour field (HH). The 12-hour field (hh) without an AM/PM marker gives
-    // ambiguous keys that do not sort chronologically, and the trimming below depends on
-    // the smallest key being the oldest entry.
-    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss:SSS");
-    String date = dateFormat.format(new Date());
-
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    // Reuse the stored history record if one exists, otherwise start a new one.
-    HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
-    ZNRecord alertFiredHistory;
-    if (property == null)
-    {
-      alertFiredHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
-    }
-    else
-    {
-      alertFiredHistory = property.getRecord();
-    }
-
-    // Make room for the new entry by dropping the smallest (oldest) keys.
-    while (alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE)
-    {
-      // ZNRecord uses TreeMap which is sorted ascending internally
-      String firstKey = (String)(alertFiredHistory.getMapFields().keySet().toArray()[0]);
-      alertFiredHistory.getMapFields().remove(firstKey);
-    }
-
-    alertFiredHistory.setMapField(date, delta);
-    accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
-    _alertBeanCollection.setAlertHistory(alertFiredHistory);
-  }
-
-  /**
-   * Returns the alert MBean collection held by this stage.
-   *
-   * @return the ClusterAlertMBeanCollection instance stored in _alertBeanCollection
-   */
-  public ClusterAlertMBeanCollection getClusterAlertMBeanCollection()
-  {
-    return _alertBeanCollection;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/TaskAssignmentStage.java
deleted file mode 100644
index c986cec..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/TaskAssignmentStage.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-
-/**
- * Pipeline stage that takes the messages left after throttling, optionally groups them
- * per resource, and creates them through the HelixDataAccessor.
- */
-public class TaskAssignmentStage extends AbstractBaseStage
-{
-  private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
-
-  /**
-   * Gathers the throttled messages of every partition of every resource, groups them
-   * where the resource asks for it, and sends the result.
-   *
-   * @param event must carry the "helixmanager", RESOURCES and MESSAGES_THROTTLE attributes
-   * @throws StageException when one of those attributes is missing
-   */
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    long startTime = System.currentTimeMillis();
-    logger.info("START TaskAssignmentStage.process()");
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    MessageThrottleStageOutput throttledOutput =
-        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
-
-    boolean attributeMissing =
-        manager == null || resourceMap == null || throttledOutput == null;
-    if (attributeMissing)
-    {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache");
-    }
-
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-
-    // Flatten the per-partition message lists into one list.
-    List<Message> pending = new ArrayList<Message>();
-    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet())
-    {
-      String resourceName = resourceEntry.getKey();
-      for (Partition partition : resourceEntry.getValue().getPartitions())
-      {
-        pending.addAll(throttledOutput.getMessages(resourceName, partition));
-      }
-    }
-
-    List<Message> outgoing = groupMessage(dataAccessor.keyBuilder(), pending, resourceMap);
-    sendMessages(dataAccessor, outgoing);
-
-    long elapsed = System.currentTimeMillis() - startTime;
-    logger.info("END TaskAssignmentStage.process(). took: " + elapsed
-        + " ms");
-  }
-
-  /**
-   * Groups messages of resources that have group-message mode enabled.
-   *
-   * Messages whose resource is unknown or not in group-message mode are passed through
-   * unchanged. The others are grouped by current-state path, from-state and to-state.
-   * The first message of each group yields one group message, and every message of the
-   * group contributes its partition name to it.
-   *
-   * @param keyBuilder builds the current-state path used in the grouping key
-   * @param messages the messages to group
-   * @param resourceMap resources by name, consulted for the group-message mode
-   * @return pass-through messages and group messages, in order of first appearance
-   */
-  List<Message> groupMessage(Builder keyBuilder,
-                             List<Message> messages,
-                             Map<String, Resource> resourceMap)
-  {
-    // group key: CurrentState path + "/" + fromState + "/" + toState
-    Map<String, Message> groupsByKey = new HashMap<String, Message>();
-    List<Message> result = new ArrayList<Message>();
-
-    for (Message message : messages)
-    {
-      String resourceName = message.getResourceName();
-      Resource resource = resourceMap.get(resourceName);
-      boolean grouped = resource != null && resource.getGroupMessageMode();
-      if (!grouped)
-      {
-        result.add(message);
-        continue;
-      }
-
-      String statePath =
-          keyBuilder.currentState(message.getTgtName(),
-                                  message.getTgtSessionId(),
-                                  resourceName).getPath();
-      String key = statePath + "/" + message.getFromState() + "/" + message.getToState();
-
-      Message group = groupsByKey.get(key);
-      if (group == null)
-      {
-        // First message for this key: it becomes the basis of the group message.
-        group = new Message(message.getRecord());
-        group.setGroupMessageMode(true);
-        result.add(group);
-        groupsByKey.put(key, group);
-      }
-      group.addPartitionName(message.getPartitionName());
-    }
-
-    return result;
-  }
-
-  /**
-   * Logs every message and creates all of them with one createChildren call.
-   *
-   * @param dataAccessor accessor used to build the message keys and create the messages
-   * @param messages the messages to send; nothing happens when null or empty
-   */
-  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages)
-  {
-    boolean nothingToSend = messages == null || messages.isEmpty();
-    if (nothingToSend)
-    {
-      return;
-    }
-
-    Builder keyBuilder = dataAccessor.keyBuilder();
-    List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
-    for (Message outgoing : messages)
-    {
-      logger.info("Sending Message " + outgoing.getMsgId() + " to " + outgoing.getTgtName()
-          + " transit " + outgoing.getPartitionName() + "|" + outgoing.getPartitionNames()
-          + " from:" + outgoing.getFromState() + " to:" + outgoing.getToState());
-
-      messageKeys.add(keyBuilder.message(outgoing.getTgtName(), outgoing.getId()));
-    }
-
-    dataAccessor.createChildren(messageKeys, new ArrayList<Message>(messages));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/package-info.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/package-info.java
deleted file mode 100644
index baef0ff..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Stages in Helix controller pipelines 
- * 
- */
-package com.linkedin.helix.controller.stages;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapHandler.java b/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapHandler.java
deleted file mode 100644
index 43f4b85..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapHandler.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.examples;
-
-import java.util.UUID;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.ClusterMessagingService;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.participant.statemachine.StateModelInfo;
-import com.linkedin.helix.participant.statemachine.Transition;
-
-/**
- * Example state model factory whose state models ask the other participants for a
- * bootstrap url when a partition goes from OFFLINE to SLAVE.
- */
-public class BootstrapHandler extends StateModelFactory<StateModel>
-{
-
-  /**
-   * Creates the state model for one state unit.
-   *
-   * @param stateUnitKey key of the state unit the model is created for
-   * @return a new BootstrapStateModel
-   */
-  @Override
-  public StateModel createNewStateModel(String stateUnitKey)
-  {
-    return new BootstrapStateModel(stateUnitKey);
-  }
-
-  /**
-   * OFFLINE/SLAVE/MASTER state model; only the OFFLINE to SLAVE transition does real work.
-   */
-  @StateModelInfo(initialState = "OFFLINE", states = "{'OFFLINE','SLAVE','MASTER'}")
-  public static class BootstrapStateModel extends StateModel
-  {
-
-    private final String _stateUnitKey;
-
-    public BootstrapStateModel(String stateUnitKey)
-    {
-      _stateUnitKey = stateUnitKey;
-    }
-
-    /** MASTER to SLAVE: intentionally does nothing. */
-    @Transition(from = "MASTER", to = "SLAVE")
-    public void masterToSlave(Message message, NotificationContext context)
-    {
-    }
-
-    /**
-     * OFFLINE to SLAVE: sends a REQUEST_BOOTSTRAP_URL message to the participants selected
-     * by resource and partition, waits for replies, and prints the url and time if a
-     * reply supplied a url.
-     */
-    @Transition(from = "OFFLINE", to = "SLAVE")
-    public void offlineToSlave(Message message, NotificationContext context)
-    {
-      System.out
-          .println("BootstrapProcess.BootstrapStateModel.offlineToSlave()");
-      HelixManager manager = context.getManager();
-      ClusterMessagingService messagingService = manager.getMessagingService();
-
-      // Build the user-defined request message.
-      Message bootstrapRequest = new Message(
-          MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
-      bootstrapRequest.setMsgSubType(BootstrapProcess.REQUEST_BOOTSTRAP_URL);
-      bootstrapRequest.setMsgState(MessageState.NEW);
-
-      // Address any participant instance, narrowed to this resource and partition.
-      Criteria recipients = new Criteria();
-      recipients.setInstanceName("*");
-      recipients.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipients.setResource(message.getResourceName());
-      recipients.setPartition(message.getPartitionName());
-      recipients.setSessionSpecific(true);
-
-      // wait for 30 seconds
-      int timeoutMs = 30000;
-      BootstrapReplyHandler replyHandler = new BootstrapReplyHandler();
-      int sentMessageCount = messagingService.sendAndWait(recipients,
-          bootstrapRequest, replyHandler, timeoutMs);
-
-      if (sentMessageCount == 0)
-      {
-        // could not find any other node hosting the partition
-        return;
-      }
-      if (replyHandler.getBootstrapUrl() == null)
-      {
-        // No url came back. Options left open by this example:
-        // - go to error state: throw new Exception("Cant find backup/bootstrap data");
-        // - request some node to start the backup process
-        return;
-      }
-
-      System.out.println("Got bootstrap url:"+ replyHandler.getBootstrapUrl() );
-      System.out.println("Got backup time:"+ replyHandler.getBootstrapTime() );
-      // Got the url fetch it
-    }
-
-    /** SLAVE to OFFLINE: only prints a trace line. */
-    @Transition(from = "SLAVE", to = "OFFLINE")
-    public void slaveToOffline(Message message, NotificationContext context)
-    {
-      System.out
-          .println("BootstrapProcess.BootstrapStateModel.slaveToOffline()");
-    }
-
-    /** SLAVE to MASTER: only prints a trace line. */
-    @Transition(from = "SLAVE", to = "MASTER")
-    public void slaveToMaster(Message message, NotificationContext context)
-    {
-      System.out
-          .println("BootstrapProcess.BootstrapStateModel.slaveToMaster()");
-    }
-
-  }
-}
\ No newline at end of file