You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[9/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
new file mode 100644
index 0000000..c090835
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -0,0 +1,184 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Compares the currentState,pendingState with IdealState and generate messages
+ *
+ * @author kgopalak
+ *
+ */
+public class MessageGenerationPhase extends AbstractBaseStage
+{
+ private static Logger logger = Logger.getLogger(MessageGenerationPhase.class);
+
+ @Override
+ public void process(ClusterEvent event) throws Exception
+ {
+ HelixManager manager = event.getAttribute("helixmanager");
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE
+ .toString());
+ BestPossibleStateOutput bestPossibleStateOutput = event
+ .getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
+ || bestPossibleStateOutput == null)
+ {
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+ }
+
+ Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+ Map<String, String> sessionIdMap = new HashMap<String, String>();
+
+ for (LiveInstance liveInstance : liveInstances.values())
+ {
+ sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
+ }
+ MessageGenerationOutput output = new MessageGenerationOutput();
+
+ for (String resourceName : resourceMap.keySet())
+ {
+ Resource resource = resourceMap.get(resourceName);
+ int bucketSize = resource.getBucketSize();
+
+ StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+
+ for (Partition partition : resource.getPartitions())
+ {
+ Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(
+ resourceName, partition);
+
+ for (String instanceName : instanceStateMap.keySet())
+ {
+ String desiredState = instanceStateMap.get(instanceName);
+
+ String currentState = currentStateOutput.getCurrentState(resourceName, partition,
+ instanceName);
+ if (currentState == null)
+ {
+ currentState = stateModelDef.getInitialState();
+ }
+
+ if (desiredState.equalsIgnoreCase(currentState))
+ {
+ continue;
+ }
+
+ String pendingState = currentStateOutput.getPendingState(resourceName, partition,
+ instanceName);
+
+ String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+ if (nextState == null)
+ {
+ logger.error("Unable to find a next state for partition: "
+ + partition.getPartitionName() + " from stateModelDefinition"
+ + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
+ continue;
+ }
+
+ if (pendingState != null)
+ {
+ if (nextState.equalsIgnoreCase(pendingState))
+ {
+ logger.debug("Message already exists for " + instanceName + " to transit "
+ + partition.getPartitionName() + " from " + currentState + " to " + nextState);
+ } else if (currentState.equalsIgnoreCase(pendingState))
+ {
+ logger.info("Message hasn't been removed for " + instanceName + " to transit"
+ + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
+ + desiredState);
+ } else
+ {
+ logger.info("IdealState changed before state transition completes for "
+ + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
+ + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
+ }
+ } else
+ {
+ Message message = createMessage(manager, resourceName, partition.getPartitionName(),
+ instanceName, currentState, nextState, sessionIdMap.get(instanceName),
+ stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize);
+ IdealState idealState = cache.getIdealState(resourceName);
+ // Set timeout of needed
+ String stateTransition = currentState + "-" + nextState + "_"
+ + Message.Attributes.TIMEOUT;
+ if (idealState != null
+ && idealState.getRecord().getSimpleField(stateTransition) != null)
+ {
+ try
+ {
+ int timeout = Integer.parseInt(idealState.getRecord().getSimpleField(
+ stateTransition));
+ if (timeout > 0)
+ {
+ message.setExecutionTimeout(timeout);
+ }
+ } catch (Exception e)
+ {
+ logger.error("", e);
+ }
+ }
+ message.getRecord().setSimpleField("ClusterEventName", event.getName());
+ output.addMessage(resourceName, partition, message);
+ }
+ }
+ }
+ }
+ event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+ }
+
+ private Message createMessage(HelixManager manager, String resourceName, String partitionName,
+ String instanceName, String currentState, String nextState, String sessionId,
+ String stateModelDefName, String stateModelFactoryName, int bucketSize)
+ {
+ String uuid = UUID.randomUUID().toString();
+ Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+ message.setSrcName(manager.getInstanceName());
+ message.setTgtName(instanceName);
+ message.setMsgState(MessageState.NEW);
+ message.setPartitionName(partitionName);
+ message.setResourceName(resourceName);
+ message.setFromState(currentState);
+ message.setToState(nextState);
+ message.setTgtSessionId(sessionId);
+ message.setSrcSessionId(manager.getSessionId());
+ message.setStateModelDef(stateModelDefName);
+ message.setStateModelFactoryName(stateModelFactoryName);
+ message.setBucketSize(bucketSize);
+
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
new file mode 100644
index 0000000..b3a7015
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -0,0 +1,337 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+
+public class MessageSelectionStage extends AbstractBaseStage
+{
+ private static final Logger LOG = Logger.getLogger(MessageSelectionStage.class);
+
+ static class Bounds
+ {
+ private int upper;
+ private int lower;
+
+ public Bounds(int lower, int upper)
+ {
+ this.lower = lower;
+ this.upper = upper;
+ }
+
+ public void increaseUpperBound()
+ {
+ upper++;
+ }
+
+ public void increaseLowerBound()
+ {
+ lower++;
+ }
+
+ public void decreaseUpperBound()
+ {
+ upper--;
+ }
+
+ public void decreaseLowerBound()
+ {
+ lower--;
+ }
+
+ public int getLowerBound()
+ {
+ return lower;
+ }
+
+ public int getUpperBound()
+ {
+ return upper;
+ }
+ }
+
+ @Override
+ public void process(ClusterEvent event) throws Exception
+ {
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ Map<String, Resource> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ CurrentStateOutput currentStateOutput =
+ event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ MessageGenerationOutput messageGenOutput =
+ event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+ if (cache == null || resourceMap == null || currentStateOutput == null
+ || messageGenOutput == null)
+ {
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
+ }
+
+ MessageSelectionStageOutput output = new MessageSelectionStageOutput();
+
+ for (String resourceName : resourceMap.keySet())
+ {
+ Resource resource = resourceMap.get(resourceName);
+ StateModelDefinition stateModelDef =
+ cache.getStateModelDef(resource.getStateModelDefRef());
+
+ Map<String, Integer> stateTransitionPriorities =
+ getStateTransitionPriorityMap(stateModelDef);
+ IdealState idealState = cache.getIdealState(resourceName);
+ Map<String, Bounds> stateConstraints =
+ computeStateConstraints(stateModelDef, idealState, cache);
+
+ for (Partition partition : resource.getPartitions())
+ {
+ List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
+ List<Message> selectedMessages =
+ selectMessages(cache.getLiveInstances(),
+ currentStateOutput.getCurrentStateMap(resourceName, partition),
+ currentStateOutput.getPendingStateMap(resourceName, partition),
+ messages,
+ stateConstraints,
+ stateTransitionPriorities,
+ stateModelDef.getInitialState());
+ output.addMessages(resourceName, partition, selectedMessages);
+ }
+ }
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
+ }
+
+ // TODO: This method deserves its own class. The class should not understand helix but
+ // just be
+ // able to solve the problem using the algo. I think the method is following that but if
+ // we don't move it to another class its quite easy to break that contract
+ /**
+ * greedy message selection algorithm: 1) calculate CS+PS state lower/upper-bounds 2)
+ * group messages by state transition and sorted by priority 3) from highest priority to
+ * lowest, for each message group with the same transition add message one by one and
+ * make sure state constraint is not violated update state lower/upper-bounds when a new
+ * message is selected
+ *
+ * @param currentStates
+ * @param pendingStates
+ * @param messages
+ * @param stateConstraints
+ * : STATE -> bound (lower:upper)
+ * @param stateTransitionPriorities
+ * : FROME_STATE-TO_STATE -> priority
+ * @return: selected messages
+ */
+ List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
+ Map<String, String> currentStates,
+ Map<String, String> pendingStates,
+ List<Message> messages,
+ Map<String, Bounds> stateConstraints,
+ final Map<String, Integer> stateTransitionPriorities,
+ String initialState)
+ {
+ if (messages == null || messages.isEmpty())
+ {
+ return Collections.emptyList();
+ }
+
+ List<Message> selectedMessages = new ArrayList<Message>();
+ Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+
+ // count currentState, if no currentState, count as in initialState
+ for (String instance : liveInstances.keySet())
+ {
+ String state = initialState;
+ if (currentStates.containsKey(instance))
+ {
+ state = currentStates.get(instance);
+ }
+
+ if (!bounds.containsKey(state))
+ {
+ bounds.put(state, new Bounds(0, 0));
+ }
+ bounds.get(state).increaseLowerBound();
+ bounds.get(state).increaseUpperBound();
+ }
+
+ // count pendingStates
+ for (String instance : pendingStates.keySet())
+ {
+ String state = pendingStates.get(instance);
+ if (!bounds.containsKey(state))
+ {
+ bounds.put(state, new Bounds(0, 0));
+ }
+ // TODO: add lower bound, need to refactor pendingState to include fromState also
+ bounds.get(state).increaseUpperBound();
+ }
+
+ // group messages based on state transition priority
+ Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
+ new TreeMap<Integer, List<Message>>();
+ for (Message message : messages)
+ {
+ String fromState = message.getFromState();
+ String toState = message.getToState();
+ String transition = fromState + "-" + toState;
+ int priority = Integer.MAX_VALUE;
+
+ if (stateTransitionPriorities.containsKey(transition))
+ {
+ priority = stateTransitionPriorities.get(transition);
+ }
+
+ if (!messagesGroupByStateTransitPriority.containsKey(priority))
+ {
+ messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
+ }
+ messagesGroupByStateTransitPriority.get(priority).add(message);
+ }
+
+ // select messages
+ for (List<Message> messageList : messagesGroupByStateTransitPriority.values())
+ {
+ for (Message message : messageList)
+ {
+ String fromState = message.getFromState();
+ String toState = message.getToState();
+
+ if (!bounds.containsKey(fromState))
+ {
+ LOG.error("Message's fromState is not in currentState. message: " + message);
+ continue;
+ }
+
+ if (!bounds.containsKey(toState))
+ {
+ bounds.put(toState, new Bounds(0, 0));
+ }
+
+ // check lower bound of fromState
+ if (stateConstraints.containsKey(fromState))
+ {
+ int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
+ if (newLowerBound < 0)
+ {
+ LOG.error("Number of currentState in " + fromState
+ + " is less than number of messages transiting from " + fromState);
+ continue;
+ }
+
+ if (newLowerBound < stateConstraints.get(fromState).getLowerBound())
+ {
+ continue;
+ }
+ }
+
+ // check upper bound of toState
+ if (stateConstraints.containsKey(toState))
+ {
+ int newUpperBound = bounds.get(toState).getUpperBound() + 1;
+ if (newUpperBound > stateConstraints.get(toState).getUpperBound())
+ {
+ continue;
+ }
+ }
+
+ selectedMessages.add(message);
+ bounds.get(fromState).increaseLowerBound();
+ bounds.get(toState).increaseUpperBound();
+ }
+ }
+
+ return selectedMessages;
+ }
+
+ /**
+ * TODO: This code is duplicate in multiple places. Can we do it in to one place in the
+ * beginning and compute the stateConstraint instance once and re use at other places.
+ * Each IdealState must have a constraint object associated with it
+ */
+ private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
+ IdealState idealState,
+ ClusterDataCache cache)
+ {
+ Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
+
+ List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
+ for (String state : statePriorityList)
+ {
+ String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
+ int max = -1;
+ if ("N".equals(numInstancesPerState))
+ {
+ max = cache.getLiveInstances().size();
+ }
+ else if ("R".equals(numInstancesPerState))
+ {
+ // idealState is null when resource has been dropped,
+ // R can't be evaluated and ignore state constraints
+ if (idealState != null)
+ {
+ max = cache.getReplicas(idealState.getResourceName());
+ }
+ }
+ else
+ {
+ try
+ {
+ max = Integer.parseInt(numInstancesPerState);
+ }
+ catch (Exception e)
+ {
+ // use -1
+ }
+ }
+
+ if (max > -1)
+ {
+ // if state has no constraint, will not put in map
+ stateConstraints.put(state, new Bounds(0, max));
+ }
+ }
+
+ return stateConstraints;
+ }
+
+ // TODO: if state transition priority is not provided then use lexicographical sorting
+ // so that behavior is consistent
+ private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef)
+ {
+ Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
+ List<String> stateTransitionPriorityList =
+ stateModelDef.getStateTransitionPriorityList();
+ for (int i = 0; i < stateTransitionPriorityList.size(); i++)
+ {
+ stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
+ }
+
+ return stateTransitionPriorities;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
new file mode 100644
index 0000000..e5f46c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+
+
+public class MessageSelectionStageOutput
+{
+ private final Map<String, Map<Partition, List<Message>>> _messagesMap;
+
+ public MessageSelectionStageOutput()
+ {
+ _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
+ }
+
+ public void addMessages(String resourceName, Partition partition,
+ List<Message> selectedMessages)
+ {
+ if (!_messagesMap.containsKey(resourceName))
+ {
+ _messagesMap.put(resourceName,
+ new HashMap<Partition, List<Message>>());
+ }
+ _messagesMap.get(resourceName).put(partition, selectedMessages);
+
+ }
+
+ public List<Message> getMessages(String resourceName,
+ Partition partition)
+ {
+ Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
+ if (map != null)
+ {
+ return map.get(partition);
+ }
+ return Collections.emptyList();
+
+ }
+
+ @Override
+ public String toString()
+ {
+ return _messagesMap.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
new file mode 100644
index 0000000..d5447b9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -0,0 +1,229 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ClusterConstraints.ConstraintItem;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.log4j.Logger;
+
+
+public class MessageThrottleStage extends AbstractBaseStage
+{
+ private static final Logger LOG =
+ Logger.getLogger(MessageThrottleStage.class.getName());
+
+ int valueOf(String valueStr)
+ {
+ int value = Integer.MAX_VALUE;
+
+ try
+ {
+ ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
+ switch (valueToken)
+ {
+ case ANY:
+ value = Integer.MAX_VALUE;
+ break;
+ default:
+ LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
+ + Integer.MAX_VALUE);
+ break;
+ }
+ }
+ catch (Exception e)
+ {
+ try
+ {
+ value = Integer.parseInt(valueStr);
+ }
+ catch (NumberFormatException ne)
+ {
+ LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
+ + Integer.MAX_VALUE);
+ }
+ }
+ return value;
+ }
+
+ /**
+ * constraints are selected in the order of the following rules: 1) don't select
+ * constraints with CONSTRAINT_VALUE=ANY; 2) if one constraint is more specific than the
+ * other, select the most specific one 3) if a message matches multiple constraints of
+ * incomparable specificity, select the one with the minimum value 4) if a message
+ * matches multiple constraints of incomparable specificity, and they all have the same
+ * value, select the first in alphabetic order
+ */
+ Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
+ Map<ConstraintAttribute, String> attributes)
+ {
+ Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
+ for (ConstraintItem item : items)
+ {
+ // don't select constraints with CONSTRAINT_VALUE=ANY
+ if (item.getConstraintValue().equals(ConstraintValue.ANY.toString()))
+ {
+ continue;
+ }
+
+ String key = item.filter(attributes).toString();
+ if (!selectedItems.containsKey(key))
+ {
+ selectedItems.put(key, item);
+ }
+ else
+ {
+ ConstraintItem existingItem = selectedItems.get(key);
+ if (existingItem.match(item.getAttributes()))
+ {
+ // item is more specific than existingItem
+ selectedItems.put(key, item);
+ }
+ else if (!item.match(existingItem.getAttributes()))
+ {
+ // existingItem and item are of incomparable specificity
+ int value = valueOf(item.getConstraintValue());
+ int existingValue = valueOf(existingItem.getConstraintValue());
+ if (value < existingValue)
+ {
+ // item's constraint value is less than that of existingItem
+ selectedItems.put(key, item);
+ }
+ else if (value == existingValue)
+ {
+ if (item.toString().compareTo(existingItem.toString()) < 0)
+ {
+ // item is ahead of existingItem in alphabetic order
+ selectedItems.put(key, item);
+ }
+ }
+ }
+ }
+ }
+ return new HashSet<ConstraintItem>(selectedItems.values());
+ }
+
+ @Override
+ public void process(ClusterEvent event) throws Exception
+ {
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ MessageSelectionStageOutput msgSelectionOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ Map<String, Resource> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+
+ if (cache == null || resourceMap == null || msgSelectionOutput == null)
+ {
+ throw new StageException("Missing attributes in event: " + event
+ + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
+ }
+
+ MessageThrottleStageOutput output = new MessageThrottleStageOutput();
+
+ ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+ Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
+
+ if (constraint != null)
+ {
+ // go through all pending messages, they should be counted but not throttled
+ for (String instance : cache.getLiveInstances().keySet())
+ {
+ throttle(throttleCounterMap,
+ constraint,
+ new ArrayList<Message>(cache.getMessages(instance).values()),
+ false);
+ }
+ }
+
+ // go through all new messages, throttle if necessary
+ // assume messages should be sorted by state transition priority in messageSelection stage
+ for (String resourceName : resourceMap.keySet())
+ {
+ Resource resource = resourceMap.get(resourceName);
+ for (Partition partition : resource.getPartitions())
+ {
+ List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
+ if (constraint != null && messages != null && messages.size() > 0)
+ {
+ messages = throttle(throttleCounterMap, constraint, messages, true);
+ }
+ output.addMessages(resourceName, partition, messages);
+ }
+ }
+
+ event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
+ }
+
+ private List<Message> throttle(Map<String, Integer> throttleMap,
+ ClusterConstraints constraint,
+ List<Message> messages,
+ final boolean needThrottle)
+ {
+
+ List<Message> throttleOutputMsgs = new ArrayList<Message>();
+ for (Message message : messages)
+ {
+ Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
+
+ Set<ConstraintItem> matches = constraint.match(msgAttr);
+ matches = selectConstraints(matches, msgAttr);
+
+ boolean msgThrottled = false;
+ for (ConstraintItem item : matches)
+ {
+ String key = item.filter(msgAttr).toString();
+ if (!throttleMap.containsKey(key))
+ {
+ throttleMap.put(key, valueOf(item.getConstraintValue()));
+ }
+ int value = throttleMap.get(key);
+ throttleMap.put(key, --value);
+
+ if (needThrottle && value < 0)
+ {
+ msgThrottled = true;
+
+ if (LOG.isDebugEnabled())
+ {
+ // TODO: printout constraint item that throttles the message
+ LOG.debug("message: " + message + " is throttled by constraint: " + item);
+ }
+ }
+ }
+
+ if (!msgThrottled)
+ {
+ throttleOutputMsgs.add(message);
+ }
+ }
+
+ return throttleOutputMsgs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
new file mode 100644
index 0000000..73694fa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+
+
+public class MessageThrottleStageOutput
+{
+ private final Map<String, Map<Partition, List<Message>>> _messagesMap;
+
+ public MessageThrottleStageOutput()
+ {
+ _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
+ }
+
+ public void addMessages(String resourceName,
+ Partition partition,
+ List<Message> selectedMessages)
+ {
+ if (!_messagesMap.containsKey(resourceName))
+ {
+ _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
+ }
+ _messagesMap.get(resourceName).put(partition, selectedMessages);
+
+ }
+
+ public List<Message> getMessages(String resourceName, Partition partition)
+ {
+ Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
+ if (map != null)
+ {
+ return map.get(partition);
+ }
+ return Collections.emptyList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
new file mode 100644
index 0000000..b6891b7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Pipeline stage that refreshes the cluster data cache and publishes it on the event
+ * under the "ClusterDataCache" attribute.
+ */
+public class ReadClusterDataStage extends AbstractBaseStage
+{
+  private static final Logger logger = Logger.getLogger(ReadClusterDataStage.class.getName());
+
+  // A single cache instance, refreshed in place on every run of this stage.
+  ClusterDataCache _cache;
+
+  public ReadClusterDataStage()
+  {
+    _cache = new ClusterDataCache();
+  }
+
+  /**
+   * Refreshes the cache through the manager's data accessor, pushes cluster counters
+   * to the status monitor when one is attached to the event, and stores the cache on
+   * the event.
+   *
+   * @throws StageException if the event has no "helixmanager" attribute
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    final long begin = System.currentTimeMillis();
+    logger.info("START ReadClusterDataStage.process()");
+
+    HelixManager helixManager = event.getAttribute("helixmanager");
+    if (helixManager == null)
+    {
+      throw new StageException("HelixManager attribute value is null");
+    }
+    HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+    _cache.refresh(accessor);
+
+    ClusterStatusMonitor monitor = event.getAttribute("clusterStatusMonitor");
+    if (monitor != null)
+    {
+      publishClusterCounters(monitor);
+    }
+
+    event.addAttribute("ClusterDataCache", _cache);
+
+    final long finish = System.currentTimeMillis();
+    logger.info("END ReadClusterDataStage.process(). took: " + (finish - begin) + " ms");
+  }
+
+  /** Counts disabled instances/partitions in the cache and reports them to the monitor. */
+  private void publishClusterCounters(ClusterStatusMonitor monitor)
+  {
+    int disabledInstanceCount = 0;
+    int disabledPartitionCount = 0;
+    for (InstanceConfig instanceConfig : _cache._instanceConfigMap.values())
+    {
+      if (!instanceConfig.getInstanceEnabled())
+      {
+        disabledInstanceCount++;
+      }
+      if (instanceConfig.getDisabledPartitionMap() != null)
+      {
+        disabledPartitionCount += instanceConfig.getDisabledPartitionMap().size();
+      }
+    }
+    monitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+                                     _cache._instanceConfigMap.size(),
+                                     disabledInstanceCount,
+                                     disabledPartitionCount);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
new file mode 100644
index 0000000..1922073
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Pipeline stage that refreshes the health data cache and publishes it on the event
+ * under the "HealthDataCache" attribute.
+ */
+public class ReadHealthDataStage extends AbstractBaseStage
+{
+  private static final Logger LOG = Logger.getLogger(ReadHealthDataStage.class.getName());
+
+  // A single cache instance, refreshed in place on every run of this stage.
+  HealthDataCache _cache;
+
+  public ReadHealthDataStage()
+  {
+    _cache = new HealthDataCache();
+  }
+
+  /**
+   * Refreshes the health cache through the manager's data accessor, stores it on the
+   * event and reports the stage latency.
+   *
+   * @throws StageException if the event has no "helixmanager" attribute
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    final long begin = System.currentTimeMillis();
+
+    HelixManager helixManager = event.getAttribute("helixmanager");
+    if (helixManager == null)
+    {
+      throw new StageException("HelixManager attribute value is null");
+    }
+    HelixDataAccessor helixAccessor = helixManager.getHelixDataAccessor();
+    _cache.refresh(helixAccessor);
+
+    event.addAttribute("HealthDataCache", _cache);
+
+    final long elapsed = System.currentTimeMillis() - begin;
+    addLatencyToMonitor(event, elapsed);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
new file mode 100644
index 0000000..72b4e63
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -0,0 +1,156 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Resource;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This stage computes all the resources in a cluster. The resources are
+ * computed from IdealStates -> this gives all the resources currently active
+ * CurrentState for liveInstance-> Helps in finding resources that are inactive
+ * and needs to be dropped
+ *
+ * @author kgopalak
+ *
+ */
+public class ResourceComputationStage extends AbstractBaseStage
+{
+ private static Logger LOG = Logger.getLogger(ResourceComputationStage.class);
+
+ @Override
+ public void process(ClusterEvent event) throws Exception
+ {
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ if (cache == null)
+ {
+ throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+ }
+
+ Map<String, IdealState> idealStates = cache.getIdealStates();
+
+ Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
+
+ if (idealStates != null && idealStates.size() > 0)
+ {
+ for (IdealState idealState : idealStates.values())
+ {
+ Set<String> partitionSet = idealState.getPartitionSet();
+ String resourceName = idealState.getResourceName();
+
+ for (String partition : partitionSet)
+ {
+ addPartition(partition, resourceName, resourceMap);
+ Resource resource = resourceMap.get(resourceName);
+ resource.setStateModelDefRef(idealState.getStateModelDefRef());
+ resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
+ resource.setBucketSize(idealState.getBucketSize());
+ resource.setGroupMessageMode(idealState.getGroupMessageMode());
+ }
+ }
+ }
+
+ // It's important to get partitions from CurrentState as well since the
+ // idealState might be removed.
+ Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
+
+ if (availableInstances != null && availableInstances.size() > 0)
+ {
+ for (LiveInstance instance : availableInstances.values())
+ {
+ String instanceName = instance.getInstanceName();
+ String clientSessionId = instance.getSessionId();
+
+ Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName,
+ clientSessionId);
+ if (currentStateMap == null || currentStateMap.size() == 0)
+ {
+ continue;
+ }
+ for (CurrentState currentState : currentStateMap.values())
+ {
+
+ String resourceName = currentState.getResourceName();
+ Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
+
+ // don't overwrite ideal state settings
+ if (!resourceMap.containsKey(resourceName))
+ {
+ addResource(resourceName, resourceMap);
+ Resource resource = resourceMap.get(resourceName);
+ resource.setStateModelDefRef(currentState.getStateModelDefRef());
+ resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
+ resource.setBucketSize(currentState.getBucketSize());
+ resource.setGroupMessageMode(currentState.getGroupMessageMode());
+ }
+
+ if (currentState.getStateModelDefRef() == null)
+ {
+ LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
+ + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
+ + currentState.getPartitionStateMap().values());
+ throw new StageException("State model def is null for resource:"
+ + currentState.getResourceName());
+ }
+
+ for (String partition : resourceStateMap.keySet())
+ {
+ addPartition(partition, resourceName, resourceMap);
+ }
+ }
+ }
+ }
+
+ event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+ }
+
+ private void addResource(String resource, Map<String, Resource> resourceMap)
+ {
+ if (resource == null || resourceMap == null)
+ {
+ return;
+ }
+ if (!resourceMap.containsKey(resource))
+ {
+ resourceMap.put(resource, new Resource(resource));
+ }
+ }
+
+ private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap)
+ {
+ if (resourceName == null || partition == null || resourceMap == null)
+ {
+ return;
+ }
+ if (!resourceMap.containsKey(resourceName))
+ {
+ resourceMap.put(resourceName, new Resource(resourceName));
+ }
+ Resource resource = resourceMap.get(resourceName);
+ resource.addPartition(partition);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
new file mode 100644
index 0000000..52711e9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
@@ -0,0 +1,457 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.alerts.AlertParser;
+import org.apache.helix.alerts.AlertProcessor;
+import org.apache.helix.alerts.AlertValueAndStatus;
+import org.apache.helix.alerts.AlertsHolder;
+import org.apache.helix.alerts.ExpressionParser;
+import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.alerts.Tuple;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.healthcheck.StatHealthReportProvider;
+import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
+import org.apache.helix.model.AlertHistory;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.PersistentStats;
+import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Aggregates the health stats reported by live participants and evaluates the
+ * configured alerts against them.
+ * <p>
+ * On each run the stage applies every live instance's reported health stats (plus a
+ * synthetic per-instance "reportingage" stat) to the {@link StatsHolder}, persists the
+ * updated stats, executes all alerts, runs the action attached to each fired alert,
+ * refreshes the alert MBeans and the alert history, and writes the alert status back
+ * through the {@link AlertsHolder}.
+ *
+ * @author asilbers
+ */
+public class StatsAggregationStage extends AbstractBaseStage
+{
+  /** Maximum number of entries kept in the alert-fired history record. */
+  public static final int ALERT_HISTORY_SIZE = 30;
+
+  private static final Logger logger = Logger.getLogger(StatsAggregationStage.class.getName());
+
+  StatsHolder _statsHolder = null;
+  AlertsHolder _alertsHolder = null;
+  Map<String, Map<String, AlertValueAndStatus>> _alertStatus;
+  Map<String, Tuple<String>> _statStatus;
+  ClusterAlertMBeanCollection _alertBeanCollection = new ClusterAlertMBeanCollection();
+  // Actions executed in the latest run, keyed by the actual (expanded) stat name
+  Map<String, String> _alertActionTaken = new HashMap<String, String>();
+
+  public final String PARTICIPANT_STAT_REPORT_NAME = StatHealthReportProvider.REPORT_NAME;
+  public final String ESPRESSO_STAT_REPORT_NAME = "RestQueryStats";
+  public final String REPORT_NAME = "AggStats";
+
+  public StatHealthReportProvider _aggStatsProvider;
+
+  /** @return alert results of the latest run, keyed by original alert name, then actual alert name */
+  public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus()
+  {
+    return _alertStatus;
+  }
+
+  /** @return the stats map captured from the stats holder in the latest run */
+  public Map<String, Tuple<String>> getStatStatus()
+  {
+    return _statStatus;
+  }
+
+  /**
+   * Writes the report of {@link #_aggStatsProvider} as the cluster's persistent stats
+   * record. A failed write is logged, not thrown.
+   */
+  public void persistAggStats(HelixManager manager)
+  {
+    Map<String, String> report = _aggStatsProvider.getRecentHealthReport();
+    Map<String, Map<String, String>> partitionReport =
+        _aggStatsProvider.getRecentPartitionHealthReport();
+    ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
+    if (report != null)
+    {
+      record.setSimpleFields(report);
+    }
+    if (partitionReport != null)
+    {
+      record.setMapFields(partitionReport);
+    }
+
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    boolean retVal = accessor.setProperty(keyBuilder.persistantStat(), new PersistentStats(record));
+    if (!retVal)
+    {
+      logger.error("attempt to persist derived stats failed");
+    }
+  }
+
+  @Override
+  public void init(StageContext context)
+  {
+    // Nothing to initialize: the stats/alerts holders are created lazily in process().
+  }
+
+  /** @return the name of the "reportingage" stat for the given instance */
+  public String getAgeStatName(String instance)
+  {
+    return instance + ExpressionParser.statFieldDelim + "reportingage";
+  }
+
+  /**
+   * Applies the "reportingage" stat of an instance: the age, in seconds, of its latest
+   * health report. The stat only takes effect if a matching alert was already added.
+   *
+   * @param instance the live instance the stat is reported for
+   * @param modifiedTime last-modified time of the instance's health stat; presumably
+   *          milliseconds (it must share {@code currTime}'s unit) — TODO confirm
+   * @param currTime current time in milliseconds, also used as the stat timestamp
+   */
+  public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime)
+  {
+    String statName = getAgeStatName(instance.getInstanceName());
+    long age = (currTime - modifiedTime) / 1000; // milliseconds -> seconds
+    Map<String, String> ageStatMap = new HashMap<String, String>();
+    ageStatMap.put(StatsHolder.TIMESTAMP_NAME, String.valueOf(currTime));
+    ageStatMap.put(StatsHolder.VALUE_NAME, String.valueOf(age));
+    // note that applyStat will only work if alert already added
+    _statsHolder.applyStat(statName, ageStatMap);
+  }
+
+  /**
+   * Aggregates stats, evaluates alerts and executes their actions.
+   *
+   * @throws StageException if the event lacks the "helixmanager" or "HealthDataCache"
+   *           attribute
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    long startTime = System.currentTimeMillis();
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    HealthDataCache cache = event.getAttribute("HealthDataCache");
+    if (manager == null || cache == null)
+    {
+      throw new StageException("helixmanager|HealthDataCache attribute value is null");
+    }
+
+    // The holders are created on the first run and refreshed on later runs.
+    if (_alertsHolder == null)
+    {
+      _statsHolder = new StatsHolder(manager, cache);
+      _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
+    }
+    else
+    {
+      _statsHolder.updateCache(cache);
+      _alertsHolder.updateCache(cache);
+    }
+    if (_statsHolder.getStatsList().size() == 0)
+    {
+      logger.info("stat holder is empty");
+      return;
+    }
+
+    long readInstancesStart = System.currentTimeMillis();
+    applyInstanceStats(cache, readInstancesStart);
+    // Call _statsHolder.persistStats() once per pipeline. This will
+    // write the updated persisted stats into zookeeper
+    _statsHolder.persistStats();
+    logger.info("Done processing stats: "
+        + (System.currentTimeMillis() - readInstancesStart));
+
+    _statStatus = _statsHolder.getStatsMap();
+    if (logger.isDebugEnabled())
+    {
+      for (Map.Entry<String, Tuple<String>> stat : _statStatus.entrySet())
+      {
+        logger.debug("Stat key, value: " + stat.getKey() + ": " + stat.getValue());
+      }
+    }
+
+    long alertExecuteStartTime = System.currentTimeMillis();
+    _alertStatus =
+        AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(),
+                                        _statsHolder.getStatsList());
+    logger.info("done executing alerts: "
+        + (System.currentTimeMillis() - alertExecuteStartTime));
+    for (Map.Entry<String, Map<String, AlertValueAndStatus>> alert : _alertStatus.entrySet())
+    {
+      _alertBeanCollection.setAlerts(alert.getKey(), alert.getValue(), manager.getClusterName());
+    }
+
+    executeAlertActions(manager);
+    // Write alert fire history to zookeeper
+    updateAlertHistory(manager);
+    long writeAlertStartTime = System.currentTimeMillis();
+    // write out alert status (to zk)
+    _alertsHolder.addAlertStatusSet(_alertStatus);
+    logger.info("done writing alerts: "
+        + (System.currentTimeMillis() - writeAlertStartTime));
+
+    // TODO: access the 2 status variables from somewhere to populate graphs
+    long logAlertStartTime = System.currentTimeMillis();
+    logAlertStatus();
+    logger.info("done logging alerts: "
+        + (System.currentTimeMillis() - logAlertStartTime));
+
+    long processLatency = System.currentTimeMillis() - startTime;
+    addLatencyToMonitor(event, processLatency);
+    logger.info("process end: " + processLatency);
+  }
+
+  /**
+   * Applies every live instance's reported health stats to the stats holder. The
+   * "reportingage" stat is reported from the first non-null health stat of each instance.
+   */
+  private void applyInstanceStats(HealthDataCache cache, long currTime)
+  {
+    for (LiveInstance instance : cache.getLiveInstances().values())
+    {
+      String instanceName = instance.getInstanceName();
+      logger.debug("instanceName: " + instanceName);
+      Map<String, HealthStat> stats = cache.getHealthStats(instanceName);
+      if (stats == null)
+      {
+        continue;
+      }
+      boolean reportedAge = false;
+      for (HealthStat participantStat : stats.values())
+      {
+        if (participantStat == null)
+        {
+          continue;
+        }
+        if (!reportedAge)
+        {
+          // TODO: get healthreport child node modified time and reportAgeStat based on that
+          reportAgeStat(instance, participantStat.getLastModifiedTimeStamp(), currTime);
+          reportedAge = true;
+        }
+        // XXX: need to convert participantStat to a better format
+        Map<String, Map<String, String>> statMap = participantStat.getHealthFields(instanceName);
+        for (Map.Entry<String, Map<String, String>> stat : statMap.entrySet())
+        {
+          _statsHolder.applyStat(stat.getKey(), stat.getValue());
+        }
+      }
+    }
+  }
+
+  /** Logs, at debug level, the value and fired status of every evaluated alert. */
+  private void logAlertStatus()
+  {
+    if (!logger.isDebugEnabled())
+    {
+      return;
+    }
+    for (Map.Entry<String, Map<String, AlertValueAndStatus>> outer : _alertStatus.entrySet())
+    {
+      String alertOuterKey = outer.getKey();
+      logger.debug("Alert Outer Key: " + alertOuterKey);
+      Map<String, AlertValueAndStatus> alertInnerMap = outer.getValue();
+      if (alertInnerMap == null)
+      {
+        logger.debug(alertOuterKey + " has no alerts to report.");
+        continue;
+      }
+      for (Map.Entry<String, AlertValueAndStatus> inner : alertInnerMap.entrySet())
+      {
+        logger.debug(" " + inner.getKey() + " value: "
+            + inner.getValue().getValue() + ", status: "
+            + inner.getValue().isFired());
+      }
+    }
+  }
+
+  /**
+   * Goes through _alertStatus and runs the configured action of every actual alert
+   * that has fired, recording each action taken in _alertActionTaken.
+   */
+  void executeAlertActions(HelixManager manager)
+  {
+    _alertActionTaken.clear();
+    // Go through the original alert strings
+    for (Map.Entry<String, Map<String, AlertValueAndStatus>> alertEntry : _alertStatus.entrySet())
+    {
+      String originAlertName = alertEntry.getKey();
+      Map<String, String> alertFields = _alertsHolder.getAlertsMap().get(originAlertName);
+      if (alertFields == null || !alertFields.containsKey(AlertParser.ACTION_NAME))
+      {
+        continue;
+      }
+      String actionValue = alertFields.get(AlertParser.ACTION_NAME);
+      Map<String, AlertValueAndStatus> alertResultMap = alertEntry.getValue();
+      if (alertResultMap == null)
+      {
+        logger.info("Alert " + originAlertName + " does not have alert status map");
+        continue;
+      }
+      // For each original alert, iterate all actual alerts that it expands into
+      for (Map.Entry<String, AlertValueAndStatus> result : alertResultMap.entrySet())
+      {
+        AlertValueAndStatus status = result.getValue();
+        if (status != null && status.isFired())
+        {
+          String actualStatName = result.getKey();
+          logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by " + actualStatName);
+          _alertActionTaken.put(actualStatName, actionValue);
+          // TODO: move these functionalities into a separate class
+          executeAlertAction(actualStatName, actionValue, manager);
+        }
+      }
+    }
+  }
+
+  /**
+   * Executes the action attached to a fired alert. DISABLE_RESOURCE is only logged
+   * because it is not implemented.
+   * NOTE: consider unify this with DefaultParticipantErrorMessageHandler.handleMessage()
+   */
+  void executeAlertAction(String actualStatName, String actionValue, HelixManager manager)
+  {
+    if (ActionOnError.DISABLE_INSTANCE.toString().equals(actionValue))
+    {
+      String instanceName = parseInstanceName(actualStatName, manager);
+      if (instanceName != null)
+      {
+        logger.info("Disabling instance " + instanceName);
+        manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, false);
+      }
+    }
+    else if (ActionOnError.DISABLE_PARTITION.toString().equals(actionValue))
+    {
+      String instanceName = parseInstanceName(actualStatName, manager);
+      String resourceName = parseResourceName(actualStatName, manager);
+      String partitionName = parsePartitionName(actualStatName, manager);
+      if (instanceName != null && resourceName != null && partitionName != null)
+      {
+        logger.info("Disabling partition " + partitionName + " instanceName " + instanceName);
+        manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(), instanceName,
+            resourceName, Arrays.asList(partitionName));
+      }
+    }
+    else if (ActionOnError.DISABLE_RESOURCE.toString().equals(actionValue))
+    {
+      String instanceName = parseInstanceName(actualStatName, manager);
+      String resourceName = parseResourceName(actualStatName, manager);
+      logger.info("Disabling resource " + resourceName + " instanceName " + instanceName + " not implemented");
+    }
+  }
+
+  /**
+   * @return the name of the first ideal-state resource that appears in the stat name
+   *         as "=name." or "=name;", or null if none does
+   */
+  public static String parseResourceName(String actualStatName, HelixManager manager)
+  {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder kb = accessor.keyBuilder();
+    List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
+    for (IdealState idealState : idealStates)
+    {
+      String resourceName = idealState.getResourceName();
+      if (actualStatName.contains("=" + resourceName + ".") || actualStatName.contains("=" + resourceName + ";"))
+      {
+        return resourceName;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Extracts the partition name ("resource_suffix") following "=" in the stat name.
+   * The name ends at the next '.' or ';', whichever comes first. If neither follows, it
+   * runs to the end of the stat name.
+   *
+   * @return the partition name, or null if no known resource's partition appears
+   */
+  public static String parsePartitionName(String actualStatName, HelixManager manager)
+  {
+    String resourceName = parseResourceName(actualStatName, manager);
+    if (resourceName == null)
+    {
+      return null;
+    }
+    String partitionKey = "=" + resourceName + "_";
+    int pos = actualStatName.indexOf(partitionKey);
+    if (pos < 0)
+    {
+      return null;
+    }
+    int searchFrom = pos + partitionKey.length();
+    // Previously a missing '.' left the end index at -1 and substring() threw.
+    int end = actualStatName.length();
+    int dotPos = actualStatName.indexOf('.', searchFrom);
+    if (dotPos >= 0)
+    {
+      end = dotPos;
+    }
+    int semicolonPos = actualStatName.indexOf(';', searchFrom);
+    if (semicolonPos >= 0 && semicolonPos < end)
+    {
+      end = semicolonPos;
+    }
+    return actualStatName.substring(pos + 1, end);
+  }
+
+  /** @return the first live instance whose name prefixes the stat name, or null */
+  public static String parseInstanceName(String actualStatName, HelixManager manager)
+  {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder kb = accessor.keyBuilder();
+    List<LiveInstance> liveInstances = accessor.getChildValues(kb.liveInstances());
+    for (LiveInstance instance : liveInstances)
+    {
+      String instanceName = instance.getInstanceName();
+      if (actualStatName.startsWith(instanceName))
+      {
+        return instanceName;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Appends the latest alert-bean changes (plus the actions taken this run) to the
+   * alert history stored through the data accessor. At most
+   * {@link #ALERT_HISTORY_SIZE} entries are kept. Nothing is written when no bean changed.
+   */
+  void updateAlertHistory(HelixManager manager)
+  {
+    _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
+    Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
+    // Update history only when some beans has changed
+    if (delta.isEmpty())
+    {
+      return;
+    }
+    delta.putAll(_alertActionTaken);
+    // 24-hour clock ("HH") so that the history keys sort chronologically and the
+    // eviction below really drops the oldest entry. With "hh" (12-hour clock) 1 PM
+    // sorted before 11 AM, and entries twelve hours apart collided.
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss:SSS");
+    String date = dateFormat.format(new Date());
+
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
+    ZNRecord alertFiredHistory = (property == null)
+        ? new ZNRecord(PropertyType.ALERT_HISTORY.toString())
+        : property.getRecord();
+    Map<String, Map<String, String>> historyEntries = alertFiredHistory.getMapFields();
+    while (historyEntries.size() >= ALERT_HISTORY_SIZE)
+    {
+      // ZNRecord uses TreeMap which is sorted ascending internally, so the first key
+      // is the oldest timestamp
+      historyEntries.remove(historyEntries.keySet().iterator().next());
+    }
+    alertFiredHistory.setMapField(date, delta);
+    accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
+    _alertBeanCollection.setAlertHistory(alertFiredHistory);
+  }
+
+  public ClusterAlertMBeanCollection getClusterAlertMBeanCollection()
+  {
+    return _alertBeanCollection;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
new file mode 100644
index 0000000..80db4d7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Sends the messages selected by the message throttle stage to their target instances.
+ * <p>
+ * Reads {@link AttributeName#RESOURCES} and {@link AttributeName#MESSAGES_THROTTLE}
+ * from the event. Messages of resources in group-message mode are merged into one
+ * message per current-state path and transition, and every resulting message is
+ * created under its target instance's message key.
+ */
+public class TaskAssignmentStage extends AbstractBaseStage
+{
+  private static final Logger logger = Logger.getLogger(TaskAssignmentStage.class);
+
+  /**
+   * Collects the throttled messages of all partitions, groups them and sends them.
+   *
+   * @throws StageException if the manager, resources or throttle output is missing
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception
+  {
+    long startTime = System.currentTimeMillis();
+    logger.info("START TaskAssignmentStage.process()");
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    MessageThrottleStageOutput messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+
+    if (manager == null || resourceMap == null || messageOutput == null)
+    {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache");
+    }
+
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    List<Message> messagesToSend = new ArrayList<Message>();
+    for (Map.Entry<String, Resource> entry : resourceMap.entrySet())
+    {
+      String resourceName = entry.getKey();
+      for (Partition partition : entry.getValue().getPartitions())
+      {
+        List<Message> messages = messageOutput.getMessages(resourceName, partition);
+        // getMessages() may return null when the partition has no recorded
+        // messages; addAll(null) would throw.
+        if (messages != null)
+        {
+          messagesToSend.addAll(messages);
+        }
+      }
+    }
+
+    List<Message> outputMessages =
+        groupMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap);
+    sendMessages(dataAccessor, outputMessages);
+
+    long endTime = System.currentTimeMillis();
+    logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime)
+        + " ms");
+  }
+
+  /**
+   * Merges messages of resources in group-message mode. Messages sharing the same
+   * current-state path, from-state and to-state become one group message that lists
+   * all their partition names. Other messages pass through unchanged.
+   *
+   * @return the messages to send, in first-seen order
+   */
+  List<Message> groupMessage(Builder keyBuilder,
+                             List<Message> messages,
+                             Map<String, Resource> resourceMap)
+  {
+    // group messages by its CurrentState path + "/" + fromState + "/" + toState
+    Map<String, Message> groupMessages = new HashMap<String, Message>();
+    List<Message> outputMessages = new ArrayList<Message>();
+
+    for (Message message : messages)
+    {
+      String resourceName = message.getResourceName();
+      Resource resource = resourceMap.get(resourceName);
+      if (resource == null || !resource.getGroupMessageMode())
+      {
+        outputMessages.add(message);
+        continue;
+      }
+
+      String key =
+          keyBuilder.currentState(message.getTgtName(),
+                                  message.getTgtSessionId(),
+                                  resourceName).getPath()
+              + "/" + message.getFromState() + "/" + message.getToState();
+
+      Message groupMessage = groupMessages.get(key);
+      if (groupMessage == null)
+      {
+        groupMessage = new Message(message.getRecord());
+        groupMessage.setGroupMessageMode(true);
+        outputMessages.add(groupMessage);
+        groupMessages.put(key, groupMessage);
+      }
+      groupMessage.addPartitionName(message.getPartitionName());
+    }
+
+    return outputMessages;
+  }
+
+  /** Creates every message under its target instance's message key, in one batch call. */
+  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages)
+  {
+    if (messages == null || messages.isEmpty())
+    {
+      return;
+    }
+
+    Builder keyBuilder = dataAccessor.keyBuilder();
+
+    List<PropertyKey> keys = new ArrayList<PropertyKey>();
+    for (Message message : messages)
+    {
+      logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
+          + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
+          + " from:" + message.getFromState() + " to:" + message.getToState());
+
+      keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
+    }
+
+    dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/controller/stages/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/stages/package-info.java
new file mode 100644
index 0000000..6dd3780
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Stages in Helix controller pipelines
+ *
+ */
+package org.apache.helix.controller.stages;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
new file mode 100644
index 0000000..73f0151
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapHandler.java
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import java.util.UUID;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+
+public class BootstrapHandler extends StateModelFactory<StateModel>
+{
+
+ @Override
+ public StateModel createNewStateModel(String stateUnitKey)
+ {
+ return new BootstrapStateModel(stateUnitKey);
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = "{'OFFLINE','SLAVE','MASTER'}")
+ public static class BootstrapStateModel extends StateModel
+ {
+
+ private final String _stateUnitKey;
+
+ public BootstrapStateModel(String stateUnitKey)
+ {
+ _stateUnitKey = stateUnitKey;
+
+ }
+ @Transition(from = "MASTER", to = "SLAVE")
+ public void masterToSlave(Message message, NotificationContext context)
+ {
+
+ }
+ @Transition(from = "OFFLINE", to = "SLAVE")
+ public void offlineToSlave(Message message, NotificationContext context)
+ {
+ System.out
+ .println("BootstrapProcess.BootstrapStateModel.offlineToSlave()");
+ HelixManager manager = context.getManager();
+ ClusterMessagingService messagingService = manager.getMessagingService();
+ Message requestBackupUriRequest = new Message(
+ MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+ requestBackupUriRequest
+ .setMsgSubType(BootstrapProcess.REQUEST_BOOTSTRAP_URL);
+ requestBackupUriRequest.setMsgState(MessageState.NEW);
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setInstanceName("*");
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setResource(message.getResourceName());
+ recipientCriteria.setPartition(message.getPartitionName());
+ recipientCriteria.setSessionSpecific(true);
+ // wait for 30 seconds
+ int timeout = 30000;
+ BootstrapReplyHandler responseHandler = new BootstrapReplyHandler();
+
+ int sentMessageCount = messagingService.sendAndWait(recipientCriteria,
+ requestBackupUriRequest, responseHandler, timeout);
+ if (sentMessageCount == 0)
+ {
+ // could not find any other node hosting the partition
+ } else if (responseHandler.getBootstrapUrl() != null)
+ {
+ System.out.println("Got bootstrap url:"+ responseHandler.getBootstrapUrl() );
+ System.out.println("Got backup time:"+ responseHandler.getBootstrapTime() );
+ // Got the url fetch it
+ } else
+ {
+ // Either go to error state
+ // throw new Exception("Cant find backup/bootstrap data");
+ // Request some node to start backup process
+ }
+ }
+ @Transition(from = "SLAVE", to = "OFFLINE")
+ public void slaveToOffline(Message message, NotificationContext context)
+ {
+ System.out
+ .println("BootstrapProcess.BootstrapStateModel.slaveToOffline()");
+ }
+ @Transition(from = "SLAVE", to = "MASTER")
+ public void slaveToMaster(Message message, NotificationContext context)
+ {
+ System.out
+ .println("BootstrapProcess.BootstrapStateModel.slaveToMaster()");
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
new file mode 100644
index 0000000..b8e3bfe
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
@@ -0,0 +1,405 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.examples;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Date;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.tools.ClusterStateVerifier;
+
+
+/**
+ * This process does little more than handling the state transition messages.
+ * This is generally the case when the server needs to bootstrap when it comes
+ * up.<br>
+ * Flow for a typical Master-slave state model<br>
+ * <ul>
+ * <li>Gets OFFLINE-SLAVE transition</li>
+ * <li>Figure out if it has any data and how old it is for the SLAVE partition</li>
+ * <li>If the data is fresh enough it can probably catch up from the replication
+ * stream of the master</li>
+ * <li>If not, then it can use the messaging service provided by cluster manager
+ * to talk other nodes to figure out if they have any backup</li>
+ * </li>
+ * <li>Once it gets a response from other nodes in the cluster the process can
+ * decide which back up it wants to use to bootstrap</li>
+ * </ul>
+ *
+ * @author kgopalak
+ *
+ */
+public class BootstrapProcess
+{
+ static final String REQUEST_BOOTSTRAP_URL = "REQUEST_BOOTSTRAP_URL";
+ public static final String zkServer = "zkSvr";
+ public static final String cluster = "cluster";
+ public static final String hostAddress = "host";
+ public static final String hostPort = "port";
+ public static final String relayCluster = "relayCluster";
+ public static final String help = "help";
+ public static final String configFile = "configFile";
+ public static final String stateModel = "stateModelType";
+ public static final String transDelay = "transDelay";
+
+ private final String zkConnectString;
+ private final String clusterName;
+ private final String instanceName;
+ private final String stateModelType;
+ private HelixManager manager;
+
+// private StateMachineEngine genericStateMachineHandler;
+
+ private String _file = null;
+ private StateModelFactory<StateModel> stateModelFactory;
+ private final int delay;
+
+ public BootstrapProcess(String zkConnectString, String clusterName,
+ String instanceName, String file, String stateModel, int delay)
+ {
+ this.zkConnectString = zkConnectString;
+ this.clusterName = clusterName;
+ this.instanceName = instanceName;
+ this._file = file;
+ stateModelType = stateModel;
+ this.delay = delay;
+ }
+
+ public void start() throws Exception
+ {
+ if (_file == null)
+ {
+ manager = HelixManagerFactory.getZKHelixManager(clusterName,
+ instanceName,
+ InstanceType.PARTICIPANT,
+ zkConnectString);
+
+ }
+ else
+ {
+ manager = HelixManagerFactory.getStaticFileHelixManager(clusterName,
+ instanceName,
+ InstanceType.PARTICIPANT,
+ _file);
+
+ }
+ stateModelFactory = new BootstrapHandler();
+// genericStateMachineHandler = new StateMachineEngine();
+// genericStateMachineHandler.registerStateModelFactory("MasterSlave", stateModelFactory);
+
+ StateMachineEngine stateMach = manager.getStateMachineEngine();
+ stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
+
+ manager.getMessagingService().registerMessageHandlerFactory(
+ MessageType.STATE_TRANSITION.toString(), stateMach);
+ manager.getMessagingService().registerMessageHandlerFactory(
+ MessageType.USER_DEFINE_MSG.toString(),
+ new CustomMessageHandlerFactory());
+ manager.connect();
+ if (_file != null)
+ {
+ ClusterStateVerifier.verifyFileBasedClusterStates(_file, instanceName,
+ stateModelFactory);
+
+ }
+ }
+
+ public static class CustomMessageHandlerFactory implements
+ MessageHandlerFactory
+ {
+
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+
+ return new CustomMessageHandler(message, context);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ return MessageType.USER_DEFINE_MSG.toString();
+ }
+
+ @Override
+ public void reset()
+ {
+
+ }
+
+ static class CustomMessageHandler extends MessageHandler
+ {
+
+ public CustomMessageHandler(Message message, NotificationContext context)
+ {
+ super(message, context);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ String hostName;
+ HelixTaskResult result = new HelixTaskResult();
+ try
+ {
+ hostName = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e)
+ {
+ hostName = "UNKNOWN";
+ }
+ String port = "2134";
+ String msgSubType = _message.getMsgSubType();
+ if (msgSubType.equals(REQUEST_BOOTSTRAP_URL))
+ {
+ result.getTaskResultMap().put(
+ "BOOTSTRAP_URL",
+ "http://" + hostName + ":" + port
+ + "/getFile?path=/data/bootstrap/"
+ + _message.getResourceName() + "/"
+ + _message.getPartitionName() + ".tar");
+
+ result.getTaskResultMap().put(
+ "BOOTSTRAP_TIME",
+ ""+new Date().getTime());
+ }
+
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError( Exception e, ErrorCode code, ErrorType type)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+ @SuppressWarnings("static-access")
+ private static Options constructCommandLineOptions()
+ {
+ Option helpOption = OptionBuilder.withLongOpt(help)
+ .withDescription("Prints command-line options info").create();
+
+ Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
+ .withDescription("Provide zookeeper address").create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+ Option clusterOption = OptionBuilder.withLongOpt(cluster)
+ .withDescription("Provide cluster name").create();
+ clusterOption.setArgs(1);
+ clusterOption.setRequired(true);
+ clusterOption.setArgName("Cluster name (Required)");
+
+ Option hostOption = OptionBuilder.withLongOpt(hostAddress)
+ .withDescription("Provide host name").create();
+ hostOption.setArgs(1);
+ hostOption.setRequired(true);
+ hostOption.setArgName("Host name (Required)");
+
+ Option portOption = OptionBuilder.withLongOpt(hostPort)
+ .withDescription("Provide host port").create();
+ portOption.setArgs(1);
+ portOption.setRequired(true);
+ portOption.setArgName("Host port (Required)");
+
+ Option stateModelOption = OptionBuilder.withLongOpt(stateModel)
+ .withDescription("StateModel Type").create();
+ stateModelOption.setArgs(1);
+ stateModelOption.setRequired(true);
+ stateModelOption.setArgName("StateModel Type (Required)");
+
+ // add an option group including either --zkSvr or --configFile
+ Option fileOption = OptionBuilder.withLongOpt(configFile)
+ .withDescription("Provide file to read states/messages").create();
+ fileOption.setArgs(1);
+ fileOption.setRequired(true);
+ fileOption.setArgName("File to read states/messages (Optional)");
+
+ Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
+ .withDescription("Provide state trans delay").create();
+ transDelayOption.setArgs(1);
+ transDelayOption.setRequired(false);
+ transDelayOption.setArgName("Delay time in state transition, in MS");
+
+ OptionGroup optionGroup = new OptionGroup();
+ optionGroup.addOption(zkServerOption);
+ optionGroup.addOption(fileOption);
+
+ Options options = new Options();
+ options.addOption(helpOption);
+ // options.addOption(zkServerOption);
+ options.addOption(clusterOption);
+ options.addOption(hostOption);
+ options.addOption(portOption);
+ options.addOption(stateModelOption);
+ options.addOption(transDelayOption);
+
+ options.addOptionGroup(optionGroup);
+
+ return options;
+ }
+
+ public static void printUsage(Options cliOptions)
+ {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.printHelp("java " + BootstrapProcess.class.getName(), cliOptions);
+ }
+
+ public static CommandLine processCommandLineArgs(String[] cliArgs)
+ throws Exception
+ {
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+ try
+ {
+ return cliParser.parse(cliOptions, cliArgs);
+ } catch (ParseException pe)
+ {
+ System.err
+ .println("CommandLineClient: failed to parse command-line options: "
+ + pe.toString());
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ String zkConnectString = "localhost:2181";
+ String clusterName = "storage-integration-cluster";
+ String instanceName = "localhost_8905";
+ String file = null;
+ String stateModelValue = "MasterSlave";
+ int delay = 0;
+ boolean skipZeroArgs = true;// false is for dev testing
+ if (!skipZeroArgs || args.length > 0)
+ {
+ CommandLine cmd = processCommandLineArgs(args);
+ zkConnectString = cmd.getOptionValue(zkServer);
+ clusterName = cmd.getOptionValue(cluster);
+
+ String host = cmd.getOptionValue(hostAddress);
+ String portString = cmd.getOptionValue(hostPort);
+ int port = Integer.parseInt(portString);
+ instanceName = host + "_" + port;
+
+ file = cmd.getOptionValue(configFile);
+ if (file != null)
+ {
+ File f = new File(file);
+ if (!f.exists())
+ {
+ System.err.println("static config file doesn't exist");
+ System.exit(1);
+ }
+ }
+
+ stateModelValue = cmd.getOptionValue(stateModel);
+ if (cmd.hasOption(transDelay))
+ {
+ try
+ {
+ delay = Integer.parseInt(cmd.getOptionValue(transDelay));
+ if (delay < 0)
+ {
+ throw new Exception("delay must be positive");
+ }
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ delay = 0;
+ }
+ }
+ }
+ // Espresso_driver.py will consume this
+ System.out.println("Starting Process with ZK:" + zkConnectString);
+
+ BootstrapProcess process = new BootstrapProcess(zkConnectString,
+ clusterName, instanceName, file, stateModelValue, delay);
+
+ process.start();
+ Thread.currentThread().join();
+ }
+}
+
+class BootstrapReplyHandler extends AsyncCallback
+{
+
+ public BootstrapReplyHandler()
+ {
+ }
+
+ private String bootstrapUrl;
+ private String bootstrapTime;
+
+ @Override
+ public void onTimeOut()
+ {
+ System.out.println("Timed out");
+ }
+
+ public String getBootstrapUrl()
+ {
+ return bootstrapUrl;
+ }
+
+ public String getBootstrapTime()
+ {
+ return bootstrapTime;
+ }
+
+ @Override
+ public void onReplyMessage(Message message)
+ {
+ String time = message.getResultMap().get("BOOTSTRAP_TIME");
+ if (bootstrapTime == null || time.compareTo(bootstrapTime) > -1)
+ {
+ bootstrapTime = message.getResultMap().get("BOOTSTRAP_TIME");
+ bootstrapUrl = message.getResultMap().get("BOOTSTRAP_URL");
+ }
+ }
+
+}