You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/16 01:51:43 UTC
[6/9] [HELIX-209] Shuffling around rebalancer code to allow for
compatibility
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
new file mode 100644
index 0000000..5d9746b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -0,0 +1,213 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.SchedulerTaskConfig;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Controller pipeline stage that compares the current and pending state of every replica with
+ * the best possible assignment and generates the state-transition messages needed to move each
+ * replica one step towards its desired state.
+ * <p>
+ * The generated messages are stored on the event under {@code MESSAGES_ALL}.
+ */
+public class MessageGenerationStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(MessageGenerationStage.class);
+
+  /**
+   * Generates the state-transition messages for every partition of every resource in the event.
+   * @param event cluster event; must carry the HelixManager, the cluster snapshot and the
+   *          RESOURCES, CURRENT_STATE and BEST_POSSIBLE_STATE attributes
+   * @throws StageException if any of the required attributes is missing
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager manager = event.getAttribute("helixmanager");
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ResourceId, ResourceConfig> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES.toString());
+    ResourceCurrentState currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
+        || bestPossibleStateOutput == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+    }
+
+    // read from the cluster only after the null check above, so that a missing attribute is
+    // reported as a StageException rather than a NullPointerException
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
+
+    MessageOutput output = new MessageOutput();
+
+    for (Map.Entry<ResourceId, ResourceConfig> resourceEntry : resourceMap.entrySet()) {
+      ResourceId resourceId = resourceEntry.getKey();
+      ResourceConfig resourceConfig = resourceEntry.getValue();
+      int bucketSize = resourceConfig.getBucketSize();
+
+      RebalancerContext rebalancerCtx =
+          resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+      StateModelDefinition stateModelDef =
+          stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
+
+      ResourceAssignment resourceAssignment =
+          bestPossibleStateOutput.getResourceAssignment(resourceId);
+      for (PartitionId subUnitId : resourceConfig.getSubUnitMap().keySet()) {
+        Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(subUnitId);
+
+        // we should generate message based on the desired-state priority
+        // so keep generated messages in a temp map keyed by state
+        // desired-state->list of generated-messages
+        Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
+
+        for (Map.Entry<ParticipantId, State> replica : instanceStateMap.entrySet()) {
+          ParticipantId participantId = replica.getKey();
+          State desiredState = replica.getValue();
+
+          State currentState =
+              currentStateOutput.getCurrentState(resourceId, subUnitId, participantId);
+          if (currentState == null) {
+            currentState = stateModelDef.getTypedInitialState();
+          }
+
+          if (desiredState.equals(currentState)) {
+            continue;
+          }
+
+          State pendingState =
+              currentStateOutput.getPendingState(resourceId, subUnitId, participantId);
+
+          // TODO fix it
+          State nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+          if (nextState == null) {
+            LOG.error("Unable to find a next state for partition: " + subUnitId
+                + " from stateModelDefinition " + stateModelDef.getClass() + " from:"
+                + currentState + " to:" + desiredState);
+            continue;
+          }
+
+          if (pendingState != null) {
+            // a transition is already in flight for this replica; only report it
+            if (nextState.equals(pendingState)) {
+              LOG.debug("Message already exists for " + participantId + " to transit "
+                  + subUnitId + " from " + currentState + " to " + nextState);
+            } else if (currentState.equals(pendingState)) {
+              LOG.info("Message hasn't been removed for " + participantId + " to transit "
+                  + subUnitId + " to " + pendingState + ", desiredState: " + desiredState);
+            } else {
+              LOG.info("IdealState changed before state transition completes for " + subUnitId
+                  + " on " + participantId + ", pendingState: " + pendingState
+                  + ", currentState: " + currentState + ", nextState: " + nextState);
+            }
+            continue;
+          }
+
+          // the message needs the target's session id, which only a live participant has
+          if (!cluster.getLiveParticipantMap().containsKey(participantId)) {
+            LOG.warn("Skip message generation for " + subUnitId + " on " + participantId
+                + " because the participant is not live");
+            continue;
+          }
+          SessionId sessionId =
+              cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
+                  .getSessionId();
+          Message message =
+              createMessage(manager, resourceId, subUnitId, participantId, currentState,
+                  nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
+                  rebalancerCtx.getStateModelFactoryId(), bucketSize);
+          applySchedulerTaskConfig(message, resourceConfig, rebalancerCtx, subUnitId,
+              currentState, nextState);
+          message.setClusterEvent(event);
+
+          List<Message> messagesForState = messageMap.get(desiredState);
+          if (messagesForState == null) {
+            messagesForState = new ArrayList<Message>();
+            messageMap.put(desiredState, messagesForState);
+          }
+          messagesForState.add(message);
+        }
+
+        // add generated messages to output according to state priority
+        for (State state : stateModelDef.getTypedStatesPriorityList()) {
+          List<Message> messagesForState = messageMap.get(state);
+          if (messagesForState != null) {
+            for (Message message : messagesForState) {
+              output.addMessage(resourceId, subUnitId, message);
+            }
+          }
+        }
+      } // end of for-each-partition
+    }
+    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+  }
+
+  /**
+   * Copies the scheduler-task settings of a resource onto a freshly created message: the inner
+   * message (scheduler task queue resources only) and the execution timeout of the transition.
+   * Does nothing when the resource has no scheduler task config.
+   * @param message message to update
+   * @param resourceConfig configuration of the resource the message belongs to
+   * @param rebalancerCtx rebalancer context of the resource
+   * @param subUnitId partition the message is addressed to
+   * @param currentState state the transition starts from
+   * @param nextState state the transition ends in
+   */
+  // TODO refactor get/set timeout/inner-message
+  private void applySchedulerTaskConfig(Message message, ResourceConfig resourceConfig,
+      RebalancerContext rebalancerCtx, PartitionId subUnitId, State currentState,
+      State nextState) {
+    SchedulerTaskConfig schedulerTaskConfig = resourceConfig.getSchedulerTaskConfig();
+    if (schedulerTaskConfig == null) {
+      return;
+    }
+
+    if (rebalancerCtx.getStateModelDefId().equalsIgnoreCase(StateModelDefId.SchedulerTaskQueue)
+        && resourceConfig.getSubUnitMap().size() > 0) {
+      // TODO refactor it -- we need a way to read in scheduler tasks a priori
+      Message innerMsg = schedulerTaskConfig.getInnerMessage(subUnitId);
+      if (innerMsg != null) {
+        message.setInnerMessage(innerMsg);
+      }
+    }
+
+    // Set timeout if needed
+    String stateTransition =
+        String.format("%s-%s_%s", currentState, nextState, Message.Attributes.TIMEOUT.name());
+    int timeout = schedulerTaskConfig.getTimeout(stateTransition, subUnitId);
+    if (timeout > 0) {
+      message.setExecutionTimeout(timeout);
+    }
+  }
+
+  /**
+   * Builds a new STATE_TRANSITION message with a random id, sent from the controller to one
+   * participant.
+   * @param manager connection of the sender; supplies the source instance name and session id
+   * @param resourceId resource the partition belongs to
+   * @param partitionId partition that should change state
+   * @param participantId target participant
+   * @param currentState state the transition starts from
+   * @param nextState state the transition ends in
+   * @param participantSessionId session id of the target participant
+   * @param stateModelDefId id of the state model definition of the resource
+   * @param stateModelFactoryId id of the state model factory of the resource
+   * @param bucketSize bucket size of the resource
+   * @return the populated message in state NEW
+   */
+  private Message createMessage(HelixManager manager, ResourceId resourceId,
+      PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
+      SessionId participantSessionId, StateModelDefId stateModelDefId,
+      StateModelFactoryId stateModelFactoryId, int bucketSize) {
+    MessageId uuid = MessageId.from(UUID.randomUUID().toString());
+    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+    message.setSrcName(manager.getInstanceName());
+    message.setTgtName(participantId.stringify());
+    message.setMsgState(MessageState.NEW);
+    message.setPartitionId(partitionId);
+    message.setResourceId(resourceId);
+    message.setFromState(currentState);
+    message.setToState(nextState);
+    message.setTgtSessionId(participantSessionId);
+    message.setSrcSessionId(SessionId.from(manager.getSessionId()));
+    message.setStateModelDef(stateModelDefId);
+    message.setStateModelFactoryId(stateModelFactoryId);
+    message.setBucketSize(bucketSize);
+
+    return message;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
new file mode 100644
index 0000000..9c8c154
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
@@ -0,0 +1,79 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.Message;
+
+/**
+ * Container for the messages produced by a controller pipeline stage, grouped by resource and
+ * then by partition.
+ */
+public class MessageOutput {
+
+  private final Map<ResourceId, Map<PartitionId, List<Message>>> _messagesMap;
+
+  /**
+   * Creates an empty output.
+   */
+  public MessageOutput() {
+    _messagesMap = new HashMap<ResourceId, Map<PartitionId, List<Message>>>();
+  }
+
+  /**
+   * Appends a message to the list kept for a partition, creating the list on first use.
+   * @param resourceId resource the partition belongs to
+   * @param partitionId partition the message is addressed to
+   * @param message message to append
+   */
+  public void addMessage(ResourceId resourceId, PartitionId partitionId, Message message) {
+    Map<PartitionId, List<Message>> partitionMessages = getOrCreatePartitionMap(resourceId);
+    List<Message> messages = partitionMessages.get(partitionId);
+    if (messages == null) {
+      messages = new ArrayList<Message>();
+      partitionMessages.put(partitionId, messages);
+    }
+    messages.add(message);
+  }
+
+  /**
+   * Replaces the message list kept for a partition. The list is stored as given, not copied.
+   * @param resourceId resource the partition belongs to
+   * @param partitionId partition the messages are addressed to
+   * @param selectedMessages messages to keep for the partition
+   */
+  public void setMessages(ResourceId resourceId, PartitionId partitionId,
+      List<Message> selectedMessages) {
+    getOrCreatePartitionMap(resourceId).put(partitionId, selectedMessages);
+  }
+
+  /**
+   * Returns the messages kept for a partition.
+   * @param resourceId resource the partition belongs to
+   * @param partitionId partition to look up
+   * @return the stored list, or an immutable empty list if nothing is stored for the resource
+   *         or for the partition; never null
+   */
+  public List<Message> getMessages(ResourceId resourceId, PartitionId partitionId) {
+    Map<PartitionId, List<Message>> partitionMessages = _messagesMap.get(resourceId);
+    if (partitionMessages != null) {
+      List<Message> messages = partitionMessages.get(partitionId);
+      // an unknown partition is reported the same way as an unknown resource
+      if (messages != null) {
+        return messages;
+      }
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Returns all messages kept for a resource, keyed by partition.
+   * @param resourceId resource to look up
+   * @return the internal partition-to-messages map, or null if nothing is stored for the
+   *         resource
+   */
+  public Map<PartitionId, List<Message>> getMessages(ResourceId resourceId) {
+    return _messagesMap.get(resourceId);
+  }
+
+  @Override
+  public String toString() {
+    return _messagesMap.toString();
+  }
+
+  /**
+   * Returns the partition map of a resource, registering an empty one if none exists yet.
+   */
+  private Map<PartitionId, List<Message>> getOrCreatePartitionMap(ResourceId resourceId) {
+    Map<PartitionId, List<Message>> partitionMessages = _messagesMap.get(resourceId);
+    if (partitionMessages == null) {
+      partitionMessages = new HashMap<PartitionId, List<Message>>();
+      _messagesMap.put(resourceId, partitionMessages);
+    }
+    return partitionMessages;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 1a3f37b..15004a6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -26,18 +26,25 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.rebalancer.RebalancerConfig;
+import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.api.rebalancer.ReplicatedRebalancerContext;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
-@Deprecated
public class MessageSelectionStage extends AbstractBaseStage {
private static final Logger LOG = Logger.getLogger(MessageSelectionStage.class);
@@ -73,41 +80,54 @@ public class MessageSelectionStage extends AbstractBaseStage {
public int getUpperBound() {
return upper;
}
+
+ @Override
+ public String toString() {
+ return String.format("%d-%d", lower, upper);
+ }
}
@Override
public void process(ClusterEvent event) throws Exception {
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- CurrentStateOutput currentStateOutput =
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ ResourceCurrentState currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
- MessageGenerationOutput messageGenOutput =
- event.getAttribute(AttributeName.MESSAGES_ALL.toString());
- if (cache == null || resourceMap == null || currentStateOutput == null
+ MessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+ if (cluster == null || resourceMap == null || currentStateOutput == null
|| messageGenOutput == null) {
throw new StageException("Missing attributes in event:" + event
+ ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
}
- MessageSelectionStageOutput output = new MessageSelectionStageOutput();
+ MessageOutput output = new MessageOutput();
- for (String resourceName : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceName);
- StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ ResourceConfig resource = resourceMap.get(resourceId);
+ StateModelDefinition stateModelDef =
+ stateModelDefMap.get(resource.getRebalancerConfig()
+ .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
+ // TODO have a logical model for transition
Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
- IdealState idealState = cache.getIdealState(resourceName);
- Map<String, Bounds> stateConstraints =
- computeStateConstraints(stateModelDef, idealState, cache);
+ Resource configResource = cluster.getResource(resourceId);
+
+ // if configResource == null, the resource has been dropped
+ Map<State, Bounds> stateConstraints =
+ computeStateConstraints(stateModelDef,
+ configResource == null ? null : configResource.getRebalancerConfig(), cluster);
- for (Partition partition : resource.getPartitions()) {
- List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
+ // TODO fix it
+ for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+ List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
List<Message> selectedMessages =
- selectMessages(cache.getLiveInstances(),
- currentStateOutput.getCurrentStateMap(resourceName, partition),
- currentStateOutput.getPendingStateMap(resourceName, partition), messages,
- stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
- output.addMessages(resourceName, partition, selectedMessages);
+ selectMessages(cluster.getLiveParticipantMap(),
+ currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+ currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
+ stateConstraints, stateTransitionPriorities, stateModelDef.getTypedInitialState());
+ output.setMessages(resourceId, partitionId, selectedMessages);
}
}
event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
@@ -132,22 +152,22 @@ public class MessageSelectionStage extends AbstractBaseStage {
* : FROME_STATE-TO_STATE -> priority
* @return: selected messages
*/
- List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
- Map<String, String> currentStates, Map<String, String> pendingStates, List<Message> messages,
- Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
- String initialState) {
+ List<Message> selectMessages(Map<ParticipantId, Participant> liveParticipants,
+ Map<ParticipantId, State> currentStates, Map<ParticipantId, State> pendingStates,
+ List<Message> messages, Map<State, Bounds> stateConstraints,
+ final Map<String, Integer> stateTransitionPriorities, State initialState) {
if (messages == null || messages.isEmpty()) {
return Collections.emptyList();
}
List<Message> selectedMessages = new ArrayList<Message>();
- Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+ Map<State, Bounds> bounds = new HashMap<State, Bounds>();
// count currentState, if no currentState, count as in initialState
- for (String instance : liveInstances.keySet()) {
- String state = initialState;
- if (currentStates.containsKey(instance)) {
- state = currentStates.get(instance);
+ for (ParticipantId liveParticipantId : liveParticipants.keySet()) {
+ State state = initialState;
+ if (currentStates.containsKey(liveParticipantId)) {
+ state = currentStates.get(liveParticipantId);
}
if (!bounds.containsKey(state)) {
@@ -158,8 +178,8 @@ public class MessageSelectionStage extends AbstractBaseStage {
}
// count pendingStates
- for (String instance : pendingStates.keySet()) {
- String state = pendingStates.get(instance);
+ for (ParticipantId participantId : pendingStates.keySet()) {
+ State state = pendingStates.get(participantId);
if (!bounds.containsKey(state)) {
bounds.put(state, new Bounds(0, 0));
}
@@ -173,7 +193,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
for (Message message : messages) {
State fromState = message.getTypedFromState();
State toState = message.getTypedToState();
- String transition = fromState + "-" + toState;
+ String transition = fromState.toString() + "-" + toState.toString();
int priority = Integer.MAX_VALUE;
if (stateTransitionPriorities.containsKey(transition)) {
@@ -198,7 +218,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
}
if (!bounds.containsKey(toState)) {
- bounds.put(toState.toString(), new Bounds(0, 0));
+ bounds.put(toState, new Bounds(0, 0));
}
// check lower bound of fromState
@@ -236,22 +256,35 @@ public class MessageSelectionStage extends AbstractBaseStage {
* TODO: This code is duplicate in multiple places. Can we do it in to one place in the
* beginning and compute the stateConstraint instance once and re use at other places.
* Each IdealState must have a constraint object associated with it
+ * @param stateModelDefinition
+ * @param rebalancerConfig if rebalancerConfig == null, we can't evaluate R thus no constraints
+ * @param cluster
+ * @return
*/
- private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
- IdealState idealState, ClusterDataCache cache) {
- Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
-
- List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
- for (String state : statePriorityList) {
- String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
+ private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
+ RebalancerConfig rebalancerConfig, Cluster cluster) {
+ ReplicatedRebalancerContext context =
+ (rebalancerConfig != null) ? rebalancerConfig
+ .getRebalancerContext(ReplicatedRebalancerContext.class) : null;
+ Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+
+ List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
+ for (State state : statePriorityList) {
+ String numInstancesPerState =
+ cluster.getStateUpperBoundConstraint(Scope.cluster(cluster.getId()),
+ stateModelDefinition.getStateModelDefId(), state);
int max = -1;
if ("N".equals(numInstancesPerState)) {
- max = cache.getLiveInstances().size();
+ max = cluster.getLiveParticipantMap().size();
} else if ("R".equals(numInstancesPerState)) {
// idealState is null when resource has been dropped,
// R can't be evaluated and ignore state constraints
- if (idealState != null) {
- max = cache.getReplicas(idealState.getResourceName());
+ if (context != null) {
+ if (context.anyLiveParticipant()) {
+ max = cluster.getLiveParticipantMap().size();
+ } else {
+ max = context.getReplicaCount();
+ }
}
} else {
try {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
deleted file mode 100644
index 54ab384..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStageOutput.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-
-@Deprecated
-public class MessageSelectionStageOutput {
- private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
- public MessageSelectionStageOutput() {
- _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
- }
-
- public void addMessages(String resourceName, Partition partition, List<Message> selectedMessages) {
- if (!_messagesMap.containsKey(resourceName)) {
- _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
- }
- _messagesMap.get(resourceName).put(partition, selectedMessages);
-
- }
-
- public List<Message> getMessages(String resourceName, Partition partition) {
- Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
- if (map != null) {
- return map.get(partition);
- }
- return Collections.emptyList();
-
- }
-
- @Override
- public String toString() {
- return _messagesMap.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index 62fbafe..a7b75a3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -26,6 +26,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.ClusterConstraints;
@@ -34,11 +40,8 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ClusterConstraints.ConstraintValue;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.apache.log4j.Logger;
-@Deprecated
public class MessageThrottleStage extends AbstractBaseStage {
private static final Logger LOG = Logger.getLogger(MessageThrottleStage.class.getName());
@@ -113,39 +116,43 @@ public class MessageThrottleStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- MessageSelectionStageOutput msgSelectionOutput =
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ MessageOutput msgSelectionOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
- if (cache == null || resourceMap == null || msgSelectionOutput == null) {
+ if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
throw new StageException("Missing attributes in event: " + event
+ ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
}
- MessageThrottleStageOutput output = new MessageThrottleStageOutput();
+ MessageOutput output = new MessageOutput();
- ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+ // TODO fix it
+ ClusterConstraints constraint = cluster.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
if (constraint != null) {
// go through all pending messages, they should be counted but not throttled
- for (String instance : cache.getLiveInstances().keySet()) {
- throttle(throttleCounterMap, constraint, new ArrayList<Message>(cache.getMessages(instance)
- .values()), false);
+ for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
+ Participant liveParticipant = cluster.getLiveParticipantMap().get(participantId);
+ throttle(throttleCounterMap, constraint, new ArrayList<Message>(liveParticipant
+ .getMessageMap().values()), false);
}
}
// go through all new messages, throttle if necessary
// assume messages should be sorted by state transition priority in messageSelection stage
- for (String resourceName : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceName);
- for (Partition partition : resource.getPartitions()) {
- List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ ResourceConfig resource = resourceMap.get(resourceId);
+ // TODO fix it
+ for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+ List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
if (constraint != null && messages != null && messages.size() > 0) {
messages = throttle(throttleCounterMap, constraint, messages, true);
}
- output.addMessages(resourceName, partition, messages);
+ output.setMessages(resourceId, partitionId, messages);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
deleted file mode 100644
index 5983eff..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStageOutput.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-
-@Deprecated
-public class MessageThrottleStageOutput {
- private final Map<String, Map<Partition, List<Message>>> _messagesMap;
-
- public MessageThrottleStageOutput() {
- _messagesMap = new HashMap<String, Map<Partition, List<Message>>>();
- }
-
- public void addMessages(String resourceName, Partition partition, List<Message> selectedMessages) {
- if (!_messagesMap.containsKey(resourceName)) {
- _messagesMap.put(resourceName, new HashMap<Partition, List<Message>>());
- }
- _messagesMap.get(resourceName).put(partition, selectedMessages);
-
- }
-
- public List<Message> getMessages(String resourceName, Partition partition) {
- Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
- if (map != null) {
- return map.get(partition);
- }
- return Collections.emptyList();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
deleted file mode 100644
index 8b56bec..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.Rebalancer;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * For partition compute best possible (instance,state) pair based on
- * IdealState,StateModel,LiveInstance
- */
-public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(NewBestPossibleStateCalcStage.class.getName());
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
- if (LOG.isInfoEnabled()) {
- LOG.info("START BestPossibleStateCalcStage.process()");
- }
-
- ResourceCurrentState currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- Map<ResourceId, ResourceConfig> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
- Cluster cluster = event.getAttribute("ClusterDataCache");
-
- if (currentStateOutput == null || resourceMap == null || cluster == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires CURRENT_STATE|RESOURCES|DataCache");
- }
-
- NewBestPossibleStateOutput bestPossibleStateOutput =
- compute(cluster, event, resourceMap, currentStateOutput);
- event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
-
- long endTime = System.currentTimeMillis();
- if (LOG.isInfoEnabled()) {
- LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
- }
- }
-
- /**
- * Fallback for cases when the resource has been dropped, but current state exists
- * @param cluster cluster snapshot
- * @param resourceId the resource for which to generate an assignment
- * @param currentStateOutput full snapshot of the current state
- * @param stateModelDef state model the resource follows
- * @return assignment for the dropped resource
- */
- private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
- ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
- ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
- Set<? extends PartitionId> mappedPartitions =
- currentStateOutput.getCurrentStateMappedPartitions(resourceId);
- if (mappedPartitions == null) {
- return partitionMapping;
- }
- for (PartitionId partitionId : mappedPartitions) {
- Set<ParticipantId> disabledParticipantsForPartition =
- NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partitionId);
- Map<State, String> upperBounds =
- NewConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId,
- cluster.getConfig());
- partitionMapping.addReplicaMap(partitionId, NewConstraintBasedAssignment
- .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
- stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
- disabledParticipantsForPartition));
- }
- return partitionMapping;
- }
-
- private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
- Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
- NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
- Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
-
- for (ResourceId resourceId : resourceMap.keySet()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing resource:" + resourceId);
- }
- ResourceConfig resourceConfig = resourceMap.get(resourceId);
- RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
- ResourceAssignment resourceAssignment = null;
- if (rebalancerConfig != null) {
- Rebalancer rebalancer = rebalancerConfig.getRebalancer();
- if (rebalancer != null) {
- HelixManager manager = event.getAttribute("helixmanager");
- rebalancer.init(manager);
- resourceAssignment =
- rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
- }
- }
- if (resourceAssignment == null) {
- RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
- StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
- resourceAssignment =
- mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
- }
-
- output.setResourceAssignment(resourceId, resourceAssignment);
- }
-
- return output;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
deleted file mode 100644
index 7720143..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.helix.controller.stages;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.ResourceAssignment;
-
-import com.google.common.collect.Maps;
-
-public class NewBestPossibleStateOutput {
-
- Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
-
- public NewBestPossibleStateOutput() {
- _resourceAssignmentMap = Maps.newHashMap();
- }
-
- /**
- * Set the computed resource assignment for a resource
- * @param resourceId the resource to set
- * @param resourceAssignment the computed assignment
- */
- public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
- _resourceAssignmentMap.put(resourceId, resourceAssignment);
- }
-
- /**
- * Get the resource assignment computed for a resource
- * @param resourceId resource to look up
- * @return ResourceAssignment computed by the best possible state calculation
- */
- public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
- return _resourceAssignmentMap.get(resourceId);
- }
-
- /**
- * Get all of the resources currently assigned
- * @return set of assigned resource ids
- */
- public Set<ResourceId> getAssignedResources() {
- return _resourceAssignmentMap.keySet();
- }
-
- @Override
- public String toString() {
- return _resourceAssignmentMap.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
deleted file mode 100644
index ea1a507..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.HelixVersion;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.log4j.Logger;
-
-/**
- * controller checks if participant version is compatible
- */
-public class NewCompatibilityCheckStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(NewCompatibilityCheckStage.class.getName());
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- HelixManager manager = event.getAttribute("helixmanager");
- Cluster cluster = event.getAttribute("ClusterDataCache");
- if (manager == null || cluster == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager | DataCache");
- }
-
- HelixManagerProperties properties = manager.getProperties();
- // Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
- Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
- for (Participant liveParticipant : liveParticipants.values()) {
- HelixVersion version = liveParticipant.getRunningInstance().getVersion();
- String participantVersion = (version != null) ? version.toString() : null;
- if (!properties.isParticipantCompatible(participantVersion)) {
- String errorMsg =
- "incompatible participant. pipeline will not continue. " + "controller: "
- + manager.getInstanceName() + ", controllerVersion: " + properties.getVersion()
- + ", minimumSupportedParticipantVersion: "
- + properties.getProperty("minimum_supported_version.participant")
- + ", participant: " + liveParticipant.getId() + ", participantVersion: "
- + participantVersion;
- LOG.error(errorMsg);
- throw new StageException(errorMsg);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
deleted file mode 100644
index f7f2a5f..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageType;
-
-/**
- * For each LiveInstances select currentState and message whose sessionId matches
- * sessionId from LiveInstance Get Partition,State for all the resources computed in
- * previous State [ResourceComputationStage]
- */
-public class NewCurrentStateComputationStage extends AbstractBaseStage {
- @Override
- public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<ResourceId, ResourceConfig> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
-
- if (cluster == null || resourceMap == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires DataCache|RESOURCE");
- }
-
- ResourceCurrentState currentStateOutput = new ResourceCurrentState();
-
- for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
- ParticipantId participantId = liveParticipant.getId();
-
- // add pending messages
- Map<MessageId, Message> instanceMsgs = liveParticipant.getMessageMap();
- for (Message message : instanceMsgs.values()) {
- if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) {
- continue;
- }
-
- if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTypedTgtSessionId())) {
- continue;
- }
-
- ResourceId resourceId = message.getResourceId();
- ResourceConfig resource = resourceMap.get(resourceId);
- if (resource == null) {
- continue;
- }
-
- if (!message.getBatchMessageMode()) {
- PartitionId partitionId = message.getPartitionId();
- Partition partition = resource.getSubUnit(partitionId);
- if (partition != null) {
- currentStateOutput.setPendingState(resourceId, partitionId, participantId,
- message.getTypedToState());
- } else {
- // log
- }
- } else {
- List<PartitionId> partitionNames = message.getPartitionIds();
- if (!partitionNames.isEmpty()) {
- for (PartitionId partitionId : partitionNames) {
- Partition partition = resource.getSubUnit(partitionId);
- if (partition != null) {
- currentStateOutput.setPendingState(resourceId, partitionId, participantId,
- message.getTypedToState());
- } else {
- // log
- }
- }
- }
- }
- }
-
- // add current state
- SessionId sessionId = liveParticipant.getRunningInstance().getSessionId();
- Map<ResourceId, CurrentState> curStateMap = liveParticipant.getCurrentStateMap();
- for (CurrentState curState : curStateMap.values()) {
- if (!sessionId.equals(curState.getTypedSessionId())) {
- continue;
- }
-
- ResourceId resourceId = curState.getResourceId();
- StateModelDefId stateModelDefId = curState.getStateModelDefId();
- ResourceConfig resource = resourceMap.get(resourceId);
- if (resource == null) {
- continue;
- }
-
- if (stateModelDefId != null) {
- currentStateOutput.setResourceStateModelDef(resourceId, stateModelDefId);
- }
-
- currentStateOutput.setBucketSize(resourceId, curState.getBucketSize());
-
- Map<PartitionId, State> partitionStateMap = curState.getTypedPartitionStateMap();
- for (PartitionId partitionId : partitionStateMap.keySet()) {
- Partition partition = resource.getSubUnit(partitionId);
- if (partition != null) {
- currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
- curState.getState(partitionId));
- } else {
- // log
- }
- }
- }
- }
-
- event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
deleted file mode 100644
index d67931d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordDelta;
-import org.apache.helix.ZNRecordDelta.MergeOperation;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.config.SchedulerTaskConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StatusUpdate;
-import org.apache.log4j.Logger;
-
-public class NewExternalViewComputeStage extends AbstractBaseStage {
- private static Logger LOG = Logger.getLogger(NewExternalViewComputeStage.class);
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
- LOG.info("START ExternalViewComputeStage.process()");
-
- HelixManager manager = event.getAttribute("helixmanager");
- Map<ResourceId, ResourceConfig> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
- Cluster cluster = event.getAttribute("ClusterDataCache");
-
- if (manager == null || resourceMap == null || cluster == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires ClusterManager|RESOURCES|DataCache");
- }
-
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
-
- ResourceCurrentState currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
- List<ExternalView> newExtViews = new ArrayList<ExternalView>();
- List<PropertyKey> keys = new ArrayList<PropertyKey>();
-
- // TODO use external-view accessor
- Map<String, ExternalView> curExtViews =
- dataAccessor.getChildValuesMap(keyBuilder.externalViews());
-
- for (ResourceId resourceId : resourceMap.keySet()) {
- ExternalView view = new ExternalView(resourceId.stringify());
- // view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
- // if resource ideal state has bucket size, set it
- // otherwise resource has been dropped, use bucket size from current state instead
- ResourceConfig resource = resourceMap.get(resourceId);
- RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
- SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
-
- if (resource.getBucketSize() > 0) {
- view.setBucketSize(resource.getBucketSize());
- } else {
- view.setBucketSize(currentStateOutput.getBucketSize(resourceId));
- }
- for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
- Map<ParticipantId, State> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceId, partitionId);
- if (currentStateMap != null && currentStateMap.size() > 0) {
- // Set<String> disabledInstances
- // = cache.getDisabledInstancesForResource(resource.toString());
- for (ParticipantId participantId : currentStateMap.keySet()) {
- // if (!disabledInstances.contains(instance))
- // {
- view.setState(partitionId.stringify(), participantId.stringify(),
- currentStateMap.get(participantId).toString());
- // }
- }
- }
- }
-
- // TODO fix this
- // Update cluster status monitor mbean
- // ClusterStatusMonitor clusterStatusMonitor =
- // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- // IdealState idealState = cache._idealStateMap.get(view.getResourceName());
- // if (idealState != null) {
- // if (clusterStatusMonitor != null
- // && !idealState.getStateModelDefRef().equalsIgnoreCase(
- // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- // clusterStatusMonitor.onExternalViewChange(view,
- // cache._idealStateMap.get(view.getResourceName()));
- // }
- // }
-
- // compare the new external view with current one, set only on different
- ExternalView curExtView = curExtViews.get(resourceId.stringify());
- if (curExtView == null || !curExtView.getRecord().equals(view.getRecord())) {
- keys.add(keyBuilder.externalView(resourceId.stringify()));
- newExtViews.add(view);
-
- // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which
- // task
- // partitions are finished (COMPLETED or ERROR), update the status update of the original
- // scheduler
- // message, and then remove the partitions from the ideal state
- RebalancerContext rebalancerContext =
- (rebalancerConfig != null) ? rebalancerConfig
- .getRebalancerContext(RebalancerContext.class) : null;
- if (rebalancerContext != null
- && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
- StateModelDefId.SchedulerTaskQueue)) {
- updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
- }
- }
- }
- // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all.
- // Are there any entity that will be interested in its change?
-
- // add/update external-views
- if (newExtViews.size() > 0) {
- dataAccessor.setChildren(keys, newExtViews);
- }
-
- // remove dead external-views
- for (String resourceName : curExtViews.keySet()) {
- if (!resourceMap.containsKey(ResourceId.from(resourceName))) {
- dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
- }
- }
-
- long endTime = System.currentTimeMillis();
- LOG.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
- }
-
- // TODO fix it
- private void updateScheduledTaskStatus(ResourceId resourceId, ExternalView ev,
- HelixManager manager, SchedulerTaskConfig schedulerTaskConfig) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
-
- // Place holder for finished partitions
- Map<String, String> emptyMap = new HashMap<String, String>();
- List<String> emptyList = new LinkedList<String>();
-
- Map<String, Integer> controllerMsgIdCountMap = new HashMap<String, Integer>();
- Map<String, Map<String, String>> controllerMsgUpdates =
- new HashMap<String, Map<String, String>>();
-
- for (String taskPartitionName : ev.getPartitionSet()) {
- for (String taskState : ev.getStateMap(taskPartitionName).values()) {
- if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
- || taskState.equalsIgnoreCase("COMPLETED")) {
- LOG.info(taskPartitionName + " finished as " + taskState);
- finishedTasks.setListField(taskPartitionName, emptyList);
- finishedTasks.setMapField(taskPartitionName, emptyMap);
-
- // Update original scheduler message status update
- Message innerMessage =
- schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
- if (innerMessage != null) {
- String controllerMsgId = innerMessage.getControllerMessagId();
- if (controllerMsgId != null) {
- LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
- if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
- controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
- }
- controllerMsgUpdates.get(controllerMsgId).put(taskPartitionName, taskState);
- }
- }
- }
- }
- }
- // fill the controllerMsgIdCountMap
- for (PartitionId taskId : schedulerTaskConfig.getPartitionSet()) {
- Message innerMessage = schedulerTaskConfig.getInnerMessage(taskId);
- String controllerMsgId = innerMessage.getControllerMessagId();
-
- if (controllerMsgId != null) {
- Integer curCnt = controllerMsgIdCountMap.get(controllerMsgId);
- if (curCnt == null) {
- curCnt = 0;
- }
- controllerMsgIdCountMap.put(controllerMsgId, curCnt + 1);
- }
- }
-
- if (controllerMsgUpdates.size() > 0) {
- for (String controllerMsgId : controllerMsgUpdates.keySet()) {
- PropertyKey controllerStatusUpdateKey =
- keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
- StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
- for (String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet()) {
- Message innerMessage =
- schedulerTaskConfig.getInnerMessage(PartitionId.from(taskPartitionName));
-
- Map<String, String> result = new HashMap<String, String>();
- result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
- controllerStatusUpdate.getRecord().setMapField(
- "MessageResult " + innerMessage.getTgtName() + " " + taskPartitionName + " "
- + innerMessage.getMessageId(), result);
- }
-
- // All done for the scheduled tasks that came from controllerMsgId, add summary for it
- if (controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(
- controllerMsgId).intValue()) {
- int finishedTasksNum = 0;
- int completedTasksNum = 0;
- for (String key : controllerStatusUpdate.getRecord().getMapFields().keySet()) {
- if (key.startsWith("MessageResult ")) {
- finishedTasksNum++;
- }
- if (controllerStatusUpdate.getRecord().getMapField(key).get("Result") != null) {
- if (controllerStatusUpdate.getRecord().getMapField(key).get("Result")
- .equalsIgnoreCase("COMPLETED")) {
- completedTasksNum++;
- }
- }
- }
- Map<String, String> summary = new TreeMap<String, String>();
- summary.put("TotalMessages:", "" + finishedTasksNum);
- summary.put("CompletedMessages", "" + completedTasksNum);
-
- controllerStatusUpdate.getRecord().setMapField("Summary", summary);
- }
- // Update the statusUpdate of controllerMsgId
- accessor.updateProperty(controllerStatusUpdateKey, controllerStatusUpdate);
- }
- }
-
- if (finishedTasks.getListFields().size() > 0) {
- ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
- List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
- deltaList.add(znDelta);
- IdealState delta = new IdealState(resourceId);
- delta.setDeltaList(deltaList);
-
- // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
- keyBuilder = accessor.keyBuilder();
- accessor.updateProperty(keyBuilder.idealState(resourceId.stringify()), delta);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
deleted file mode 100644
index 3d51bd0..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.config.SchedulerTaskConfig;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages
- */
-public class NewMessageGenerationStage extends AbstractBaseStage {
- private static Logger LOG = Logger.getLogger(NewMessageGenerationStage.class);
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- HelixManager manager = event.getAttribute("helixmanager");
- Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
- Map<ResourceId, ResourceConfig> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
- ResourceCurrentState currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- NewBestPossibleStateOutput bestPossibleStateOutput =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
- || bestPossibleStateOutput == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
- }
-
- NewMessageOutput output = new NewMessageOutput();
-
- for (ResourceId resourceId : resourceMap.keySet()) {
- ResourceConfig resourceConfig = resourceMap.get(resourceId);
- int bucketSize = resourceConfig.getBucketSize();
-
- RebalancerContext rebalancerCtx =
- resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
- StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
-
- ResourceAssignment resourceAssignment =
- bestPossibleStateOutput.getResourceAssignment(resourceId);
- for (PartitionId subUnitId : resourceConfig.getSubUnitMap().keySet()) {
- Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(subUnitId);
-
- // we should generate message based on the desired-state priority
- // so keep generated messages in a temp map keyed by state
- // desired-state->list of generated-messages
- Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
-
- for (ParticipantId participantId : instanceStateMap.keySet()) {
- State desiredState = instanceStateMap.get(participantId);
-
- State currentState =
- currentStateOutput.getCurrentState(resourceId, subUnitId, participantId);
- if (currentState == null) {
- currentState = stateModelDef.getTypedInitialState();
- }
-
- if (desiredState.equals(currentState)) {
- continue;
- }
-
- State pendingState =
- currentStateOutput.getPendingState(resourceId, subUnitId, participantId);
-
- // TODO fix it
- State nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
- if (nextState == null) {
- LOG.error("Unable to find a next state for partition: " + subUnitId
- + " from stateModelDefinition" + stateModelDef.getClass() + " from:" + currentState
- + " to:" + desiredState);
- continue;
- }
-
- if (pendingState != null) {
- if (nextState.equals(pendingState)) {
- LOG.debug("Message already exists for " + participantId + " to transit " + subUnitId
- + " from " + currentState + " to " + nextState);
- } else if (currentState.equals(pendingState)) {
- LOG.info("Message hasn't been removed for " + participantId + " to transit"
- + subUnitId + " to " + pendingState + ", desiredState: " + desiredState);
- } else {
- LOG.info("IdealState changed before state transition completes for " + subUnitId
- + " on " + participantId + ", pendingState: " + pendingState + ", currentState: "
- + currentState + ", nextState: " + nextState);
- }
- } else {
- // TODO check if instance is alive
- SessionId sessionId =
- cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
- .getSessionId();
- RebalancerContext rebalancerContext =
- resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
- Message message =
- createMessage(manager, resourceId, subUnitId, participantId, currentState,
- nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
- rebalancerContext.getStateModelFactoryId(), bucketSize);
-
- // TODO refactor get/set timeout/inner-message
- if (rebalancerContext != null
- && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
- StateModelDefId.SchedulerTaskQueue)) {
- if (resourceConfig.getSubUnitMap().size() > 0) {
- // TODO refactor it -- we need a way to read in scheduler tasks a priori
- Message innerMsg =
- resourceConfig.getSchedulerTaskConfig().getInnerMessage(subUnitId);
- if (innerMsg != null) {
- message.setInnerMessage(innerMsg);
- }
- }
- }
-
- // Set timeout if needed
- String stateTransition =
- String.format("%s-%s_%s", currentState, nextState,
- Message.Attributes.TIMEOUT.name());
- SchedulerTaskConfig schedulerTaskConfig = resourceConfig.getSchedulerTaskConfig();
- if (schedulerTaskConfig != null) {
- int timeout = schedulerTaskConfig.getTimeout(stateTransition, subUnitId);
- if (timeout > 0) {
- message.setExecutionTimeout(timeout);
- }
- }
- message.setClusterEvent(event);
-
- if (!messageMap.containsKey(desiredState)) {
- messageMap.put(desiredState, new ArrayList<Message>());
- }
- messageMap.get(desiredState).add(message);
- }
- }
-
- // add generated messages to output according to state priority
- List<State> statesPriorityList = stateModelDef.getTypedStatesPriorityList();
- for (State state : statesPriorityList) {
- if (messageMap.containsKey(state)) {
- for (Message message : messageMap.get(state)) {
- output.addMessage(resourceId, subUnitId, message);
- }
- }
- }
-
- } // end of for-each-partition
- }
- event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
- // System.out.println("output: " + output);
- }
-
- private Message createMessage(HelixManager manager, ResourceId resourceId,
- PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
- SessionId participantSessionId, StateModelDefId stateModelDefId,
- StateModelFactoryId stateModelFactoryId, int bucketSize) {
- MessageId uuid = MessageId.from(UUID.randomUUID().toString());
- Message message = new Message(MessageType.STATE_TRANSITION, uuid);
- message.setSrcName(manager.getInstanceName());
- message.setTgtName(participantId.stringify());
- message.setMsgState(MessageState.NEW);
- message.setPartitionId(partitionId);
- message.setResourceId(resourceId);
- message.setFromState(currentState);
- message.setToState(nextState);
- message.setTgtSessionId(participantSessionId);
- message.setSrcSessionId(SessionId.from(manager.getSessionId()));
- message.setStateModelDef(stateModelDefId);
- message.setStateModelFactoryId(stateModelFactoryId);
- message.setBucketSize(bucketSize);
-
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
deleted file mode 100644
index 89231c2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.Message;
-
-public class NewMessageOutput {
-
- private final Map<ResourceId, Map<PartitionId, List<Message>>> _messagesMap;
-
- public NewMessageOutput() {
- _messagesMap = new HashMap<ResourceId, Map<PartitionId, List<Message>>>();
-
- }
-
- public void addMessage(ResourceId resourceId, PartitionId partitionId, Message message) {
- if (!_messagesMap.containsKey(resourceId)) {
- _messagesMap.put(resourceId, new HashMap<PartitionId, List<Message>>());
- }
- if (!_messagesMap.get(resourceId).containsKey(partitionId)) {
- _messagesMap.get(resourceId).put(partitionId, new ArrayList<Message>());
-
- }
- _messagesMap.get(resourceId).get(partitionId).add(message);
-
- }
-
- public void setMessages(ResourceId resourceId, PartitionId partitionId,
- List<Message> selectedMessages) {
- if (!_messagesMap.containsKey(resourceId)) {
- _messagesMap.put(resourceId, new HashMap<PartitionId, List<Message>>());
- }
- _messagesMap.get(resourceId).put(partitionId, selectedMessages);
-
- }
-
- public List<Message> getMessages(ResourceId resourceId, PartitionId partitionId) {
- Map<PartitionId, List<Message>> map = _messagesMap.get(resourceId);
- if (map != null) {
- return map.get(partitionId);
- }
- return Collections.emptyList();
-
- }
-
- public Map<PartitionId, List<Message>> getMessages(ResourceId resourceId) {
- return _messagesMap.get(resourceId);
- }
-
- @Override
- public String toString() {
- return _messagesMap.toString();
- }
-}