You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC

[9/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
new file mode 100644
index 0000000..c090835
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -0,0 +1,184 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Compares the current state (and pending state) of every partition replica with its best
+ * possible state and generates the STATE_TRANSITION messages needed to move it one step
+ * closer to that state.
+ */
+public class MessageGenerationPhase extends AbstractBaseStage
+{
+  private static final Logger logger = Logger.getLogger(MessageGenerationPhase.class);
+
+  /**
+   * For each partition of each resource, creates a STATE_TRANSITION message for every
+   * instance whose current state differs from its best possible state and that has no
+   * transition pending. The message targets the next state on the state model's path from
+   * the current to the desired state. The result is stored on the event under
+   * {@code AttributeName.MESSAGES_ALL}.
+   *
+   * @param event cluster event carrying the "helixmanager" and "ClusterDataCache"
+   *          attributes plus the RESOURCES, CURRENT_STATE and BEST_POSSIBLE_STATE outputs
+   * @throws StageException if any of the required attributes is missing
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    HelixManager manager = event.getAttribute("helixmanager");
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE
+        .toString());
+    BestPossibleStateOutput bestPossibleStateOutput = event
+        .getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
+        || bestPossibleStateOutput == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+    }
+
+    // instanceName -> sessionId of each live instance; used as the message's target session
+    Map<String, String> sessionIdMap = new HashMap<String, String>();
+    for (LiveInstance liveInstance : cache.getLiveInstances().values())
+    {
+      sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
+    }
+
+    MessageGenerationOutput output = new MessageGenerationOutput();
+
+    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet())
+    {
+      String resourceName = resourceEntry.getKey();
+      Resource resource = resourceEntry.getValue();
+      int bucketSize = resource.getBucketSize();
+
+      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+      if (stateModelDef == null)
+      {
+        // without a state model no transition can be computed; skip instead of failing with NPE
+        logger.error("State model definition: " + resource.getStateModelDefRef()
+            + " not found for resource: " + resourceName + ". Skip generating messages");
+        continue;
+      }
+
+      // loop-invariant per resource: only consulted for optional per-transition timeouts
+      IdealState idealState = cache.getIdealState(resourceName);
+
+      for (Partition partition : resource.getPartitions())
+      {
+        Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(
+            resourceName, partition);
+        if (instanceStateMap == null)
+        {
+          continue;
+        }
+
+        for (Map.Entry<String, String> instanceEntry : instanceStateMap.entrySet())
+        {
+          String instanceName = instanceEntry.getKey();
+          String desiredState = instanceEntry.getValue();
+
+          String currentState = currentStateOutput.getCurrentState(resourceName, partition,
+              instanceName);
+          if (currentState == null)
+          {
+            // no current state reported yet: treat the replica as being in the initial state
+            currentState = stateModelDef.getInitialState();
+          }
+
+          if (desiredState == null || desiredState.equalsIgnoreCase(currentState))
+          {
+            continue;
+          }
+
+          String pendingState = currentStateOutput.getPendingState(resourceName, partition,
+              instanceName);
+
+          String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+          if (nextState == null)
+          {
+            logger.error("Unable to find a next state for partition: "
+                + partition.getPartitionName() + " from stateModelDefinition "
+                + stateModelDef.getId() + " from:" + currentState + " to:" + desiredState);
+            continue;
+          }
+
+          if (pendingState != null)
+          {
+            // a transition is already in flight for this replica; never send a second one
+            if (nextState.equalsIgnoreCase(pendingState))
+            {
+              logger.debug("Message already exists for " + instanceName + " to transit "
+                  + partition.getPartitionName() + " from " + currentState + " to " + nextState);
+            } else if (currentState.equalsIgnoreCase(pendingState))
+            {
+              logger.info("Message hasn't been removed for " + instanceName + " to transit "
+                  + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
+                  + desiredState);
+            } else
+            {
+              logger.info("IdealState changed before state transition completes for "
+                  + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
+                  + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
+            }
+          } else
+          {
+            Message message = createMessage(manager, resourceName, partition.getPartitionName(),
+                instanceName, currentState, nextState, sessionIdMap.get(instanceName),
+                stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize);
+            setExecutionTimeoutIfConfigured(message, idealState, currentState, nextState);
+            message.getRecord().setSimpleField("ClusterEventName", event.getName());
+            output.addMessage(resourceName, partition, message);
+          }
+        }
+      }
+    }
+    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+  }
+
+  /**
+   * Applies an optional execution timeout to a message. The timeout is read from the ideal
+   * state's simple field named {@code <fromState>-<toState>_<Message.Attributes.TIMEOUT>};
+   * only positive integer values are applied, and unparsable values are logged and ignored.
+   *
+   * @param message the newly created transition message
+   * @param idealState ideal state of the message's resource; may be null (nothing applied)
+   * @param fromState the transition's from-state
+   * @param toState the transition's to-state
+   */
+  private void setExecutionTimeoutIfConfigured(Message message, IdealState idealState,
+      String fromState, String toState)
+  {
+    if (idealState == null)
+    {
+      return;
+    }
+    String timeoutKey = fromState + "-" + toState + "_" + Message.Attributes.TIMEOUT;
+    String timeoutStr = idealState.getRecord().getSimpleField(timeoutKey);
+    if (timeoutStr == null)
+    {
+      return;
+    }
+    try
+    {
+      int timeout = Integer.parseInt(timeoutStr);
+      if (timeout > 0)
+      {
+        message.setExecutionTimeout(timeout);
+      }
+    } catch (NumberFormatException e)
+    {
+      logger.error("Invalid timeout value: " + timeoutStr + " for " + timeoutKey
+          + " in idealState of resource: " + idealState.getResourceName() + ". Ignore it", e);
+    }
+  }
+
+  /**
+   * Builds a new STATE_TRANSITION message in state NEW, sent from this controller's
+   * instance/session to the given instance/session.
+   */
+  private Message createMessage(HelixManager manager, String resourceName, String partitionName,
+      String instanceName, String currentState, String nextState, String sessionId,
+      String stateModelDefName, String stateModelFactoryName, int bucketSize)
+  {
+    String uuid = UUID.randomUUID().toString();
+    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+    message.setSrcName(manager.getInstanceName());
+    message.setTgtName(instanceName);
+    message.setMsgState(MessageState.NEW);
+    message.setPartitionName(partitionName);
+    message.setResourceName(resourceName);
+    message.setFromState(currentState);
+    message.setToState(nextState);
+    message.setTgtSessionId(sessionId);
+    message.setSrcSessionId(manager.getSessionId());
+    message.setStateModelDef(stateModelDefName);
+    message.setStateModelFactoryName(stateModelFactoryName);
+    message.setBucketSize(bucketSize);
+
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
new file mode 100644
index 0000000..b3a7015
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -0,0 +1,337 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Selects, per partition, the subset of generated messages that can be sent without
+ * violating the state model's per-state replica count constraints.
+ */
+public class MessageSelectionStage extends AbstractBaseStage
+{
+  private static final Logger LOG = Logger.getLogger(MessageSelectionStage.class);
+
+  /**
+   * Mutable [lower, upper] pair of replica counts for one state. Used both for the
+   * configured constraints and for the running counts kept while selecting messages.
+   */
+  static class Bounds
+  {
+    private int upper;
+    private int lower;
+
+    public Bounds(int lower, int upper)
+    {
+      this.lower = lower;
+      this.upper = upper;
+    }
+
+    public void increaseUpperBound()
+    {
+      upper++;
+    }
+
+    public void increaseLowerBound()
+    {
+      lower++;
+    }
+
+    public void decreaseUpperBound()
+    {
+      upper--;
+    }
+
+    public void decreaseLowerBound()
+    {
+      lower--;
+    }
+
+    public int getLowerBound()
+    {
+      return lower;
+    }
+
+    public int getUpperBound()
+    {
+      return upper;
+    }
+  }
+
+  /**
+   * Runs message selection for every partition of every resource and stores the result on
+   * the event under {@code AttributeName.MESSAGES_SELECTED}.
+   *
+   * @param event cluster event carrying "ClusterDataCache" plus the RESOURCES, CURRENT_STATE
+   *          and MESSAGES_ALL outputs
+   * @throws StageException if any of the required attributes is missing
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    MessageGenerationOutput messageGenOutput =
+        event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+    if (cache == null || resourceMap == null || currentStateOutput == null
+        || messageGenOutput == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
+    }
+
+    MessageSelectionStageOutput output = new MessageSelectionStageOutput();
+    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+
+    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet())
+    {
+      String resourceName = resourceEntry.getKey();
+      Resource resource = resourceEntry.getValue();
+      StateModelDefinition stateModelDef =
+          cache.getStateModelDef(resource.getStateModelDefRef());
+      if (stateModelDef == null)
+      {
+        // without a state model no constraint can be evaluated; skip instead of failing
+        LOG.error("State model definition: " + resource.getStateModelDefRef()
+            + " not found for resource: " + resourceName + ". Skip selecting messages");
+        continue;
+      }
+
+      Map<String, Integer> stateTransitionPriorities =
+          getStateTransitionPriorityMap(stateModelDef);
+      IdealState idealState = cache.getIdealState(resourceName);
+      Map<String, Bounds> stateConstraints =
+          computeStateConstraints(stateModelDef, idealState, cache);
+
+      for (Partition partition : resource.getPartitions())
+      {
+        List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
+        List<Message> selectedMessages =
+            selectMessages(liveInstances,
+                           currentStateOutput.getCurrentStateMap(resourceName, partition),
+                           currentStateOutput.getPendingStateMap(resourceName, partition),
+                           messages,
+                           stateConstraints,
+                           stateTransitionPriorities,
+                           stateModelDef.getInitialState());
+        output.addMessages(resourceName, partition, selectedMessages);
+      }
+    }
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
+  }
+
+  // TODO: This method deserves its own class. The class should not understand helix but
+  // just be able to solve the problem using the algo. I think the method is following that
+  // but if we don't move it to another class it's quite easy to break that contract
+  /**
+   * Greedy message selection algorithm:
+   * 1) compute per-state lower/upper bounds from current states (CS) and pending states (PS);
+   * 2) group messages by state transition priority;
+   * 3) from highest to lowest priority, add messages one by one as long as no state
+   * constraint is violated, updating the bounds each time a message is selected.
+   *
+   * @param liveInstances live instances; one without a current state counts as initialState
+   * @param currentStates instance -> current state (null is treated as empty)
+   * @param pendingStates instance -> pending state (null is treated as empty)
+   * @param messages candidate messages for one partition
+   * @param stateConstraints STATE -> bound (lower:upper); states absent are unconstrained
+   * @param stateTransitionPriorities FROM_STATE-TO_STATE -> priority (lower value first)
+   * @param initialState the state model's initial state
+   * @return the selected messages, or an empty list if there are no messages
+   */
+  List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
+                               Map<String, String> currentStates,
+                               Map<String, String> pendingStates,
+                               List<Message> messages,
+                               Map<String, Bounds> stateConstraints,
+                               final Map<String, Integer> stateTransitionPriorities,
+                               String initialState)
+  {
+    if (messages == null || messages.isEmpty())
+    {
+      return Collections.emptyList();
+    }
+
+    List<Message> selectedMessages = new ArrayList<Message>();
+    Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+
+    // count current states; a live instance without a current state counts as initialState
+    for (String instance : liveInstances.keySet())
+    {
+      String state = initialState;
+      if (currentStates != null && currentStates.containsKey(instance))
+      {
+        state = currentStates.get(instance);
+      }
+      Bounds stateBounds = getOrCreateBounds(bounds, state);
+      stateBounds.increaseLowerBound();
+      stateBounds.increaseUpperBound();
+    }
+
+    // count pending states: a replica may end up there, so only the upper bound grows
+    if (pendingStates != null)
+    {
+      for (String state : pendingStates.values())
+      {
+        // TODO: add lower bound, need to refactor pendingState to include fromState also
+        getOrCreateBounds(bounds, state).increaseUpperBound();
+      }
+    }
+
+    // group messages by transition priority; TreeMap iterates the lowest value first and
+    // transitions without a configured priority (Integer.MAX_VALUE) last
+    Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
+        new TreeMap<Integer, List<Message>>();
+    for (Message message : messages)
+    {
+      String transition = message.getFromState() + "-" + message.getToState();
+      Integer priority = stateTransitionPriorities.get(transition);
+      if (priority == null)
+      {
+        priority = Integer.MAX_VALUE;
+      }
+      List<Message> group = messagesGroupByStateTransitPriority.get(priority);
+      if (group == null)
+      {
+        group = new ArrayList<Message>();
+        messagesGroupByStateTransitPriority.put(priority, group);
+      }
+      group.add(message);
+    }
+
+    // select messages
+    for (List<Message> messageList : messagesGroupByStateTransitPriority.values())
+    {
+      for (Message message : messageList)
+      {
+        String fromState = message.getFromState();
+        String toState = message.getToState();
+
+        if (!bounds.containsKey(fromState))
+        {
+          LOG.error("Message's fromState is not in currentState. message: " + message);
+          continue;
+        }
+
+        getOrCreateBounds(bounds, toState);
+
+        // check lower bound of fromState
+        if (stateConstraints.containsKey(fromState))
+        {
+          int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
+          if (newLowerBound < 0)
+          {
+            LOG.error("Number of currentState in " + fromState
+                + " is less than number of messages transiting from " + fromState);
+            continue;
+          }
+
+          if (newLowerBound < stateConstraints.get(fromState).getLowerBound())
+          {
+            continue;
+          }
+        }
+
+        // check upper bound of toState
+        if (stateConstraints.containsKey(toState))
+        {
+          int newUpperBound = bounds.get(toState).getUpperBound() + 1;
+          if (newUpperBound > stateConstraints.get(toState).getUpperBound())
+          {
+            continue;
+          }
+        }
+
+        selectedMessages.add(message);
+        // the replica leaves fromState: at least one fewer replica is guaranteed there
+        bounds.get(fromState).decreaseLowerBound();
+        // ...and may arrive in toState: at most one more replica can be there
+        bounds.get(toState).increaseUpperBound();
+      }
+    }
+
+    return selectedMessages;
+  }
+
+  /**
+   * Returns the bounds registered for a state, registering a new [0, 0] entry first if the
+   * state is not yet present.
+   */
+  private static Bounds getOrCreateBounds(Map<String, Bounds> bounds, String state)
+  {
+    Bounds stateBounds = bounds.get(state);
+    if (stateBounds == null)
+    {
+      stateBounds = new Bounds(0, 0);
+      bounds.put(state, stateBounds);
+    }
+    return stateBounds;
+  }
+
+  /**
+   * Computes the per-state upper bound on replica count from the state model. "N" means the
+   * number of live instances, "R" means the resource's replica count (skipped when the ideal
+   * state is null, e.g. the resource was dropped), and any other value is parsed as an
+   * integer. States whose bound cannot be determined are left unconstrained.
+   *
+   * TODO: This code is duplicated in multiple places. Can we do it in one place in the
+   * beginning and compute the stateConstraint instance once and reuse it at other places?
+   * Each IdealState must have a constraint object associated with it.
+   */
+  private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
+                                                      IdealState idealState,
+                                                      ClusterDataCache cache)
+  {
+    Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
+
+    List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
+    for (String state : statePriorityList)
+    {
+      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
+      int max = -1;
+      if ("N".equals(numInstancesPerState))
+      {
+        max = cache.getLiveInstances().size();
+      }
+      else if ("R".equals(numInstancesPerState))
+      {
+        // idealState is null when resource has been dropped,
+        // R can't be evaluated and ignore state constraints
+        if (idealState != null)
+        {
+          max = cache.getReplicas(idealState.getResourceName());
+        }
+      }
+      else
+      {
+        try
+        {
+          max = Integer.parseInt(numInstancesPerState);
+        }
+        catch (NumberFormatException e)
+        {
+          // missing or non-numeric value: leave the state unconstrained (max stays -1)
+        }
+      }
+
+      if (max > -1)
+      {
+        // if state has no constraint, will not put in map
+        stateConstraints.put(state, new Bounds(0, max));
+      }
+    }
+
+    return stateConstraints;
+  }
+
+  /**
+   * Maps each "FROM-TO" transition to its index in the state model's transition priority
+   * list; a lower index means a higher priority.
+   */
+  // TODO: if state transition priority is not provided then use lexicographical sorting
+  // so that behavior is consistent
+  private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef)
+  {
+    Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
+    List<String> stateTransitionPriorityList =
+        stateModelDef.getStateTransitionPriorityList();
+    if (stateTransitionPriorityList == null)
+    {
+      return stateTransitionPriorities;
+    }
+    for (int i = 0; i < stateTransitionPriorityList.size(); i++)
+    {
+      stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
+    }
+
+    return stateTransitionPriorities;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
new file mode 100644
index 0000000..e5f46c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+
+
+/**
+ * Holds the messages selected by {@code MessageSelectionStage}, keyed by resource name and
+ * partition.
+ */
+public class MessageSelectionStageOutput
+{
+  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
+
+  public MessageSelectionStageOutput()
+  {
+    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
+  }
+
+  /**
+   * Records the selected messages for a partition, replacing any previous entry.
+   */
+  public void addMessages(String resourceName, Partition partition,
+      List<Message> selectedMessages)
+  {
+    Map<Partition, List<Message>> partitionMessages = _messagesMap.get(resourceName);
+    if (partitionMessages == null)
+    {
+      partitionMessages = new HashMap<Partition, List<Message>>();
+      _messagesMap.put(resourceName, partitionMessages);
+    }
+    partitionMessages.put(partition, selectedMessages);
+  }
+
+  /**
+   * Returns the selected messages for a partition. Returns an empty list when nothing was
+   * recorded for the resource or for the partition (previously the latter returned null).
+   */
+  public List<Message> getMessages(String resourceName,
+      Partition partition)
+  {
+    Map<Partition, List<Message>> partitionMessages = _messagesMap.get(resourceName);
+    if (partitionMessages == null)
+    {
+      return Collections.emptyList();
+    }
+    List<Message> messages = partitionMessages.get(partition);
+    if (messages == null)
+    {
+      return Collections.emptyList();
+    }
+    return messages;
+  }
+
+  @Override
+  public String toString()
+  {
+    return _messagesMap.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
new file mode 100644
index 0000000..d5447b9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -0,0 +1,229 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ClusterConstraints.ConstraintItem;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Throttles selected messages according to the cluster's MESSAGE_CONSTRAINT constraints.
+ * Pending messages already on live instances count against the quotas but are never
+ * throttled.
+ */
+public class MessageThrottleStage extends AbstractBaseStage
+{
+  private static final Logger LOG = Logger.getLogger(MessageThrottleStage.class);
+
+  /**
+   * Converts a constraint value string to an int quota. The token ANY, as well as any
+   * invalid or null value, maps to Integer.MAX_VALUE (invalid values are logged).
+   */
+  int valueOf(String valueStr)
+  {
+    if (valueStr == null)
+    {
+      LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
+          + Integer.MAX_VALUE);
+      return Integer.MAX_VALUE;
+    }
+
+    int value = Integer.MAX_VALUE;
+    try
+    {
+      ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
+      switch (valueToken)
+      {
+      case ANY:
+        value = Integer.MAX_VALUE;
+        break;
+      default:
+        LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
+            + Integer.MAX_VALUE);
+        break;
+      }
+    }
+    catch (IllegalArgumentException e)
+    {
+      // not a ConstraintValue token: expect a plain integer
+      try
+      {
+        value = Integer.parseInt(valueStr);
+      }
+      catch (NumberFormatException ne)
+      {
+        LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
+            + Integer.MAX_VALUE);
+      }
+    }
+    return value;
+  }
+
+  /**
+   * constraints are selected in the order of the following rules: 1) don't select
+   * constraints with CONSTRAINT_VALUE=ANY; 2) if one constraint is more specific than the
+   * other, select the most specific one 3) if a message matches multiple constraints of
+   * incomparable specificity, select the one with the minimum value 4) if a message
+   * matches multiple constraints of incomparable specificity, and they all have the same
+   * value, select the first in alphabetic order
+   */
+  Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
+                                        Map<ConstraintAttribute, String> attributes)
+  {
+    Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
+    for (ConstraintItem item : items)
+    {
+      // don't select constraints with CONSTRAINT_VALUE=ANY
+      if (ConstraintValue.ANY.toString().equals(item.getConstraintValue()))
+      {
+        continue;
+      }
+
+      String key = item.filter(attributes).toString();
+      ConstraintItem existingItem = selectedItems.get(key);
+      if (existingItem == null)
+      {
+        selectedItems.put(key, item);
+      }
+      else if (existingItem.match(item.getAttributes()))
+      {
+        // item is more specific than existingItem
+        selectedItems.put(key, item);
+      }
+      else if (!item.match(existingItem.getAttributes()))
+      {
+        // existingItem and item are of incomparable specificity
+        int value = valueOf(item.getConstraintValue());
+        int existingValue = valueOf(existingItem.getConstraintValue());
+        if (value < existingValue)
+        {
+          // item's constraint value is less than that of existingItem
+          selectedItems.put(key, item);
+        }
+        else if (value == existingValue
+            && item.toString().compareTo(existingItem.toString()) < 0)
+        {
+          // item is ahead of existingItem in alphabetic order
+          selectedItems.put(key, item);
+        }
+      }
+    }
+    return new HashSet<ConstraintItem>(selectedItems.values());
+  }
+
+  /**
+   * Applies message constraints to the messages selected by the previous stage and stores
+   * the surviving messages on the event under {@code AttributeName.MESSAGES_THROTTLE}.
+   *
+   * @param event cluster event carrying "ClusterDataCache" plus the RESOURCES and
+   *          MESSAGES_SELECTED outputs
+   * @throws StageException if any of the required attributes is missing
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    MessageSelectionStageOutput msgSelectionOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+
+    if (cache == null || resourceMap == null || msgSelectionOutput == null)
+    {
+      throw new StageException("Missing attributes in event: " + event
+          + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
+    }
+
+    MessageThrottleStageOutput output = new MessageThrottleStageOutput();
+
+    ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+    // constraint key -> remaining quota, shared by pending and new messages
+    Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
+
+    if (constraint != null)
+    {
+      // go through all pending messages, they should be counted but not throttled
+      for (String instance : cache.getLiveInstances().keySet())
+      {
+        throttle(throttleCounterMap,
+                 constraint,
+                 new ArrayList<Message>(cache.getMessages(instance).values()),
+                 false);
+      }
+    }
+
+    // go through all new messages, throttle if necessary
+    // assumes messages were already sorted by state transition priority in the
+    // message selection stage
+    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet())
+    {
+      String resourceName = resourceEntry.getKey();
+      for (Partition partition : resourceEntry.getValue().getPartitions())
+      {
+        List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
+        if (constraint != null && messages != null && messages.size() > 0)
+        {
+          messages = throttle(throttleCounterMap, constraint, messages, true);
+        }
+        output.addMessages(resourceName, partition, messages);
+      }
+    }
+
+    event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
+  }
+
+  /**
+   * Counts messages against the quotas of the constraints they match and returns the
+   * messages that are allowed through.
+   *
+   * When needThrottle is true, a message is dropped if any matching constraint has no quota
+   * left; a dropped message consumes no quota, so it cannot starve other messages that share
+   * only its other constraints. When needThrottle is false (pending messages), every message
+   * is counted and returned.
+   */
+  private List<Message> throttle(Map<String, Integer> throttleMap,
+                                 ClusterConstraints constraint,
+                                 List<Message> messages,
+                                 final boolean needThrottle)
+  {
+    List<Message> throttleOutputMsgs = new ArrayList<Message>();
+    for (Message message : messages)
+    {
+      Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
+      Set<ConstraintItem> matches = selectConstraints(constraint.match(msgAttr), msgAttr);
+
+      // first pass: is there any matching constraint without remaining quota?
+      boolean msgThrottled = false;
+      if (needThrottle)
+      {
+        for (ConstraintItem item : matches)
+        {
+          if (remainingQuota(throttleMap, item, msgAttr) <= 0)
+          {
+            msgThrottled = true;
+            if (LOG.isDebugEnabled())
+            {
+              LOG.debug("message: " + message + " is throttled by constraint: " + item);
+            }
+          }
+        }
+      }
+
+      if (msgThrottled)
+      {
+        continue;
+      }
+
+      // second pass: the message goes out (or is already pending), charge every constraint
+      for (ConstraintItem item : matches)
+      {
+        String key = item.filter(msgAttr).toString();
+        throttleMap.put(key, remainingQuota(throttleMap, item, msgAttr) - 1);
+      }
+      throttleOutputMsgs.add(message);
+    }
+
+    return throttleOutputMsgs;
+  }
+
+  /**
+   * Returns the remaining quota for the constraint key derived from item and the message
+   * attributes, initializing it from the item's constraint value on first use.
+   */
+  private int remainingQuota(Map<String, Integer> throttleMap,
+                             ConstraintItem item,
+                             Map<ConstraintAttribute, String> msgAttr)
+  {
+    String key = item.filter(msgAttr).toString();
+    Integer quota = throttleMap.get(key);
+    if (quota == null)
+    {
+      quota = valueOf(item.getConstraintValue());
+      throttleMap.put(key, quota);
+    }
+    return quota;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
new file mode 100644
index 0000000..73694fa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+
+
+/**
+ * Output of the message throttle stage: the messages selected for each partition,
+ * indexed first by resource name and then by partition.
+ */
+public class MessageThrottleStageOutput
+{
+  // resource name -> (partition -> messages selected for that partition)
+  private final Map<String, Map<Partition, List<Message>>> _messagesMap;
+
+  public MessageThrottleStageOutput()
+  {
+    _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
+  }
+
+  /**
+   * Records the messages selected for a partition, replacing any list previously
+   * recorded for the same resource and partition.
+   *
+   * @param resourceName name of the resource the partition belongs to
+   * @param partition the partition the messages target
+   * @param selectedMessages the selected messages; stored by reference, not copied
+   */
+  public void addMessages(String resourceName,
+                          Partition partition,
+                          List<Message> selectedMessages)
+  {
+    Map<Partition, List<Message>> partitionMessages = _messagesMap.get(resourceName);
+    if (partitionMessages == null)
+    {
+      partitionMessages = new HashMap<Partition, List<Message>>();
+      _messagesMap.put(resourceName, partitionMessages);
+    }
+    partitionMessages.put(partition, selectedMessages);
+  }
+
+  /**
+   * Returns the messages recorded for a partition.
+   *
+   * @param resourceName name of the resource the partition belongs to
+   * @param partition the partition to look up
+   * @return the recorded list, or an empty immutable list when nothing was recorded for
+   *         the resource or for the partition; never null
+   */
+  public List<Message> getMessages(String resourceName, Partition partition)
+  {
+    Map<Partition, List<Message>> partitionMessages = _messagesMap.get(resourceName);
+    if (partitionMessages == null)
+    {
+      return Collections.emptyList();
+    }
+    List<Message> messages = partitionMessages.get(partition);
+    if (messages == null)
+    {
+      // previously returned null here while an unknown resource returned an empty list;
+      // callers that iterate or addAll() the result would throw
+      return Collections.emptyList();
+    }
+    return messages;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
new file mode 100644
index 0000000..b6891b7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Pipeline stage that refreshes the {@link ClusterDataCache} through the manager's
+ * HelixDataAccessor and publishes it on the event under "ClusterDataCache".
+ * When a ClusterStatusMonitor is attached to the event, it also pushes the live-instance
+ * count, the instance-config count, the number of disabled instances, and the summed
+ * sizes of the instances' disabled-partition maps.
+ */
+public class ReadClusterDataStage extends AbstractBaseStage
+{
+  private static final Logger logger = Logger
+      .getLogger(ReadClusterDataStage.class.getName());
+  ClusterDataCache _cache;
+
+  public ReadClusterDataStage()
+  {
+    _cache = new ClusterDataCache();
+  }
+
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    final long begin = System.currentTimeMillis();
+    logger.info("START ReadClusterDataStage.process()");
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    if (manager == null)
+    {
+      throw new StageException("HelixManager attribute value is null");
+    }
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    _cache.refresh(accessor);
+
+    ClusterStatusMonitor monitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    if (monitor != null)
+    {
+      reportClusterStatus(monitor);
+    }
+
+    event.addAttribute("ClusterDataCache", _cache);
+
+    logger.info("END ReadClusterDataStage.process(). took: "
+        + (System.currentTimeMillis() - begin) + " ms");
+  }
+
+  /**
+   * Counts disabled instances and disabled-partition map entries across the cached
+   * instance configs and pushes the totals, with the live and configured instance
+   * counts, to the monitor.
+   */
+  private void reportClusterStatus(ClusterStatusMonitor monitor)
+  {
+    int disabledInstanceCount = 0;
+    int disabledPartitionCount = 0;
+    for (InstanceConfig instanceConfig : _cache._instanceConfigMap.values())
+    {
+      if (!instanceConfig.getInstanceEnabled())
+      {
+        disabledInstanceCount++;
+      }
+      if (instanceConfig.getDisabledPartitionMap() != null)
+      {
+        disabledPartitionCount += instanceConfig.getDisabledPartitionMap().size();
+      }
+    }
+    monitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+                                     _cache._instanceConfigMap.size(),
+                                     disabledInstanceCount,
+                                     disabledPartitionCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
new file mode 100644
index 0000000..1922073
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Pipeline stage that refreshes the {@link HealthDataCache} through the manager's
+ * HelixDataAccessor, publishes it on the event under "HealthDataCache", and reports the
+ * stage's elapsed milliseconds via addLatencyToMonitor.
+ */
+public class ReadHealthDataStage extends AbstractBaseStage
+{
+  private static final Logger LOG = Logger.getLogger(ReadHealthDataStage.class.getName());
+  HealthDataCache _cache;
+
+  public ReadHealthDataStage()
+  {
+    _cache = new HealthDataCache();
+  }
+
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    final long begin = System.currentTimeMillis();
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    if (manager == null)
+    {
+      throw new StageException("HelixManager attribute value is null");
+    }
+    HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
+    _cache.refresh(helixAccessor);
+    event.addAttribute("HealthDataCache", _cache);
+
+    addLatencyToMonitor(event, System.currentTimeMillis() - begin);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
new file mode 100644
index 0000000..72b4e63
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -0,0 +1,156 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Resource;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This stage computes all the resources in a cluster. The resources are
+ * computed from IdealStates -> this gives all the resources currently active
+ * CurrentState for liveInstance-> Helps in finding resources that are inactive
+ * and needs to be dropped
+ *
+ * @author kgopalak
+ *
+ */
+public class ResourceComputationStage extends AbstractBaseStage
+{
+  private static Logger LOG = Logger.getLogger(ResourceComputationStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    if (cache == null)
+    {
+      throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+    }
+
+    Map<String, IdealState> idealStates = cache.getIdealStates();
+
+    Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
+
+    if (idealStates != null && idealStates.size() > 0)
+    {
+      for (IdealState idealState : idealStates.values())
+      {
+        Set<String> partitionSet = idealState.getPartitionSet();
+        String resourceName = idealState.getResourceName();
+
+        for (String partition : partitionSet)
+        {
+          addPartition(partition, resourceName, resourceMap);
+          Resource resource = resourceMap.get(resourceName);
+          resource.setStateModelDefRef(idealState.getStateModelDefRef());
+          resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
+          resource.setBucketSize(idealState.getBucketSize());
+          resource.setGroupMessageMode(idealState.getGroupMessageMode());
+        }
+      }
+    }
+
+    // It's important to get partitions from CurrentState as well since the
+    // idealState might be removed.
+    Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
+
+    if (availableInstances != null && availableInstances.size() > 0)
+    {
+      for (LiveInstance instance : availableInstances.values())
+      {
+        String instanceName = instance.getInstanceName();
+        String clientSessionId = instance.getSessionId();
+
+        Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName,
+            clientSessionId);
+        if (currentStateMap == null || currentStateMap.size() == 0)
+        {
+          continue;
+        }
+        for (CurrentState currentState : currentStateMap.values())
+        {
+
+          String resourceName = currentState.getResourceName();
+          Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
+
+          // don't overwrite ideal state settings
+          if (!resourceMap.containsKey(resourceName))
+          {
+            addResource(resourceName, resourceMap);
+            Resource resource = resourceMap.get(resourceName);
+            resource.setStateModelDefRef(currentState.getStateModelDefRef());
+            resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
+            resource.setBucketSize(currentState.getBucketSize());
+            resource.setGroupMessageMode(currentState.getGroupMessageMode());
+          }
+          
+          if (currentState.getStateModelDefRef() == null)
+          {
+            LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
+                + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
+                + currentState.getPartitionStateMap().values());
+            throw new StageException("State model def is null for resource:"
+                + currentState.getResourceName());
+          }
+
+          for (String partition : resourceStateMap.keySet())
+          {
+            addPartition(partition, resourceName, resourceMap);
+          }
+        }
+      }
+    }
+
+    event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+  }
+
+  private void addResource(String resource, Map<String, Resource> resourceMap)
+  {
+    if (resource == null || resourceMap == null)
+    {
+      return;
+    }
+    if (!resourceMap.containsKey(resource))
+    {
+      resourceMap.put(resource, new Resource(resource));
+    }
+  }
+
+  private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap)
+  {
+    if (resourceName == null || partition == null || resourceMap == null)
+    {
+      return;
+    }
+    if (!resourceMap.containsKey(resourceName))
+    {
+      resourceMap.put(resourceName, new Resource(resourceName));
+    }
+    Resource resource = resourceMap.get(resourceName);
+    resource.addPartition(partition);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
new file mode 100644
index 0000000..52711e9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
@@ -0,0 +1,457 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.AlertParser;
+import org.apache.helix.alerts.AlertProcessor;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.alerts.AlertsHolder;
+import org.apache.helix.alerts.ExpressionParser;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.alerts.Tuple;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.healthcheck.StatHealthReportProvider;
+import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
+import org.apache.helix.model.AlertHistory;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.PersistentStats;
+import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
+import org.apache.log4j.Logger;
+
+
+/**
+ * For each LiveInstances select currentState and message whose sessionId matches
+ * sessionId from LiveInstance Get Partition,State for all the resources computed in
+ * previous State [ResourceComputationStage]
+ *
+ * @author asilbers
+ *
+ */
+public class StatsAggregationStage extends AbstractBaseStage
+{
+
+  public static final int ALERT_HISTORY_SIZE = 30;
+
+  private static final Logger logger =
+      Logger.getLogger(StatsAggregationStage.class.getName());
+
+  StatsHolder _statsHolder = null;
+  AlertsHolder _alertsHolder = null;
+  Map<String, Map<String, AlertValueAndStatus>> _alertStatus;
+  Map<String, Tuple<String>> _statStatus;
+  ClusterAlertMBeanCollection _alertBeanCollection = new ClusterAlertMBeanCollection();
+  Map<String, String> _alertActionTaken = new HashMap<String, String>();
+
+  public final String PARTICIPANT_STAT_REPORT_NAME = StatHealthReportProvider.REPORT_NAME;
+  public final String ESPRESSO_STAT_REPORT_NAME = "RestQueryStats";
+  public final String REPORT_NAME = "AggStats";
+  // public final String DEFAULT_AGG_TYPE = "decay";
+  // public final String DEFAULT_DECAY_PARAM = "0.1";
+  // public final String DEFAULT_AGG_TYPE = "window";
+  // public final String DEFAULT_DECAY_PARAM = "5";
+
+  public StatHealthReportProvider _aggStatsProvider;
+
+  // public AggregationType _defaultAggType;
+
+  public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus()
+  {
+    return _alertStatus;
+  }
+
+  public Map<String, Tuple<String>> getStatStatus()
+  {
+    return _statStatus;
+  }
+
+  public void persistAggStats(HelixManager manager)
+  {
+    Map<String, String> report = _aggStatsProvider.getRecentHealthReport();
+    Map<String, Map<String, String>> partitionReport =
+        _aggStatsProvider.getRecentPartitionHealthReport();
+    ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
+    if (report != null)
+    {
+      record.setSimpleFields(report);
+    }
+    if (partitionReport != null)
+    {
+      record.setMapFields(partitionReport);
+    }
+
+//    DataAccessor accessor = manager.getDataAccessor();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+//    boolean retVal = accessor.setProperty(PropertyType.PERSISTENTSTATS, record);
+    Builder keyBuilder = accessor.keyBuilder();
+    boolean retVal = accessor.setProperty(keyBuilder.persistantStat(), new PersistentStats(record));
+    if (retVal == false)
+    {
+      logger.error("attempt to persist derived stats failed");
+    }
+  }
+
+  @Override
+  public void init(StageContext context)
+  {
+  }
+
+  public String getAgeStatName(String instance)
+  {
+    return instance + ExpressionParser.statFieldDelim + "reportingage";
+  }
+
+  // currTime in seconds
+  public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime)
+  {
+    String statName = getAgeStatName(instance.getInstanceName());
+    long age = (currTime - modifiedTime) / 1000; // XXX: ensure this is in
+                                                 // seconds
+    Map<String, String> ageStatMap = new HashMap<String, String>();
+    ageStatMap.put(StatsHolder.TIMESTAMP_NAME, String.valueOf(currTime));
+    ageStatMap.put(StatsHolder.VALUE_NAME, String.valueOf(age));
+    // note that applyStat will only work if alert already added
+    _statsHolder.applyStat(statName, ageStatMap);
+  }
+
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    long startTime = System.currentTimeMillis();
+    // String aggTypeName =
+    // DEFAULT_AGG_TYPE+AggregationType.DELIM+DEFAULT_DECAY_PARAM;
+    // _defaultAggType = AggregationTypeFactory.getAggregationType(aggTypeName);
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    HealthDataCache cache = event.getAttribute("HealthDataCache");
+
+    if (manager == null || cache == null)
+    {
+      throw new StageException("helixmanager|HealthDataCache attribute value is null");
+    }
+    if(_alertsHolder == null)
+    {
+      _statsHolder = new StatsHolder(manager, cache);
+      _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
+    }
+    else
+    {
+      _statsHolder.updateCache(cache);
+      _alertsHolder.updateCache(cache);
+    }
+    if (_statsHolder.getStatsList().size() == 0)
+    {
+      logger.info("stat holder is empty");
+      return;
+    }
+
+    // init agg stats from cache
+    // initAggStats(cache);
+
+    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+
+    long currTime = System.currentTimeMillis();
+    // for each live node, read node's stats
+    long readInstancesStart = System.currentTimeMillis();
+    for (LiveInstance instance : liveInstances.values())
+    {
+      String instanceName = instance.getInstanceName();
+      logger.debug("instanceName: " + instanceName);
+      // XXX: now have map of HealthStats, so no need to traverse them...verify
+      // correctness
+      Map<String, HealthStat> stats;
+      stats = cache.getHealthStats(instanceName);
+      // find participants stats
+      long modTime = -1;
+      // TODO: get healthreport child node modified time and reportAgeStat based on that
+      boolean reportedAge = false;
+      for (HealthStat participantStat : stats.values())
+      {
+        if (participantStat != null && !reportedAge)
+        {
+          // generate and report stats for how old this node's report is
+          modTime = participantStat.getLastModifiedTimeStamp();
+          reportAgeStat(instance, modTime, currTime);
+          reportedAge = true;
+        }
+        // System.out.println(modTime);
+        // XXX: need to convert participantStat to a better format
+        // need to get instanceName in here
+
+        if (participantStat != null)
+        {
+          // String timestamp = String.valueOf(instance.getModifiedTime()); WANT
+          // REPORT LEVEL TS
+          Map<String, Map<String, String>> statMap =
+              participantStat.getHealthFields(instanceName);
+          for (String key : statMap.keySet())
+          {
+            _statsHolder.applyStat(key, statMap.get(key));
+          }
+        }
+      }
+    }
+    // Call _statsHolder.persistStats() once per pipeline. This will
+    // write the updated persisted stats into zookeeper
+    _statsHolder.persistStats();
+    logger.info("Done processing stats: "
+        + (System.currentTimeMillis() - readInstancesStart));
+    // populate _statStatus
+    _statStatus = _statsHolder.getStatsMap();
+
+    for (String statKey : _statStatus.keySet())
+    {
+      logger.debug("Stat key, value: " + statKey + ": " + _statStatus.get(statKey));
+    }
+
+    long alertExecuteStartTime = System.currentTimeMillis();
+    // execute alerts, populate _alertStatus
+    _alertStatus =
+        AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(),
+                                        _statsHolder.getStatsList());
+    logger.info("done executing alerts: "
+        + (System.currentTimeMillis() - alertExecuteStartTime));
+    for (String originAlertName : _alertStatus.keySet())
+    {
+      _alertBeanCollection.setAlerts(originAlertName,
+                                     _alertStatus.get(originAlertName),
+                                     manager.getClusterName());
+    }
+
+    executeAlertActions(manager);
+    // Write alert fire history to zookeeper
+    updateAlertHistory(manager);
+    long writeAlertStartTime = System.currentTimeMillis();
+    // write out alert status (to zk)
+    _alertsHolder.addAlertStatusSet(_alertStatus);
+    logger.info("done writing alerts: "
+        + (System.currentTimeMillis() - writeAlertStartTime));
+
+    // TODO: access the 2 status variables from somewhere to populate graphs
+
+    long logAlertStartTime = System.currentTimeMillis();
+    // logging alert status
+    for (String alertOuterKey : _alertStatus.keySet())
+    {
+      logger.debug("Alert Outer Key: " + alertOuterKey);
+      Map<String, AlertValueAndStatus> alertInnerMap = _alertStatus.get(alertOuterKey);
+      if (alertInnerMap == null)
+      {
+        logger.debug(alertOuterKey + " has no alerts to report.");
+        continue;
+      }
+      for (String alertInnerKey : alertInnerMap.keySet())
+      {
+        logger.debug("  " + alertInnerKey + " value: "
+            + alertInnerMap.get(alertInnerKey).getValue() + ", status: "
+            + alertInnerMap.get(alertInnerKey).isFired());
+      }
+    }
+
+    logger.info("done logging alerts: "
+        + (System.currentTimeMillis() - logAlertStartTime));
+
+    long processLatency = System.currentTimeMillis() - startTime;
+    addLatencyToMonitor(event, processLatency);
+    logger.info("process end: " + processLatency);
+  }
+
+  /**
+   * Go through the _alertStatus, and call executeAlertAction for those actual alerts that
+   * has been fired
+   */
+
+  void executeAlertActions( HelixManager manager)
+  {
+    _alertActionTaken.clear();
+    // Go through the original alert strings
+    for(String originAlertName : _alertStatus.keySet())
+    {
+      Map<String, String> alertFields = _alertsHolder.getAlertsMap().get(originAlertName);
+      if(alertFields != null && alertFields.containsKey(AlertParser.ACTION_NAME))
+      {
+        String actionValue = alertFields.get(AlertParser.ACTION_NAME);
+        Map<String, AlertValueAndStatus> alertResultMap = _alertStatus.get(originAlertName);
+        if(alertResultMap == null)
+        {
+          logger.info("Alert "+ originAlertName + " does not have alert status map");
+          continue;
+        }
+        // For each original alert, iterate all actual alerts that it expands into
+        for(String actualStatName : alertResultMap.keySet())
+        {
+          // if the actual alert is fired, execute the action
+          if(alertResultMap.get(actualStatName).isFired())
+          {
+            logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by " + actualStatName);
+            _alertActionTaken.put(actualStatName, actionValue);
+            // move functionalities into a seperate class
+            executeAlertAction(actualStatName, actionValue, manager);
+          }
+        }
+      }
+    }
+  }
+  /**
+   * Execute the action if an alert is fired, and the alert has an action associated with it.
+   * NOTE: consider unify this with DefaultParticipantErrorMessageHandler.handleMessage()
+   */
+  void executeAlertAction(String actualStatName, String actionValue, HelixManager manager)
+  {
+    if(actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString()))
+    {
+      String instanceName = parseInstanceName(actualStatName, manager);
+      if(instanceName != null)
+      {
+        logger.info("Disabling instance " + instanceName);
+        manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, false);
+      }
+    }
+    else if(actionValue.equals(ActionOnError.DISABLE_PARTITION.toString()))
+    {
+      String instanceName = parseInstanceName(actualStatName, manager);
+      String resourceName = parseResourceName(actualStatName, manager);
+      String partitionName = parsePartitionName(actualStatName, manager);
+      if(instanceName != null && resourceName != null && partitionName != null)
+      {
+        logger.info("Disabling partition " + partitionName + " instanceName " +  instanceName);
+        manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(), instanceName,
+            resourceName, Arrays.asList(partitionName));
+      }
+    }
+    else if(actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString()))
+    {
+      String instanceName = parseInstanceName(actualStatName, manager);
+      String resourceName = parseResourceName(actualStatName, manager);
+      logger.info("Disabling resource " + resourceName + " instanceName " +  instanceName + " not implemented");
+
+    }
+  }
+
+  public static String parseResourceName(String actualStatName, HelixManager manager)
+  {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder kb = accessor.keyBuilder();
+    List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
+    for (IdealState idealState : idealStates)
+    {
+      String resourceName = idealState.getResourceName();
+      if(actualStatName.contains("=" + resourceName + ".") || actualStatName.contains("=" + resourceName + ";"))
+      {
+        return resourceName;
+      }
+    }
+    return null;
+  }
+
+  public static String parsePartitionName(String actualStatName, HelixManager manager)
+  {
+    String resourceName = parseResourceName(actualStatName, manager);
+    if(resourceName != null)
+    {
+      String partitionKey = "=" + resourceName + "_";
+      if(actualStatName.contains(partitionKey))
+      {
+        int pos = actualStatName.indexOf(partitionKey);
+        int nextDotPos = actualStatName.indexOf('.', pos + partitionKey.length());
+        int nextCommaPos = actualStatName.indexOf(';', pos + partitionKey.length());
+        if(nextCommaPos > 0 && nextCommaPos < nextDotPos)
+        {
+          nextDotPos = nextCommaPos;
+        }
+
+        String partitionName = actualStatName.substring(pos + 1, nextDotPos);
+        return partitionName;
+      }
+    }
+    return null;
+  }
+
+  public static String parseInstanceName(String actualStatName, HelixManager manager)
+  {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder kb = accessor.keyBuilder();
+    List<LiveInstance> liveInstances = accessor.getChildValues(kb.liveInstances());
+    for (LiveInstance instance : liveInstances)
+    {
+      String instanceName = instance.getInstanceName();
+      if(actualStatName.startsWith(instanceName))
+      {
+        return instanceName;
+      }
+    }
+    return null;
+  }
+
+  void updateAlertHistory(HelixManager manager)
+  {
+   // Write alert fire history to zookeeper
+    _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
+    Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
+    // Update history only when some beans has changed
+    if(delta.size() > 0)
+    {
+      delta.putAll(_alertActionTaken);
+      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh:mm:ss:SSS");
+      String date = dateFormat.format(new Date());
+
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+
+      HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
+      ZNRecord alertFiredHistory;
+      if(property == null)
+      {
+        alertFiredHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
+      }
+      else
+      {
+        alertFiredHistory = property.getRecord();
+      }
+      while(alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE)
+      {
+        // ZNRecord uses TreeMap which is sorted ascending internally
+        String firstKey = (String)(alertFiredHistory.getMapFields().keySet().toArray()[0]);
+        alertFiredHistory.getMapFields().remove(firstKey);
+      }
+      alertFiredHistory.setMapField(date, delta);
+//      manager.getDataAccessor().setProperty(PropertyType.ALERT_HISTORY, alertFiredHistory);
+      accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
+      _alertBeanCollection.setAlertHistory(alertFiredHistory);
+    }
+  }
+
+  public ClusterAlertMBeanCollection getClusterAlertMBeanCollection()
+  {
+    return _alertBeanCollection;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
new file mode 100644
index 0000000..80db4d7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.log4j.Logger;
+
+
+public class TaskAssignmentStage extends AbstractBaseStage
+{
+  private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    long startTime = System.currentTimeMillis();
+    logger.info("START TaskAssignmentStage.process()");
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    MessageThrottleStageOutput messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+
+    if (manager == null || resourceMap == null || messageOutput == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache");
+    }
+
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    List<Message> messagesToSend = new ArrayList<Message>();
+    for (String resourceName : resourceMap.keySet())
+    {
+      Resource resource = resourceMap.get(resourceName);
+      for (Partition partition : resource.getPartitions())
+      {
+        List<Message> messages = messageOutput.getMessages(resourceName, partition);
+        messagesToSend.addAll(messages);
+      }
+    }
+
+    List<Message> outputMessages =
+        groupMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap);
+    sendMessages(dataAccessor, outputMessages);
+
+    long endTime = System.currentTimeMillis();
+    logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime)
+        + " ms");
+
+  }
+
+  List<Message> groupMessage(Builder keyBuilder,
+                             List<Message> messages,
+                             Map<String, Resource> resourceMap)
+  {
+    // group messages by its CurrentState path + "/" + fromState + "/" + toState
+    Map<String, Message> groupMessages = new HashMap<String, Message>();
+    List<Message> outputMessages = new ArrayList<Message>();
+
+    Iterator<Message> iter = messages.iterator();
+    while (iter.hasNext())
+    {
+      Message message = iter.next();
+      String resourceName = message.getResourceName();
+      Resource resource = resourceMap.get(resourceName);
+      if (resource == null || !resource.getGroupMessageMode())
+      {
+        outputMessages.add(message);
+        continue;
+      }
+
+      String key =
+          keyBuilder.currentState(message.getTgtName(),
+                                  message.getTgtSessionId(),
+                                  message.getResourceName()).getPath()
+              + "/" + message.getFromState() + "/" + message.getToState();
+
+      if (!groupMessages.containsKey(key))
+      {
+        Message groupMessage = new Message(message.getRecord());
+        groupMessage.setGroupMessageMode(true);
+        outputMessages.add(groupMessage);
+        groupMessages.put(key, groupMessage);
+      }
+      groupMessages.get(key).addPartitionName(message.getPartitionName());
+    }
+
+    return outputMessages;
+  }
+
+  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages)
+  {
+    if (messages == null || messages.isEmpty())
+    {
+      return;
+    }
+
+    Builder keyBuilder = dataAccessor.keyBuilder();
+
+    List<PropertyKey> keys = new ArrayList<PropertyKey>();
+    for (Message message : messages)
+    {
+      logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
+          + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
+          + " from:" + message.getFromState() + " to:" + message.getToState());
+
+      keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
+    }
+
+    dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/stages/package-info.java
new file mode 100644
index 0000000..6dd3780
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Stages of the Helix controller pipelines.
+ * 
+ */
+package org.apache.helix.controller.stages;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
new file mode 100644
index 0000000..73f0151
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import java.util.UUID;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+
+/**
+ * Example state model factory: every state unit (partition) gets its own
+ * {@link BootstrapStateModel}.
+ */
+public class BootstrapHandler extends StateModelFactory<StateModel>
+{
+
+  @Override
+  public StateModel createNewStateModel(String stateUnitKey)
+  {
+    StateModel created = new BootstrapStateModel(stateUnitKey);
+    return created;
+  }
+
+  /**
+   * OFFLINE/SLAVE/MASTER model. Only OFFLINE-&gt;SLAVE does real work: it asks
+   * other participants for a bootstrap URL for the transitioning partition.
+   */
+  @StateModelInfo(initialState = "OFFLINE", states = "{'OFFLINE','SLAVE','MASTER'}")
+  public static class BootstrapStateModel extends StateModel
+  {
+    /** How long offlineToSlave waits for bootstrap replies, in milliseconds. */
+    private static final int BOOTSTRAP_REPLY_TIMEOUT_MS = 30000;
+
+    private final String _stateUnitKey;
+
+    public BootstrapStateModel(String stateUnitKey)
+    {
+      this._stateUnitKey = stateUnitKey;
+    }
+
+    @Transition(from = "MASTER", to = "SLAVE")
+    public void masterToSlave(Message message, NotificationContext context)
+    {
+      // Intentionally a no-op in this example.
+    }
+
+    /**
+     * Sends a REQUEST_BOOTSTRAP_URL user message to participants matching the
+     * transitioning resource/partition, waits up to 30 seconds for replies and
+     * prints the selected bootstrap URL and time when one was received.
+     */
+    @Transition(from = "OFFLINE", to = "SLAVE")
+    public void offlineToSlave(Message message, NotificationContext context)
+    {
+      System.out
+          .println("BootstrapProcess.BootstrapStateModel.offlineToSlave()");
+      HelixManager helixManager = context.getManager();
+      ClusterMessagingService messaging = helixManager.getMessagingService();
+
+      Message bootstrapRequest = newBootstrapRequest();
+      Criteria recipients = recipientsFor(message);
+      BootstrapReplyHandler replies = new BootstrapReplyHandler();
+
+      int sentCount = messaging.sendAndWait(recipients, bootstrapRequest, replies,
+          BOOTSTRAP_REPLY_TIMEOUT_MS);
+
+      // sentCount == 0: no other node was found for the partition, nothing to do.
+      // A null URL means no usable reply; a real implementation could go to an
+      // error state or ask some node to start a backup.
+      if (sentCount != 0 && replies.getBootstrapUrl() != null)
+      {
+        System.out.println("Got bootstrap url:"+ replies.getBootstrapUrl() );
+        System.out.println("Got backup time:"+ replies.getBootstrapTime() );
+        // Got the url fetch it
+      }
+    }
+
+    /** Builds a NEW user-defined REQUEST_BOOTSTRAP_URL message with a random id. */
+    private static Message newBootstrapRequest()
+    {
+      Message request = new Message(MessageType.USER_DEFINE_MSG,
+          UUID.randomUUID().toString());
+      request.setMsgSubType(BootstrapProcess.REQUEST_BOOTSTRAP_URL);
+      request.setMsgState(MessageState.NEW);
+      return request;
+    }
+
+    /**
+     * Session-specific participant criteria with instance name "*" and the
+     * resource and partition of {@code message}.
+     */
+    private static Criteria recipientsFor(Message message)
+    {
+      Criteria criteria = new Criteria();
+      criteria.setInstanceName("*");
+      criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      criteria.setResource(message.getResourceName());
+      criteria.setPartition(message.getPartitionName());
+      criteria.setSessionSpecific(true);
+      return criteria;
+    }
+
+    @Transition(from = "SLAVE", to = "OFFLINE")
+    public void slaveToOffline(Message message, NotificationContext context)
+    {
+      System.out
+          .println("BootstrapProcess.BootstrapStateModel.slaveToOffline()");
+    }
+
+    @Transition(from = "SLAVE", to = "MASTER")
+    public void slaveToMaster(Message message, NotificationContext context)
+    {
+      System.out
+          .println("BootstrapProcess.BootstrapStateModel.slaveToMaster()");
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
new file mode 100644
index 0000000..b8e3bfe
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
@@ -0,0 +1,405 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Date;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.tools.ClusterStateVerifier;
+
+
+/**
+ * This process does little more than handling the state transition messages.
+ * This is generally the case when the server needs to bootstrap when it comes
+ * up.<br>
+ * Flow for a typical Master-slave state model<br>
+ * <ul>
+ * <li>Gets OFFLINE-SLAVE transition</li>
+ * <li>Figure out if it has any data and how old it is for the SLAVE partition</li>
+ * <li>If the data is fresh enough it can probably catch up from the replication
+ * stream of the master</li>
+ * <li>If not, then it can use the messaging service provided by cluster manager
+ * to talk other nodes to figure out if they have any backup</li>
+ * </li>
+ * <li>Once it gets a response from other nodes in the cluster the process can
+ * decide which back up it wants to use to bootstrap</li>
+ * </ul>
+ *
+ * @author kgopalak
+ *
+ */
+public class BootstrapProcess
+{
+  static final String REQUEST_BOOTSTRAP_URL = "REQUEST_BOOTSTRAP_URL";
+  public static final String zkServer = "zkSvr";
+  public static final String cluster = "cluster";
+  public static final String hostAddress = "host";
+  public static final String hostPort = "port";
+  public static final String relayCluster = "relayCluster";
+  public static final String help = "help";
+  public static final String configFile = "configFile";
+  public static final String stateModel = "stateModelType";
+  public static final String transDelay = "transDelay";
+
+  private final String zkConnectString;
+  private final String clusterName;
+  private final String instanceName;
+  private final String stateModelType;
+  private HelixManager manager;
+
+//  private StateMachineEngine genericStateMachineHandler;
+
+  private String _file = null;
+  private StateModelFactory<StateModel> stateModelFactory;
+  private final int delay;
+
+  public BootstrapProcess(String zkConnectString, String clusterName,
+      String instanceName, String file, String stateModel, int delay)
+  {
+    this.zkConnectString = zkConnectString;
+    this.clusterName = clusterName;
+    this.instanceName = instanceName;
+    this._file = file;
+    stateModelType = stateModel;
+    this.delay = delay;
+  }
+
+  public void start() throws Exception
+  {
+    if (_file == null)
+    {
+      manager = HelixManagerFactory.getZKHelixManager(clusterName,
+                                                          instanceName,
+                                                          InstanceType.PARTICIPANT,
+                                                          zkConnectString);
+
+    }
+    else
+    {
+      manager = HelixManagerFactory.getStaticFileHelixManager(clusterName,
+                                                                  instanceName,
+                                                                  InstanceType.PARTICIPANT,
+                                                                  _file);
+
+    }
+    stateModelFactory = new BootstrapHandler();
+//    genericStateMachineHandler = new StateMachineEngine();
+//    genericStateMachineHandler.registerStateModelFactory("MasterSlave", stateModelFactory);
+    
+    StateMachineEngine stateMach = manager.getStateMachineEngine();
+    stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+    
+    manager.getMessagingService().registerMessageHandlerFactory(
+        MessageType.STATE_TRANSITION.toString(), stateMach);
+    manager.getMessagingService().registerMessageHandlerFactory(
+        MessageType.USER_DEFINE_MSG.toString(),
+        new CustomMessageHandlerFactory());
+    manager.connect();
+    if (_file != null)
+    {
+      ClusterStateVerifier.verifyFileBasedClusterStates(_file, instanceName,
+          stateModelFactory);
+
+    }
+  }
+
+  public static class CustomMessageHandlerFactory implements
+      MessageHandlerFactory
+  {
+
+    @Override
+    public MessageHandler createHandler(Message message,
+        NotificationContext context)
+    {
+
+      return new CustomMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType()
+    {
+      return MessageType.USER_DEFINE_MSG.toString();
+    }
+
+    @Override
+    public void reset()
+    {
+
+    }
+
+    static class CustomMessageHandler extends MessageHandler
+    {
+
+      public CustomMessageHandler(Message message, NotificationContext context)
+      {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException
+      {
+        String hostName;
+        HelixTaskResult result = new HelixTaskResult();
+        try
+        {
+          hostName = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e)
+        {
+          hostName = "UNKNOWN";
+        }
+        String port = "2134";
+        String msgSubType = _message.getMsgSubType();
+        if (msgSubType.equals(REQUEST_BOOTSTRAP_URL))
+        {
+          result.getTaskResultMap().put(
+              "BOOTSTRAP_URL",
+              "http://" + hostName + ":" + port
+                  + "/getFile?path=/data/bootstrap/"
+                  + _message.getResourceName() + "/"
+                  + _message.getPartitionName() + ".tar");
+
+          result.getTaskResultMap().put(
+              "BOOTSTRAP_TIME",
+              ""+new Date().getTime());
+        }
+
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override
+      public void onError( Exception e, ErrorCode code, ErrorType type)
+      {
+        e.printStackTrace();
+      }
+    }
+  }
+
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions()
+  {
+    Option helpOption = OptionBuilder.withLongOpt(help)
+        .withDescription("Prints command-line options info").create();
+
+    Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
+        .withDescription("Provide zookeeper address").create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+    Option clusterOption = OptionBuilder.withLongOpt(cluster)
+        .withDescription("Provide cluster name").create();
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option hostOption = OptionBuilder.withLongOpt(hostAddress)
+        .withDescription("Provide host name").create();
+    hostOption.setArgs(1);
+    hostOption.setRequired(true);
+    hostOption.setArgName("Host name (Required)");
+
+    Option portOption = OptionBuilder.withLongOpt(hostPort)
+        .withDescription("Provide host port").create();
+    portOption.setArgs(1);
+    portOption.setRequired(true);
+    portOption.setArgName("Host port (Required)");
+
+    Option stateModelOption = OptionBuilder.withLongOpt(stateModel)
+        .withDescription("StateModel Type").create();
+    stateModelOption.setArgs(1);
+    stateModelOption.setRequired(true);
+    stateModelOption.setArgName("StateModel Type (Required)");
+
+    // add an option group including either --zkSvr or --configFile
+    Option fileOption = OptionBuilder.withLongOpt(configFile)
+        .withDescription("Provide file to read states/messages").create();
+    fileOption.setArgs(1);
+    fileOption.setRequired(true);
+    fileOption.setArgName("File to read states/messages (Optional)");
+
+    Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
+        .withDescription("Provide state trans delay").create();
+    transDelayOption.setArgs(1);
+    transDelayOption.setRequired(false);
+    transDelayOption.setArgName("Delay time in state transition, in MS");
+
+    OptionGroup optionGroup = new OptionGroup();
+    optionGroup.addOption(zkServerOption);
+    optionGroup.addOption(fileOption);
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    // options.addOption(zkServerOption);
+    options.addOption(clusterOption);
+    options.addOption(hostOption);
+    options.addOption(portOption);
+    options.addOption(stateModelOption);
+    options.addOption(transDelayOption);
+
+    options.addOptionGroup(optionGroup);
+
+    return options;
+  }
+
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + BootstrapProcess.class.getName(), cliOptions);
+  }
+
+  public static CommandLine processCommandLineArgs(String[] cliArgs)
+      throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    try
+    {
+      return cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe)
+    {
+      System.err
+          .println("CommandLineClient: failed to parse command-line options: "
+              + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return null;
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    String zkConnectString = "localhost:2181";
+    String clusterName = "storage-integration-cluster";
+    String instanceName = "localhost_8905";
+    String file = null;
+    String stateModelValue = "MasterSlave";
+    int delay = 0;
+    boolean skipZeroArgs = true;// false is for dev testing
+    if (!skipZeroArgs || args.length > 0)
+    {
+      CommandLine cmd = processCommandLineArgs(args);
+      zkConnectString = cmd.getOptionValue(zkServer);
+      clusterName = cmd.getOptionValue(cluster);
+
+      String host = cmd.getOptionValue(hostAddress);
+      String portString = cmd.getOptionValue(hostPort);
+      int port = Integer.parseInt(portString);
+      instanceName = host + "_" + port;
+
+      file = cmd.getOptionValue(configFile);
+      if (file != null)
+      {
+        File f = new File(file);
+        if (!f.exists())
+        {
+          System.err.println("static config file doesn't exist");
+          System.exit(1);
+        }
+      }
+
+      stateModelValue = cmd.getOptionValue(stateModel);
+      if (cmd.hasOption(transDelay))
+      {
+        try
+        {
+          delay = Integer.parseInt(cmd.getOptionValue(transDelay));
+          if (delay < 0)
+          {
+            throw new Exception("delay must be positive");
+          }
+        } catch (Exception e)
+        {
+          e.printStackTrace();
+          delay = 0;
+        }
+      }
+    }
+    // Espresso_driver.py will consume this
+    System.out.println("Starting Process with ZK:" + zkConnectString);
+
+    BootstrapProcess process = new BootstrapProcess(zkConnectString,
+        clusterName, instanceName, file, stateModelValue, delay);
+
+    process.start();
+    Thread.currentThread().join();
+  }
+}
+
+class BootstrapReplyHandler extends AsyncCallback
+{
+
+  public BootstrapReplyHandler()
+  {
+  }
+
+  private String bootstrapUrl;
+  private String bootstrapTime;
+
+  @Override
+  public void onTimeOut()
+  {
+    System.out.println("Timed out");
+  }
+
+  public String getBootstrapUrl()
+  {
+    return bootstrapUrl;
+  }
+
+  public String getBootstrapTime()
+  {
+    return bootstrapTime;
+  }
+
+  @Override
+  public void onReplyMessage(Message message)
+  {
+    String time = message.getResultMap().get("BOOTSTRAP_TIME");
+    if (bootstrapTime == null || time.compareTo(bootstrapTime) > -1)
+    {
+      bootstrapTime = message.getResultMap().get("BOOTSTRAP_TIME");
+      bootstrapUrl = message.getResultMap().get("BOOTSTRAP_URL");
+    }
+  }
+
+}