You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:40 UTC
[11/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/HealthDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/HealthDataCache.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/HealthDataCache.java
deleted file mode 100644
index 1fdc05d..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/HealthDataCache.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.model.AlertStatus;
-import com.linkedin.helix.model.Alerts;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.PersistentStats;
-
-public class HealthDataCache
-{
- Map<String, LiveInstance> _liveInstanceMap;
-
- Map<String, Map<String, HealthStat>> _healthStatMap;
- HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
- PersistentStats _persistentStats;
- Alerts _alerts;
- AlertStatus _alertStatus;
-
- public boolean refresh(DataAccessor accessor)
- {
- _liveInstanceMap = accessor.getChildValuesMap(LiveInstance.class,
- PropertyType.LIVEINSTANCES);
-
- Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
-
- for (String instanceName : _liveInstanceMap.keySet())
- {
- // xxx clearly getting znodes for the instance here...so get the
- // timestamp!
-
- hsMap.put(instanceName, accessor.getChildValuesMap(HealthStat.class,
- PropertyType.HEALTHREPORT, instanceName));
- }
- _healthStatMap = Collections.unmodifiableMap(hsMap);
- _persistentStats = accessor.getProperty(PersistentStats.class,
- PropertyType.PERSISTENTSTATS);
- _alerts = accessor.getProperty(Alerts.class, PropertyType.ALERTS);
- _alertStatus = accessor.getProperty(AlertStatus.class,
- PropertyType.ALERT_STATUS);
-
- return true;
- }
-
- public HealthStat getGlobalStats()
- {
- return _globalStats;
- }
-
- public PersistentStats getPersistentStats()
- {
- return _persistentStats;
- }
-
- public Alerts getAlerts()
- {
- return _alerts;
- }
-
- public AlertStatus getAlertStatus()
- {
- return _alertStatus;
- }
-
- public Map<String, HealthStat> getHealthStats(String instanceName)
- {
- Map<String, HealthStat> map = _healthStatMap.get(instanceName);
- if (map != null)
- {
- return map;
- } else
- {
- return Collections.emptyMap();
- }
- }
-
- public Map<String, LiveInstance> getLiveInstances()
- {
- return _liveInstanceMap;
- }
-
- public boolean refresh(HelixDataAccessor accessor)
- {
- Builder keyBuilder = accessor.keyBuilder();
- _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-
- Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
-
- for (String instanceName : _liveInstanceMap.keySet())
- {
- // xxx clearly getting znodes for the instance here...so get the
- // timestamp!
-
- Map<String, HealthStat> childValuesMap = accessor
- .getChildValuesMap(keyBuilder.healthReports(instanceName));
- hsMap.put(instanceName, childValuesMap);
- }
- _healthStatMap = Collections.unmodifiableMap(hsMap);
- _persistentStats = accessor.getProperty(keyBuilder.persistantStat());
- _alerts = accessor.getProperty(keyBuilder.alerts());
- _alertStatus = accessor.getProperty(keyBuilder.alertStatus());
-
- return true;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationOutput.java
deleted file mode 100644
index 61b8319..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationOutput.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-
-public class MessageGenerationOutput
-{
-
- private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
- public MessageGenerationOutput()
- {
- _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
-
- }
-
- public void addMessage(String resourceName, Partition resource,
- Message message)
- {
- if (!_messagesMap.containsKey(resourceName))
- {
- _messagesMap.put(resourceName,
- new HashMap<Partition, List<Message>>());
- }
- if (!_messagesMap.get(resourceName).containsKey(resource))
- {
- _messagesMap.get(resourceName).put(resource,
- new ArrayList<Message>());
-
- }
- _messagesMap.get(resourceName).get(resource).add(message);
-
- }
-
- public List<Message> getMessages(String resourceName,
- Partition resource)
- {
- Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
- if (map != null)
- {
- return map.get(resource);
- }
- return Collections.emptyList();
-
- }
-
- @Override
- public String toString()
- {
- return _messagesMap.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationPhase.java
deleted file mode 100644
index 1ffc14f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageGenerationPhase.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.model.StateModelDefinition;
-
-/**
- * Compares the currentState,pendingState with IdealState and generate messages
- *
- * @author kgopalak
- *
- */
-public class MessageGenerationPhase extends AbstractBaseStage
-{
- private static Logger logger = Logger.getLogger(MessageGenerationPhase.class);
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- HelixManager manager = event.getAttribute("helixmanager");
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE
- .toString());
- BestPossibleStateOutput bestPossibleStateOutput = event
- .getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
- || bestPossibleStateOutput == null)
- {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
- }
-
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- Map<String, String> sessionIdMap = new HashMap<String, String>();
-
- for (LiveInstance liveInstance : liveInstances.values())
- {
- sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
- }
- MessageGenerationOutput output = new MessageGenerationOutput();
-
- for (String resourceName : resourceMap.keySet())
- {
- Resource resource = resourceMap.get(resourceName);
- int bucketSize = resource.getBucketSize();
-
- StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
-
- for (Partition partition : resource.getPartitions())
- {
- Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(
- resourceName, partition);
-
- for (String instanceName : instanceStateMap.keySet())
- {
- String desiredState = instanceStateMap.get(instanceName);
-
- String currentState = currentStateOutput.getCurrentState(resourceName, partition,
- instanceName);
- if (currentState == null)
- {
- currentState = stateModelDef.getInitialState();
- }
-
- if (desiredState.equalsIgnoreCase(currentState))
- {
- continue;
- }
-
- String pendingState = currentStateOutput.getPendingState(resourceName, partition,
- instanceName);
-
- String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
- if (nextState == null)
- {
- logger.error("Unable to find a next state for partition: "
- + partition.getPartitionName() + " from stateModelDefinition"
- + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
- continue;
- }
-
- if (pendingState != null)
- {
- if (nextState.equalsIgnoreCase(pendingState))
- {
- logger.debug("Message already exists for " + instanceName + " to transit "
- + partition.getPartitionName() + " from " + currentState + " to " + nextState);
- } else if (currentState.equalsIgnoreCase(pendingState))
- {
- logger.info("Message hasn't been removed for " + instanceName + " to transit"
- + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
- + desiredState);
- } else
- {
- logger.info("IdealState changed before state transition completes for "
- + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
- + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
- }
- } else
- {
- Message message = createMessage(manager, resourceName, partition.getPartitionName(),
- instanceName, currentState, nextState, sessionIdMap.get(instanceName),
- stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize);
- IdealState idealState = cache.getIdealState(resourceName);
- // Set timeout of needed
- String stateTransition = currentState + "-" + nextState + "_"
- + Message.Attributes.TIMEOUT;
- if (idealState != null
- && idealState.getRecord().getSimpleField(stateTransition) != null)
- {
- try
- {
- int timeout = Integer.parseInt(idealState.getRecord().getSimpleField(
- stateTransition));
- if (timeout > 0)
- {
- message.setExecutionTimeout(timeout);
- }
- } catch (Exception e)
- {
- logger.error("", e);
- }
- }
- message.getRecord().setSimpleField("ClusterEventName", event.getName());
- output.addMessage(resourceName, partition, message);
- }
- }
- }
- }
- event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
- }
-
- private Message createMessage(HelixManager manager, String resourceName, String partitionName,
- String instanceName, String currentState, String nextState, String sessionId,
- String stateModelDefName, String stateModelFactoryName, int bucketSize)
- {
- String uuid = UUID.randomUUID().toString();
- Message message = new Message(MessageType.STATE_TRANSITION, uuid);
- message.setSrcName(manager.getInstanceName());
- message.setTgtName(instanceName);
- message.setMsgState(MessageState.NEW);
- message.setPartitionName(partitionName);
- message.setResourceName(resourceName);
- message.setFromState(currentState);
- message.setToState(nextState);
- message.setTgtSessionId(sessionId);
- message.setSrcSessionId(manager.getSessionId());
- message.setStateModelDef(stateModelDefName);
- message.setStateModelFactoryName(stateModelFactoryName);
- message.setBucketSize(bucketSize);
-
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStage.java
deleted file mode 100644
index f510d75..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStage.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-import com.linkedin.helix.model.StateModelDefinition;
-
-public class MessageSelectionStage extends AbstractBaseStage
-{
- private static final Logger LOG = Logger.getLogger(MessageSelectionStage.class);
-
- static class Bounds
- {
- private int upper;
- private int lower;
-
- public Bounds(int lower, int upper)
- {
- this.lower = lower;
- this.upper = upper;
- }
-
- public void increaseUpperBound()
- {
- upper++;
- }
-
- public void increaseLowerBound()
- {
- lower++;
- }
-
- public void decreaseUpperBound()
- {
- upper--;
- }
-
- public void decreaseLowerBound()
- {
- lower--;
- }
-
- public int getLowerBound()
- {
- return lower;
- }
-
- public int getUpperBound()
- {
- return upper;
- }
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, Resource> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
- CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- MessageGenerationOutput messageGenOutput =
- event.getAttribute(AttributeName.MESSAGES_ALL.toString());
- if (cache == null || resourceMap == null || currentStateOutput == null
- || messageGenOutput == null)
- {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
- }
-
- MessageSelectionStageOutput output = new MessageSelectionStageOutput();
-
- for (String resourceName : resourceMap.keySet())
- {
- Resource resource = resourceMap.get(resourceName);
- StateModelDefinition stateModelDef =
- cache.getStateModelDef(resource.getStateModelDefRef());
-
- Map<String, Integer> stateTransitionPriorities =
- getStateTransitionPriorityMap(stateModelDef);
- IdealState idealState = cache.getIdealState(resourceName);
- Map<String, Bounds> stateConstraints =
- computeStateConstraints(stateModelDef, idealState, cache);
-
- for (Partition partition : resource.getPartitions())
- {
- List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
- List<Message> selectedMessages =
- selectMessages(cache.getLiveInstances(),
- currentStateOutput.getCurrentStateMap(resourceName, partition),
- currentStateOutput.getPendingStateMap(resourceName, partition),
- messages,
- stateConstraints,
- stateTransitionPriorities,
- stateModelDef.getInitialState());
- output.addMessages(resourceName, partition, selectedMessages);
- }
- }
- event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
- }
-
- // TODO: This method deserves its own class. The class should not understand helix but
- // just be
- // able to solve the problem using the algo. I think the method is following that but if
- // we don't move it to another class its quite easy to break that contract
- /**
- * greedy message selection algorithm: 1) calculate CS+PS state lower/upper-bounds 2)
- * group messages by state transition and sorted by priority 3) from highest priority to
- * lowest, for each message group with the same transition add message one by one and
- * make sure state constraint is not violated update state lower/upper-bounds when a new
- * message is selected
- *
- * @param currentStates
- * @param pendingStates
- * @param messages
- * @param stateConstraints
- * : STATE -> bound (lower:upper)
- * @param stateTransitionPriorities
- * : FROME_STATE-TO_STATE -> priority
- * @return: selected messages
- */
- List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
- Map<String, String> currentStates,
- Map<String, String> pendingStates,
- List<Message> messages,
- Map<String, Bounds> stateConstraints,
- final Map<String, Integer> stateTransitionPriorities,
- String initialState)
- {
- if (messages == null || messages.isEmpty())
- {
- return Collections.emptyList();
- }
-
- List<Message> selectedMessages = new ArrayList<Message>();
- Map<String, Bounds> bounds = new HashMap<String, Bounds>();
-
- // count currentState, if no currentState, count as in initialState
- for (String instance : liveInstances.keySet())
- {
- String state = initialState;
- if (currentStates.containsKey(instance))
- {
- state = currentStates.get(instance);
- }
-
- if (!bounds.containsKey(state))
- {
- bounds.put(state, new Bounds(0, 0));
- }
- bounds.get(state).increaseLowerBound();
- bounds.get(state).increaseUpperBound();
- }
-
- // count pendingStates
- for (String instance : pendingStates.keySet())
- {
- String state = pendingStates.get(instance);
- if (!bounds.containsKey(state))
- {
- bounds.put(state, new Bounds(0, 0));
- }
- // TODO: add lower bound, need to refactor pendingState to include fromState also
- bounds.get(state).increaseUpperBound();
- }
-
- // group messages based on state transition priority
- Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
- new TreeMap<Integer, List<Message>>();
- for (Message message : messages)
- {
- String fromState = message.getFromState();
- String toState = message.getToState();
- String transition = fromState + "-" + toState;
- int priority = Integer.MAX_VALUE;
-
- if (stateTransitionPriorities.containsKey(transition))
- {
- priority = stateTransitionPriorities.get(transition);
- }
-
- if (!messagesGroupByStateTransitPriority.containsKey(priority))
- {
- messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
- }
- messagesGroupByStateTransitPriority.get(priority).add(message);
- }
-
- // select messages
- for (List<Message> messageList : messagesGroupByStateTransitPriority.values())
- {
- for (Message message : messageList)
- {
- String fromState = message.getFromState();
- String toState = message.getToState();
-
- if (!bounds.containsKey(fromState))
- {
- LOG.error("Message's fromState is not in currentState. message: " + message);
- continue;
- }
-
- if (!bounds.containsKey(toState))
- {
- bounds.put(toState, new Bounds(0, 0));
- }
-
- // check lower bound of fromState
- if (stateConstraints.containsKey(fromState))
- {
- int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
- if (newLowerBound < 0)
- {
- LOG.error("Number of currentState in " + fromState
- + " is less than number of messages transiting from " + fromState);
- continue;
- }
-
- if (newLowerBound < stateConstraints.get(fromState).getLowerBound())
- {
- continue;
- }
- }
-
- // check upper bound of toState
- if (stateConstraints.containsKey(toState))
- {
- int newUpperBound = bounds.get(toState).getUpperBound() + 1;
- if (newUpperBound > stateConstraints.get(toState).getUpperBound())
- {
- continue;
- }
- }
-
- selectedMessages.add(message);
- bounds.get(fromState).increaseLowerBound();
- bounds.get(toState).increaseUpperBound();
- }
- }
-
- return selectedMessages;
- }
-
- /**
- * TODO: This code is duplicate in multiple places. Can we do it in to one place in the
- * beginning and compute the stateConstraint instance once and re use at other places.
- * Each IdealState must have a constraint object associated with it
- */
- private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
- IdealState idealState,
- ClusterDataCache cache)
- {
- Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
-
- List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
- for (String state : statePriorityList)
- {
- String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
- int max = -1;
- if ("N".equals(numInstancesPerState))
- {
- max = cache.getLiveInstances().size();
- }
- else if ("R".equals(numInstancesPerState))
- {
- // idealState is null when resource has been dropped,
- // R can't be evaluated and ignore state constraints
- if (idealState != null)
- {
- max = cache.getReplicas(idealState.getResourceName());
- }
- }
- else
- {
- try
- {
- max = Integer.parseInt(numInstancesPerState);
- }
- catch (Exception e)
- {
- // use -1
- }
- }
-
- if (max > -1)
- {
- // if state has no constraint, will not put in map
- stateConstraints.put(state, new Bounds(0, max));
- }
- }
-
- return stateConstraints;
- }
-
- // TODO: if state transition priority is not provided then use lexicographical sorting
- // so that behavior is consistent
- private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef)
- {
- Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
- List<String> stateTransitionPriorityList =
- stateModelDef.getStateTransitionPriorityList();
- for (int i = 0; i < stateTransitionPriorityList.size(); i++)
- {
- stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
- }
-
- return stateTransitionPriorities;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStageOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStageOutput.java
deleted file mode 100644
index 5e91905..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageSelectionStageOutput.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-
-public class MessageSelectionStageOutput
-{
- private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
- public MessageSelectionStageOutput()
- {
- _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
- }
-
- public void addMessages(String resourceName, Partition partition,
- List<Message> selectedMessages)
- {
- if (!_messagesMap.containsKey(resourceName))
- {
- _messagesMap.put(resourceName,
- new HashMap<Partition, List<Message>>());
- }
- _messagesMap.get(resourceName).put(partition, selectedMessages);
-
- }
-
- public List<Message> getMessages(String resourceName,
- Partition partition)
- {
- Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
- if (map != null)
- {
- return map.get(partition);
- }
- return Collections.emptyList();
-
- }
-
- @Override
- public String toString()
- {
- return _messagesMap.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStage.java
deleted file mode 100644
index 8f19bcb..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStage.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.ClusterConstraints;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintAttribute;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintItem;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintType;
-import com.linkedin.helix.model.ClusterConstraints.ConstraintValue;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-
-public class MessageThrottleStage extends AbstractBaseStage
-{
- private static final Logger LOG =
- Logger.getLogger(MessageThrottleStage.class.getName());
-
- int valueOf(String valueStr)
- {
- int value = Integer.MAX_VALUE;
-
- try
- {
- ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
- switch (valueToken)
- {
- case ANY:
- value = Integer.MAX_VALUE;
- break;
- default:
- LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
- + Integer.MAX_VALUE);
- break;
- }
- }
- catch (Exception e)
- {
- try
- {
- value = Integer.parseInt(valueStr);
- }
- catch (NumberFormatException ne)
- {
- LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
- + Integer.MAX_VALUE);
- }
- }
- return value;
- }
-
- /**
- * constraints are selected in the order of the following rules: 1) don't select
- * constraints with CONSTRAINT_VALUE=ANY; 2) if one constraint is more specific than the
- * other, select the most specific one 3) if a message matches multiple constraints of
- * incomparable specificity, select the one with the minimum value 4) if a message
- * matches multiple constraints of incomparable specificity, and they all have the same
- * value, select the first in alphabetic order
- */
- Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
- Map<ConstraintAttribute, String> attributes)
- {
- Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
- for (ConstraintItem item : items)
- {
- // don't select constraints with CONSTRAINT_VALUE=ANY
- if (item.getConstraintValue().equals(ConstraintValue.ANY.toString()))
- {
- continue;
- }
-
- String key = item.filter(attributes).toString();
- if (!selectedItems.containsKey(key))
- {
- selectedItems.put(key, item);
- }
- else
- {
- ConstraintItem existingItem = selectedItems.get(key);
- if (existingItem.match(item.getAttributes()))
- {
- // item is more specific than existingItem
- selectedItems.put(key, item);
- }
- else if (!item.match(existingItem.getAttributes()))
- {
- // existingItem and item are of incomparable specificity
- int value = valueOf(item.getConstraintValue());
- int existingValue = valueOf(existingItem.getConstraintValue());
- if (value < existingValue)
- {
- // item's constraint value is less than that of existingItem
- selectedItems.put(key, item);
- }
- else if (value == existingValue)
- {
- if (item.toString().compareTo(existingItem.toString()) < 0)
- {
- // item is ahead of existingItem in alphabetic order
- selectedItems.put(key, item);
- }
- }
- }
- }
- }
- return new HashSet<ConstraintItem>(selectedItems.values());
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- MessageSelectionStageOutput msgSelectionOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- Map<String, Resource> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
-
- if (cache == null || resourceMap == null || msgSelectionOutput == null)
- {
- throw new StageException("Missing attributes in event: " + event
- + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
- }
-
- MessageThrottleStageOutput output = new MessageThrottleStageOutput();
-
- ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
- Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
-
- if (constraint != null)
- {
- // go through all pending messages, they should be counted but not throttled
- for (String instance : cache.getLiveInstances().keySet())
- {
- throttle(throttleCounterMap,
- constraint,
- new ArrayList<Message>(cache.getMessages(instance).values()),
- false);
- }
- }
-
- // go through all new messages, throttle if necessary
- // assume messages should be sorted by state transition priority in messageSelection stage
- for (String resourceName : resourceMap.keySet())
- {
- Resource resource = resourceMap.get(resourceName);
- for (Partition partition : resource.getPartitions())
- {
- List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
- if (constraint != null && messages != null && messages.size() > 0)
- {
- messages = throttle(throttleCounterMap, constraint, messages, true);
- }
- output.addMessages(resourceName, partition, messages);
- }
- }
-
- event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
- }
-
- private List<Message> throttle(Map<String, Integer> throttleMap,
- ClusterConstraints constraint,
- List<Message> messages,
- final boolean needThrottle)
- {
-
- List<Message> throttleOutputMsgs = new ArrayList<Message>();
- for (Message message : messages)
- {
- Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
-
- Set<ConstraintItem> matches = constraint.match(msgAttr);
- matches = selectConstraints(matches, msgAttr);
-
- boolean msgThrottled = false;
- for (ConstraintItem item : matches)
- {
- String key = item.filter(msgAttr).toString();
- if (!throttleMap.containsKey(key))
- {
- throttleMap.put(key, valueOf(item.getConstraintValue()));
- }
- int value = throttleMap.get(key);
- throttleMap.put(key, --value);
-
- if (needThrottle && value < 0)
- {
- msgThrottled = true;
-
- if (LOG.isDebugEnabled())
- {
- // TODO: printout constraint item that throttles the message
- LOG.debug("message: " + message + " is throttled by constraint: " + item);
- }
- }
- }
-
- if (!msgThrottled)
- {
- throttleOutputMsgs.add(message);
- }
- }
-
- return throttleOutputMsgs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStageOutput.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStageOutput.java
deleted file mode 100644
index 73a5d81..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/MessageThrottleStageOutput.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-
-public class MessageThrottleStageOutput
-{
- private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
- public MessageThrottleStageOutput()
- {
- _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
- }
-
- public void addMessages(String resourceName,
- Partition partition,
- List<Message> selectedMessages)
- {
- if (!_messagesMap.containsKey(resourceName))
- {
- _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
- }
- _messagesMap.get(resourceName).put(partition, selectedMessages);
-
- }
-
- public List<Message> getMessages(String resourceName, Partition partition)
- {
- Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
- if (map != null)
- {
- return map.get(partition);
- }
- return Collections.emptyList();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadClusterDataStage.java
deleted file mode 100644
index fb22100..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadClusterDataStage.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.InstanceConfig;
-import com.linkedin.helix.monitoring.mbeans.ClusterStatusMonitor;
-
-public class ReadClusterDataStage extends AbstractBaseStage
-{
- private static final Logger logger = Logger
- .getLogger(ReadClusterDataStage.class.getName());
- ClusterDataCache _cache;
-
- public ReadClusterDataStage()
- {
- _cache = new ClusterDataCache();
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- long startTime = System.currentTimeMillis();
- logger.info("START ReadClusterDataStage.process()");
-
-
- HelixManager manager = event.getAttribute("helixmanager");
- if (manager == null)
- {
- throw new StageException("HelixManager attribute value is null");
- }
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
- _cache.refresh(dataAccessor);
-
- ClusterStatusMonitor clusterStatusMonitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- if(clusterStatusMonitor != null)
- {
- int disabledInstances = 0;
- int disabledPartitions = 0;
- for(InstanceConfig config : _cache._instanceConfigMap.values())
- {
- if(config.getInstanceEnabled() == false)
- {
- disabledInstances ++;
- }
- if(config.getDisabledPartitionMap() != null)
- {
- disabledPartitions += config.getDisabledPartitionMap().size();
- }
- }
- clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(), _cache._instanceConfigMap.size(),
- disabledInstances, disabledPartitions);
- }
-
- event.addAttribute("ClusterDataCache", _cache);
-
- long endTime = System.currentTimeMillis();
- logger.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadHealthDataStage.java
deleted file mode 100644
index f6139e7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ReadHealthDataStage.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-
-public class ReadHealthDataStage extends AbstractBaseStage
-{
- private static final Logger LOG = Logger.getLogger(ReadHealthDataStage.class.getName());
- HealthDataCache _cache;
-
- public ReadHealthDataStage()
- {
- _cache = new HealthDataCache();
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- long startTime = System.currentTimeMillis();
-
- HelixManager manager = event.getAttribute("helixmanager");
- if (manager == null)
- {
- throw new StageException("HelixManager attribute value is null");
- }
- // DataAccessor dataAccessor = manager.getDataAccessor();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- _cache.refresh(accessor);
-
- event.addAttribute("HealthDataCache", _cache);
-
- long processLatency = System.currentTimeMillis() - startTime;
- addLatencyToMonitor(event, processLatency);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/ResourceComputationStage.java
deleted file mode 100644
index 7980ac7..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/ResourceComputationStage.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Resource;
-
-/**
- * This stage computes all the resources in a cluster. The resources are
- * computed from IdealStates -> this gives all the resources currently active
- * CurrentState for liveInstance-> Helps in finding resources that are inactive
- * and needs to be dropped
- *
- * @author kgopalak
- *
- */
-public class ResourceComputationStage extends AbstractBaseStage
-{
- private static Logger LOG = Logger.getLogger(ResourceComputationStage.class);
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- if (cache == null)
- {
- throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
- }
-
- Map<String, IdealState> idealStates = cache.getIdealStates();
-
- Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
-
- if (idealStates != null && idealStates.size() > 0)
- {
- for (IdealState idealState : idealStates.values())
- {
- Set<String> partitionSet = idealState.getPartitionSet();
- String resourceName = idealState.getResourceName();
-
- for (String partition : partitionSet)
- {
- addPartition(partition, resourceName, resourceMap);
- Resource resource = resourceMap.get(resourceName);
- resource.setStateModelDefRef(idealState.getStateModelDefRef());
- resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
- resource.setBucketSize(idealState.getBucketSize());
- resource.setGroupMessageMode(idealState.getGroupMessageMode());
- }
- }
- }
-
- // It's important to get partitions from CurrentState as well since the
- // idealState might be removed.
- Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
-
- if (availableInstances != null && availableInstances.size() > 0)
- {
- for (LiveInstance instance : availableInstances.values())
- {
- String instanceName = instance.getInstanceName();
- String clientSessionId = instance.getSessionId();
-
- Map<String, CurrentState> currentStateMap = cache.getCurrentState(instanceName,
- clientSessionId);
- if (currentStateMap == null || currentStateMap.size() == 0)
- {
- continue;
- }
- for (CurrentState currentState : currentStateMap.values())
- {
-
- String resourceName = currentState.getResourceName();
- Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
-
- // don't overwrite ideal state settings
- if (!resourceMap.containsKey(resourceName))
- {
- addResource(resourceName, resourceMap);
- Resource resource = resourceMap.get(resourceName);
- resource.setStateModelDefRef(currentState.getStateModelDefRef());
- resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
- resource.setBucketSize(currentState.getBucketSize());
- resource.setGroupMessageMode(currentState.getGroupMessageMode());
- }
-
- if (currentState.getStateModelDefRef() == null)
- {
- LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
- + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
- + currentState.getPartitionStateMap().values());
- throw new StageException("State model def is null for resource:"
- + currentState.getResourceName());
- }
-
- for (String partition : resourceStateMap.keySet())
- {
- addPartition(partition, resourceName, resourceMap);
- }
- }
- }
- }
-
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- }
-
- private void addResource(String resource, Map<String, Resource> resourceMap)
- {
- if (resource == null || resourceMap == null)
- {
- return;
- }
- if (!resourceMap.containsKey(resource))
- {
- resourceMap.put(resource, new Resource(resource));
- }
- }
-
- private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap)
- {
- if (resourceName == null || partition == null || resourceMap == null)
- {
- return;
- }
- if (!resourceMap.containsKey(resourceName))
- {
- resourceMap.put(resourceName, new Resource(resourceName));
- }
- Resource resource = resourceMap.get(resourceName);
- resource.addPartition(partition);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/StatsAggregationStage.java
deleted file mode 100644
index 0e8a5ea..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/StatsAggregationStage.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.HelixProperty;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.alerts.AlertParser;
-import com.linkedin.helix.alerts.AlertProcessor;
-import com.linkedin.helix.alerts.AlertValueAndStatus;
-import com.linkedin.helix.alerts.AlertsHolder;
-import com.linkedin.helix.alerts.ExpressionParser;
-import com.linkedin.helix.alerts.StatsHolder;
-import com.linkedin.helix.alerts.Tuple;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageContext;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.healthcheck.StatHealthReportProvider;
-import com.linkedin.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
-import com.linkedin.helix.model.AlertHistory;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.PersistentStats;
-import com.linkedin.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
-
-/**
- * For each LiveInstances select currentState and message whose sessionId matches
- * sessionId from LiveInstance Get Partition,State for all the resources computed in
- * previous State [ResourceComputationStage]
- *
- * @author asilbers
- *
- */
-public class StatsAggregationStage extends AbstractBaseStage
-{
-
- public static final int ALERT_HISTORY_SIZE = 30;
-
- private static final Logger logger =
- Logger.getLogger(StatsAggregationStage.class.getName());
-
- StatsHolder _statsHolder = null;
- AlertsHolder _alertsHolder = null;
- Map<String, Map<String, AlertValueAndStatus>> _alertStatus;
- Map<String, Tuple<String>> _statStatus;
- ClusterAlertMBeanCollection _alertBeanCollection = new ClusterAlertMBeanCollection();
- Map<String, String> _alertActionTaken = new HashMap<String, String>();
-
- public final String PARTICIPANT_STAT_REPORT_NAME = StatHealthReportProvider.REPORT_NAME;
- public final String ESPRESSO_STAT_REPORT_NAME = "RestQueryStats";
- public final String REPORT_NAME = "AggStats";
- // public final String DEFAULT_AGG_TYPE = "decay";
- // public final String DEFAULT_DECAY_PARAM = "0.1";
- // public final String DEFAULT_AGG_TYPE = "window";
- // public final String DEFAULT_DECAY_PARAM = "5";
-
- public StatHealthReportProvider _aggStatsProvider;
-
- // public AggregationType _defaultAggType;
-
- public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus()
- {
- return _alertStatus;
- }
-
- public Map<String, Tuple<String>> getStatStatus()
- {
- return _statStatus;
- }
-
- public void persistAggStats(HelixManager manager)
- {
- Map<String, String> report = _aggStatsProvider.getRecentHealthReport();
- Map<String, Map<String, String>> partitionReport =
- _aggStatsProvider.getRecentPartitionHealthReport();
- ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
- if (report != null)
- {
- record.setSimpleFields(report);
- }
- if (partitionReport != null)
- {
- record.setMapFields(partitionReport);
- }
-
-// DataAccessor accessor = manager.getDataAccessor();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
-// boolean retVal = accessor.setProperty(PropertyType.PERSISTENTSTATS, record);
- Builder keyBuilder = accessor.keyBuilder();
- boolean retVal = accessor.setProperty(keyBuilder.persistantStat(), new PersistentStats(record));
- if (retVal == false)
- {
- logger.error("attempt to persist derived stats failed");
- }
- }
-
- @Override
- public void init(StageContext context)
- {
- }
-
- public String getAgeStatName(String instance)
- {
- return instance + ExpressionParser.statFieldDelim + "reportingage";
- }
-
- // currTime in seconds
- public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime)
- {
- String statName = getAgeStatName(instance.getInstanceName());
- long age = (currTime - modifiedTime) / 1000; // XXX: ensure this is in
- // seconds
- Map<String, String> ageStatMap = new HashMap<String, String>();
- ageStatMap.put(StatsHolder.TIMESTAMP_NAME, String.valueOf(currTime));
- ageStatMap.put(StatsHolder.VALUE_NAME, String.valueOf(age));
- // note that applyStat will only work if alert already added
- _statsHolder.applyStat(statName, ageStatMap);
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception
- {
- long startTime = System.currentTimeMillis();
- // String aggTypeName =
- // DEFAULT_AGG_TYPE+AggregationType.DELIM+DEFAULT_DECAY_PARAM;
- // _defaultAggType = AggregationTypeFactory.getAggregationType(aggTypeName);
-
- HelixManager manager = event.getAttribute("helixmanager");
- HealthDataCache cache = event.getAttribute("HealthDataCache");
-
- if (manager == null || cache == null)
- {
- throw new StageException("helixmanager|HealthDataCache attribute value is null");
- }
- if(_alertsHolder == null)
- {
- _statsHolder = new StatsHolder(manager, cache);
- _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
- }
- else
- {
- _statsHolder.updateCache(cache);
- _alertsHolder.updateCache(cache);
- }
- if (_statsHolder.getStatsList().size() == 0)
- {
- logger.info("stat holder is empty");
- return;
- }
-
- // init agg stats from cache
- // initAggStats(cache);
-
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-
- long currTime = System.currentTimeMillis();
- // for each live node, read node's stats
- long readInstancesStart = System.currentTimeMillis();
- for (LiveInstance instance : liveInstances.values())
- {
- String instanceName = instance.getInstanceName();
- logger.debug("instanceName: " + instanceName);
- // XXX: now have map of HealthStats, so no need to traverse them...verify
- // correctness
- Map<String, HealthStat> stats;
- stats = cache.getHealthStats(instanceName);
- // find participants stats
- long modTime = -1;
- // TODO: get healthreport child node modified time and reportAgeStat based on that
- boolean reportedAge = false;
- for (HealthStat participantStat : stats.values())
- {
- if (participantStat != null && !reportedAge)
- {
- // generate and report stats for how old this node's report is
- modTime = participantStat.getLastModifiedTimeStamp();
- reportAgeStat(instance, modTime, currTime);
- reportedAge = true;
- }
- // System.out.println(modTime);
- // XXX: need to convert participantStat to a better format
- // need to get instanceName in here
-
- if (participantStat != null)
- {
- // String timestamp = String.valueOf(instance.getModifiedTime()); WANT
- // REPORT LEVEL TS
- Map<String, Map<String, String>> statMap =
- participantStat.getHealthFields(instanceName);
- for (String key : statMap.keySet())
- {
- _statsHolder.applyStat(key, statMap.get(key));
- }
- }
- }
- }
- // Call _statsHolder.persistStats() once per pipeline. This will
- // write the updated persisted stats into zookeeper
- _statsHolder.persistStats();
- logger.info("Done processing stats: "
- + (System.currentTimeMillis() - readInstancesStart));
- // populate _statStatus
- _statStatus = _statsHolder.getStatsMap();
-
- for (String statKey : _statStatus.keySet())
- {
- logger.debug("Stat key, value: " + statKey + ": " + _statStatus.get(statKey));
- }
-
- long alertExecuteStartTime = System.currentTimeMillis();
- // execute alerts, populate _alertStatus
- _alertStatus =
- AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(),
- _statsHolder.getStatsList());
- logger.info("done executing alerts: "
- + (System.currentTimeMillis() - alertExecuteStartTime));
- for (String originAlertName : _alertStatus.keySet())
- {
- _alertBeanCollection.setAlerts(originAlertName,
- _alertStatus.get(originAlertName),
- manager.getClusterName());
- }
-
- executeAlertActions(manager);
- // Write alert fire history to zookeeper
- updateAlertHistory(manager);
- long writeAlertStartTime = System.currentTimeMillis();
- // write out alert status (to zk)
- _alertsHolder.addAlertStatusSet(_alertStatus);
- logger.info("done writing alerts: "
- + (System.currentTimeMillis() - writeAlertStartTime));
-
- // TODO: access the 2 status variables from somewhere to populate graphs
-
- long logAlertStartTime = System.currentTimeMillis();
- // logging alert status
- for (String alertOuterKey : _alertStatus.keySet())
- {
- logger.debug("Alert Outer Key: " + alertOuterKey);
- Map<String, AlertValueAndStatus> alertInnerMap = _alertStatus.get(alertOuterKey);
- if (alertInnerMap == null)
- {
- logger.debug(alertOuterKey + " has no alerts to report.");
- continue;
- }
- for (String alertInnerKey : alertInnerMap.keySet())
- {
- logger.debug(" " + alertInnerKey + " value: "
- + alertInnerMap.get(alertInnerKey).getValue() + ", status: "
- + alertInnerMap.get(alertInnerKey).isFired());
- }
- }
-
- logger.info("done logging alerts: "
- + (System.currentTimeMillis() - logAlertStartTime));
-
- long processLatency = System.currentTimeMillis() - startTime;
- addLatencyToMonitor(event, processLatency);
- logger.info("process end: " + processLatency);
- }
-
- /**
- * Go through the _alertStatus, and call executeAlertAction for those actual alerts that
- * has been fired
- */
-
- void executeAlertActions( HelixManager manager)
- {
- _alertActionTaken.clear();
- // Go through the original alert strings
- for(String originAlertName : _alertStatus.keySet())
- {
- Map<String, String> alertFields = _alertsHolder.getAlertsMap().get(originAlertName);
- if(alertFields != null && alertFields.containsKey(AlertParser.ACTION_NAME))
- {
- String actionValue = alertFields.get(AlertParser.ACTION_NAME);
- Map<String, AlertValueAndStatus> alertResultMap = _alertStatus.get(originAlertName);
- if(alertResultMap == null)
- {
- logger.info("Alert "+ originAlertName + " does not have alert status map");
- continue;
- }
- // For each original alert, iterate all actual alerts that it expands into
- for(String actualStatName : alertResultMap.keySet())
- {
- // if the actual alert is fired, execute the action
- if(alertResultMap.get(actualStatName).isFired())
- {
- logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by " + actualStatName);
- _alertActionTaken.put(actualStatName, actionValue);
- // move functionalities into a seperate class
- executeAlertAction(actualStatName, actionValue, manager);
- }
- }
- }
- }
- }
- /**
- * Execute the action if an alert is fired, and the alert has an action associated with it.
- * NOTE: consider unify this with DefaultParticipantErrorMessageHandler.handleMessage()
- */
- void executeAlertAction(String actualStatName, String actionValue, HelixManager manager)
- {
- if(actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString()))
- {
- String instanceName = parseInstanceName(actualStatName, manager);
- if(instanceName != null)
- {
- logger.info("Disabling instance " + instanceName);
- manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, false);
- }
- }
- else if(actionValue.equals(ActionOnError.DISABLE_PARTITION.toString()))
- {
- String instanceName = parseInstanceName(actualStatName, manager);
- String resourceName = parseResourceName(actualStatName, manager);
- String partitionName = parsePartitionName(actualStatName, manager);
- if(instanceName != null && resourceName != null && partitionName != null)
- {
- logger.info("Disabling partition " + partitionName + " instanceName " + instanceName);
- manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(), instanceName,
- resourceName, Arrays.asList(partitionName));
- }
- }
- else if(actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString()))
- {
- String instanceName = parseInstanceName(actualStatName, manager);
- String resourceName = parseResourceName(actualStatName, manager);
- logger.info("Disabling resource " + resourceName + " instanceName " + instanceName + " not implemented");
-
- }
- }
-
- public static String parseResourceName(String actualStatName, HelixManager manager)
- {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder kb = accessor.keyBuilder();
- List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
- for (IdealState idealState : idealStates)
- {
- String resourceName = idealState.getResourceName();
- if(actualStatName.contains("=" + resourceName + ".") || actualStatName.contains("=" + resourceName + ";"))
- {
- return resourceName;
- }
- }
- return null;
- }
-
- public static String parsePartitionName(String actualStatName, HelixManager manager)
- {
- String resourceName = parseResourceName(actualStatName, manager);
- if(resourceName != null)
- {
- String partitionKey = "=" + resourceName + "_";
- if(actualStatName.contains(partitionKey))
- {
- int pos = actualStatName.indexOf(partitionKey);
- int nextDotPos = actualStatName.indexOf('.', pos + partitionKey.length());
- int nextCommaPos = actualStatName.indexOf(';', pos + partitionKey.length());
- if(nextCommaPos > 0 && nextCommaPos < nextDotPos)
- {
- nextDotPos = nextCommaPos;
- }
-
- String partitionName = actualStatName.substring(pos + 1, nextDotPos);
- return partitionName;
- }
- }
- return null;
- }
-
- public static String parseInstanceName(String actualStatName, HelixManager manager)
- {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder kb = accessor.keyBuilder();
- List<LiveInstance> liveInstances = accessor.getChildValues(kb.liveInstances());
- for (LiveInstance instance : liveInstances)
- {
- String instanceName = instance.getInstanceName();
- if(actualStatName.startsWith(instanceName))
- {
- return instanceName;
- }
- }
- return null;
- }
-
- void updateAlertHistory(HelixManager manager)
- {
- // Write alert fire history to zookeeper
- _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
- Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
- // Update history only when some beans has changed
- if(delta.size() > 0)
- {
- delta.putAll(_alertActionTaken);
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh:mm:ss:SSS");
- String date = dateFormat.format(new Date());
-
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
- ZNRecord alertFiredHistory;
- if(property == null)
- {
- alertFiredHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
- }
- else
- {
- alertFiredHistory = property.getRecord();
- }
- while(alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE)
- {
- // ZNRecord uses TreeMap which is sorted ascending internally
- String firstKey = (String)(alertFiredHistory.getMapFields().keySet().toArray()[0]);
- alertFiredHistory.getMapFields().remove(firstKey);
- }
- alertFiredHistory.setMapField(date, delta);
-// manager.getDataAccessor().setProperty(PropertyType.ALERT_HISTORY, alertFiredHistory);
- accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
- _alertBeanCollection.setAlertHistory(alertFiredHistory);
- }
- }
-
- public ClusterAlertMBeanCollection getClusterAlertMBeanCollection()
- {
- return _alertBeanCollection;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/TaskAssignmentStage.java
deleted file mode 100644
index c986cec..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/TaskAssignmentStage.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.controller.stages;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.controller.pipeline.AbstractBaseStage;
-import com.linkedin.helix.controller.pipeline.StageException;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.model.Resource;
-
-/**
- * Controller pipeline stage that gathers the messages selected by the
- * message-throttle stage for every partition of every resource, optionally
- * merges them into group messages, and writes them out through the
- * {@link HelixDataAccessor}.
- */
-public class TaskAssignmentStage extends AbstractBaseStage
-{
-  private static final Logger logger = Logger.getLogger(TaskAssignmentStage.class);
-
-  /**
-   * Collects all throttled messages for the resources in the event, groups
-   * them where the resource asks for it, and sends the result.
-   *
-   * @param event cluster event; must carry the "helixmanager" attribute as well
-   *          as the RESOURCES and MESSAGES_THROTTLE attributes
-   * @throws StageException if any of those three attributes is missing
-   */
-  @Override
-  public void process(ClusterEvent event) throws Exception
-  {
-    long startTime = System.currentTimeMillis();
-    logger.info("START TaskAssignmentStage.process()");
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES.toString());
-    MessageThrottleStageOutput messageOutput =
-        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
-
-    if (manager == null || resourceMap == null || messageOutput == null)
-    {
-      // Only list the attributes that are actually checked above.
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE");
-    }
-
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    List<Message> messagesToSend = new ArrayList<Message>();
-    for (Map.Entry<String, Resource> entry : resourceMap.entrySet())
-    {
-      String resourceName = entry.getKey();
-      for (Partition partition : entry.getValue().getPartitions())
-      {
-        List<Message> messages = messageOutput.getMessages(resourceName, partition);
-        // addAll(null) throws NullPointerException; skip partitions for which
-        // the throttle output holds no message list.
-        // NOTE(review): whether getMessages can return null is not visible here - confirm.
-        if (messages != null)
-        {
-          messagesToSend.addAll(messages);
-        }
-      }
-    }
-
-    List<Message> outputMessages =
-        groupMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap);
-    sendMessages(dataAccessor, outputMessages);
-
-    long endTime = System.currentTimeMillis();
-    logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime)
-        + " ms");
-
-  }
-
-  /**
-   * Merges messages of resources that have group-message mode enabled.
-   * Messages are grouped by their CurrentState path + "/" + fromState + "/" +
-   * toState; each group is represented by one message, built from the record
-   * of the first message seen for that key, to which the partition name of
-   * every member is added. Messages whose resource is unknown or does not use
-   * group-message mode are passed through unchanged.
-   *
-   * @param keyBuilder builder used to derive the CurrentState path of a message
-   * @param messages messages to group, in sending order
-   * @param resourceMap resources by name, consulted for the group-message flag
-   * @return the pass-through messages and the group messages, in first-seen order
-   */
-  List<Message> groupMessage(Builder keyBuilder,
-                             List<Message> messages,
-                             Map<String, Resource> resourceMap)
-  {
-    Map<String, Message> groupMessages = new HashMap<String, Message>();
-    List<Message> outputMessages = new ArrayList<Message>();
-
-    for (Message message : messages)
-    {
-      String resourceName = message.getResourceName();
-      Resource resource = resourceMap.get(resourceName);
-      if (resource == null || !resource.getGroupMessageMode())
-      {
-        outputMessages.add(message);
-        continue;
-      }
-
-      String key =
-          keyBuilder.currentState(message.getTgtName(),
-                                  message.getTgtSessionId(),
-                                  resourceName).getPath()
-              + "/" + message.getFromState() + "/" + message.getToState();
-
-      // Single lookup instead of containsKey() followed by get().
-      Message groupMessage = groupMessages.get(key);
-      if (groupMessage == null)
-      {
-        groupMessage = new Message(message.getRecord());
-        groupMessage.setGroupMessageMode(true);
-        outputMessages.add(groupMessage);
-        groupMessages.put(key, groupMessage);
-      }
-      groupMessage.addPartitionName(message.getPartitionName());
-    }
-
-    return outputMessages;
-  }
-
-  /**
-   * Logs each message and creates all of them in one {@code createChildren}
-   * call, keyed by target instance name and message id. Does nothing when
-   * {@code messages} is null or empty.
-   *
-   * @param dataAccessor accessor used to build the message keys and write the messages
-   * @param messages messages to send; may be null or empty
-   */
-  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages)
-  {
-    if (messages == null || messages.isEmpty())
-    {
-      return;
-    }
-
-    Builder keyBuilder = dataAccessor.keyBuilder();
-
-    List<PropertyKey> keys = new ArrayList<PropertyKey>();
-    for (Message message : messages)
-    {
-      logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
-          + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
-          + " from:" + message.getFromState() + " to:" + message.getToState());
-
-      keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
-    }
-
-    dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/controller/stages/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/controller/stages/package-info.java b/helix-core/src/main/java/com/linkedin/helix/controller/stages/package-info.java
deleted file mode 100644
index baef0ff..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/controller/stages/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Stages in Helix controller pipelines
- *
- */
-package com.linkedin.helix.controller.stages;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapHandler.java b/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapHandler.java
deleted file mode 100644
index 43f4b85..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/examples/BootstrapHandler.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.examples;
-
-import java.util.UUID;
-
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.ClusterMessagingService;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.participant.statemachine.StateModelInfo;
-import com.linkedin.helix.participant.statemachine.Transition;
-
-/**
- * Example state model factory that hands out {@link BootstrapStateModel}
- * instances, one per state unit key.
- */
-public class BootstrapHandler extends StateModelFactory<StateModel>
-{
-
-  /**
-   * Creates the state model for the given state unit.
-   *
-   * @param stateUnitKey key passed through to the new {@link BootstrapStateModel}
-   * @return a new {@link BootstrapStateModel}
-   */
-  @Override
-  public StateModel createNewStateModel(String stateUnitKey)
-  {
-    return new BootstrapStateModel(stateUnitKey);
-  }
-
-  /**
-   * OFFLINE/SLAVE/MASTER state model whose OFFLINE to SLAVE transition asks
-   * other participants for a bootstrap URL before proceeding.
-   */
-  @StateModelInfo(initialState = "OFFLINE", states = "{'OFFLINE','SLAVE','MASTER'}")
-  public static class BootstrapStateModel extends StateModel
-  {
-
-    private final String _stateUnitKey;
-
-    public BootstrapStateModel(String stateUnitKey)
-    {
-      _stateUnitKey = stateUnitKey;
-    }
-
-    /** MASTER to SLAVE: intentionally a no-op in this example. */
-    @Transition(from = "MASTER", to = "SLAVE")
-    public void masterToSlave(Message message, NotificationContext context)
-    {
-    }
-
-    /**
-     * OFFLINE to SLAVE: sends a bootstrap-URL request for the message's
-     * resource and partition, waits up to 30 seconds for replies, and prints
-     * the URL and backup time when a reply supplied one.
-     *
-     * @param message transition message; supplies the resource and partition names
-     * @param context notification context; supplies the {@link HelixManager}
-     */
-    @Transition(from = "OFFLINE", to = "SLAVE")
-    public void offlineToSlave(Message message, NotificationContext context)
-    {
-      System.out.println("BootstrapProcess.BootstrapStateModel.offlineToSlave()");
-
-      HelixManager helixManager = context.getManager();
-      ClusterMessagingService messaging = helixManager.getMessagingService();
-
-      Message urlRequest = buildBootstrapUrlRequest();
-      Criteria recipients = buildRecipientCriteria(message);
-
-      // wait for 30 seconds
-      int waitMillis = 30000;
-      BootstrapReplyHandler replyHandler = new BootstrapReplyHandler();
-      int sentCount =
-          messaging.sendAndWait(recipients, urlRequest, replyHandler, waitMillis);
-
-      if (sentCount == 0)
-      {
-        // could not find any other node hosting the partition
-        return;
-      }
-
-      if (replyHandler.getBootstrapUrl() == null)
-      {
-        // Either go to error state
-        // throw new Exception("Cant find backup/bootstrap data");
-        // Request some node to start backup process
-        return;
-      }
-
-      System.out.println("Got bootstrap url:" + replyHandler.getBootstrapUrl());
-      System.out.println("Got backup time:" + replyHandler.getBootstrapTime());
-      // Got the url fetch it
-    }
-
-    /** SLAVE to OFFLINE: only traces the transition. */
-    @Transition(from = "SLAVE", to = "OFFLINE")
-    public void slaveToOffline(Message message, NotificationContext context)
-    {
-      System.out.println("BootstrapProcess.BootstrapStateModel.slaveToOffline()");
-    }
-
-    /** SLAVE to MASTER: only traces the transition. */
-    @Transition(from = "SLAVE", to = "MASTER")
-    public void slaveToMaster(Message message, NotificationContext context)
-    {
-      System.out.println("BootstrapProcess.BootstrapStateModel.slaveToMaster()");
-    }
-
-    /** Builds a NEW user-defined message, with a random id, of the bootstrap-URL sub type. */
-    private static Message buildBootstrapUrlRequest()
-    {
-      Message request =
-          new Message(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
-      request.setMsgSubType(BootstrapProcess.REQUEST_BOOTSTRAP_URL);
-      request.setMsgState(MessageState.NEW);
-      return request;
-    }
-
-    /**
-     * Builds session-specific criteria with instance name "*" and recipient
-     * type PARTICIPANT, restricted to the resource and partition named in
-     * the given message.
-     */
-    private static Criteria buildRecipientCriteria(Message message)
-    {
-      Criteria criteria = new Criteria();
-      criteria.setInstanceName("*");
-      criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      criteria.setResource(message.getResourceName());
-      criteria.setPartition(message.getPartitionName());
-      criteria.setSessionSpecific(true);
-      return criteria;
-    }
-
-  }
-}
\ No newline at end of file