You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/16 01:51:42 UTC
[5/9] [HELIX-209] Shuffling around rebalancer code to allow for
compatibility
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
deleted file mode 100644
index 4a46a4c..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ /dev/null
@@ -1,317 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-public class NewMessageSelectionStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(NewMessageSelectionStage.class);
-
- public static class Bounds {
- private int upper;
- private int lower;
-
- public Bounds(int lower, int upper) {
- this.lower = lower;
- this.upper = upper;
- }
-
- public void increaseUpperBound() {
- upper++;
- }
-
- public void increaseLowerBound() {
- lower++;
- }
-
- public void decreaseUpperBound() {
- upper--;
- }
-
- public void decreaseLowerBound() {
- lower--;
- }
-
- public int getLowerBound() {
- return lower;
- }
-
- public int getUpperBound() {
- return upper;
- }
-
- @Override
- public String toString() {
- return String.format("%d-%d", lower, upper);
- }
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
- Map<ResourceId, ResourceConfig> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
- ResourceCurrentState currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- NewMessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
- if (cluster == null || resourceMap == null || currentStateOutput == null
- || messageGenOutput == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
- }
-
- NewMessageOutput output = new NewMessageOutput();
-
- for (ResourceId resourceId : resourceMap.keySet()) {
- ResourceConfig resource = resourceMap.get(resourceId);
- StateModelDefinition stateModelDef =
- stateModelDefMap.get(resource.getRebalancerConfig()
- .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
-
- // TODO have a logical model for transition
- Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
- Resource configResource = cluster.getResource(resourceId);
-
- // if configResource == null, the resource has been dropped
- Map<State, Bounds> stateConstraints =
- computeStateConstraints(stateModelDef,
- configResource == null ? null : configResource.getRebalancerConfig(), cluster);
-
- // TODO fix it
- for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
- List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
- List<Message> selectedMessages =
- selectMessages(cluster.getLiveParticipantMap(),
- currentStateOutput.getCurrentStateMap(resourceId, partitionId),
- currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
- stateConstraints, stateTransitionPriorities, stateModelDef.getTypedInitialState());
- output.setMessages(resourceId, partitionId, selectedMessages);
- }
- }
- event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
- }
-
- // TODO: This method deserves its own class. The class should not understand helix but
- // just be
- // able to solve the problem using the algo. I think the method is following that but if
- // we don't move it to another class its quite easy to break that contract
- /**
- * greedy message selection algorithm: 1) calculate CS+PS state lower/upper-bounds 2)
- * group messages by state transition and sorted by priority 3) from highest priority to
- * lowest, for each message group with the same transition add message one by one and
- * make sure state constraint is not violated update state lower/upper-bounds when a new
- * message is selected
- * @param currentStates
- * @param pendingStates
- * @param messages
- * @param stateConstraints
- * : STATE -> bound (lower:upper)
- * @param stateTransitionPriorities
- * : FROME_STATE-TO_STATE -> priority
- * @return: selected messages
- */
- List<Message> selectMessages(Map<ParticipantId, Participant> liveParticipants,
- Map<ParticipantId, State> currentStates, Map<ParticipantId, State> pendingStates,
- List<Message> messages, Map<State, Bounds> stateConstraints,
- final Map<String, Integer> stateTransitionPriorities, State initialState) {
- if (messages == null || messages.isEmpty()) {
- return Collections.emptyList();
- }
-
- List<Message> selectedMessages = new ArrayList<Message>();
- Map<State, Bounds> bounds = new HashMap<State, Bounds>();
-
- // count currentState, if no currentState, count as in initialState
- for (ParticipantId liveParticipantId : liveParticipants.keySet()) {
- State state = initialState;
- if (currentStates.containsKey(liveParticipantId)) {
- state = currentStates.get(liveParticipantId);
- }
-
- if (!bounds.containsKey(state)) {
- bounds.put(state, new Bounds(0, 0));
- }
- bounds.get(state).increaseLowerBound();
- bounds.get(state).increaseUpperBound();
- }
-
- // count pendingStates
- for (ParticipantId participantId : pendingStates.keySet()) {
- State state = pendingStates.get(participantId);
- if (!bounds.containsKey(state)) {
- bounds.put(state, new Bounds(0, 0));
- }
- // TODO: add lower bound, need to refactor pendingState to include fromState also
- bounds.get(state).increaseUpperBound();
- }
-
- // group messages based on state transition priority
- Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
- new TreeMap<Integer, List<Message>>();
- for (Message message : messages) {
- State fromState = message.getTypedFromState();
- State toState = message.getTypedToState();
- String transition = fromState.toString() + "-" + toState.toString();
- int priority = Integer.MAX_VALUE;
-
- if (stateTransitionPriorities.containsKey(transition)) {
- priority = stateTransitionPriorities.get(transition);
- }
-
- if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
- messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
- }
- messagesGroupByStateTransitPriority.get(priority).add(message);
- }
-
- // select messages
- for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
- for (Message message : messageList) {
- State fromState = message.getTypedFromState();
- State toState = message.getTypedToState();
-
- if (!bounds.containsKey(fromState)) {
- LOG.error("Message's fromState is not in currentState. message: " + message);
- continue;
- }
-
- if (!bounds.containsKey(toState)) {
- bounds.put(toState, new Bounds(0, 0));
- }
-
- // check lower bound of fromState
- if (stateConstraints.containsKey(fromState)) {
- int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
- if (newLowerBound < 0) {
- LOG.error("Number of currentState in " + fromState
- + " is less than number of messages transiting from " + fromState);
- continue;
- }
-
- if (newLowerBound < stateConstraints.get(fromState).getLowerBound()) {
- continue;
- }
- }
-
- // check upper bound of toState
- if (stateConstraints.containsKey(toState)) {
- int newUpperBound = bounds.get(toState).getUpperBound() + 1;
- if (newUpperBound > stateConstraints.get(toState).getUpperBound()) {
- continue;
- }
- }
-
- selectedMessages.add(message);
- bounds.get(fromState).increaseLowerBound();
- bounds.get(toState).increaseUpperBound();
- }
- }
-
- return selectedMessages;
- }
-
- /**
- * TODO: This code is duplicate in multiple places. Can we do it in to one place in the
- * beginning and compute the stateConstraint instance once and re use at other places.
- * Each IdealState must have a constraint object associated with it
- * @param stateModelDefinition
- * @param rebalancerConfig if rebalancerConfig == null, we can't evaluate R thus no constraints
- * @param cluster
- * @return
- */
- private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
- RebalancerConfig rebalancerConfig, Cluster cluster) {
- ReplicatedRebalancerContext context =
- (rebalancerConfig != null) ? rebalancerConfig
- .getRebalancerContext(ReplicatedRebalancerContext.class) : null;
- Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
-
- List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
- for (State state : statePriorityList) {
- String numInstancesPerState =
- cluster.getStateUpperBoundConstraint(Scope.cluster(cluster.getId()),
- stateModelDefinition.getStateModelDefId(), state);
- int max = -1;
- if ("N".equals(numInstancesPerState)) {
- max = cluster.getLiveParticipantMap().size();
- } else if ("R".equals(numInstancesPerState)) {
- // idealState is null when resource has been dropped,
- // R can't be evaluated and ignore state constraints
- if (context != null) {
- if (context.anyLiveParticipant()) {
- max = cluster.getLiveParticipantMap().size();
- } else {
- max = context.getReplicaCount();
- }
- }
- } else {
- try {
- max = Integer.parseInt(numInstancesPerState);
- } catch (Exception e) {
- // use -1
- }
- }
-
- if (max > -1) {
- // if state has no constraint, will not put in map
- stateConstraints.put(state, new Bounds(0, max));
- }
- }
-
- return stateConstraints;
- }
-
- // TODO: if state transition priority is not provided then use lexicographical sorting
- // so that behavior is consistent
- private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef) {
- Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
- List<String> stateTransitionPriorityList = stateModelDef.getStateTransitionPriorityStringList();
- for (int i = 0; i < stateTransitionPriorityList.size(); i++) {
- stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
- }
-
- return stateTransitionPriorities;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
deleted file mode 100644
index dfea7fc..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ /dev/null
@@ -1,198 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
-import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.ClusterConstraints.ConstraintValue;
-import org.apache.helix.model.ConstraintItem;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public class NewMessageThrottleStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(NewMessageThrottleStage.class.getName());
-
- int valueOf(String valueStr) {
- int value = Integer.MAX_VALUE;
-
- try {
- ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
- switch (valueToken) {
- case ANY:
- value = Integer.MAX_VALUE;
- break;
- default:
- LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
- + Integer.MAX_VALUE);
- break;
- }
- } catch (Exception e) {
- try {
- value = Integer.parseInt(valueStr);
- } catch (NumberFormatException ne) {
- LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
- + Integer.MAX_VALUE);
- }
- }
- return value;
- }
-
- /**
- * constraints are selected in the order of the following rules: 1) don't select
- * constraints with CONSTRAINT_VALUE=ANY; 2) if one constraint is more specific than the
- * other, select the most specific one 3) if a message matches multiple constraints of
- * incomparable specificity, select the one with the minimum value 4) if a message
- * matches multiple constraints of incomparable specificity, and they all have the same
- * value, select the first in alphabetic order
- */
- Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
- Map<ConstraintAttribute, String> attributes) {
- Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
- for (ConstraintItem item : items) {
- // don't select constraints with CONSTRAINT_VALUE=ANY
- if (item.getConstraintValue().equals(ConstraintValue.ANY.toString())) {
- continue;
- }
-
- String key = item.filter(attributes).toString();
- if (!selectedItems.containsKey(key)) {
- selectedItems.put(key, item);
- } else {
- ConstraintItem existingItem = selectedItems.get(key);
- if (existingItem.match(item.getAttributes())) {
- // item is more specific than existingItem
- selectedItems.put(key, item);
- } else if (!item.match(existingItem.getAttributes())) {
- // existingItem and item are of incomparable specificity
- int value = valueOf(item.getConstraintValue());
- int existingValue = valueOf(existingItem.getConstraintValue());
- if (value < existingValue) {
- // item's constraint value is less than that of existingItem
- selectedItems.put(key, item);
- } else if (value == existingValue) {
- if (item.toString().compareTo(existingItem.toString()) < 0) {
- // item is ahead of existingItem in alphabetic order
- selectedItems.put(key, item);
- }
- }
- }
- }
- }
- return new HashSet<ConstraintItem>(selectedItems.values());
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
- NewMessageOutput msgSelectionOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- Map<ResourceId, ResourceConfig> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
-
- if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
- throw new StageException("Missing attributes in event: " + event
- + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
- }
-
- NewMessageOutput output = new NewMessageOutput();
-
- // TODO fix it
- ClusterConstraints constraint = cluster.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
- Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
-
- if (constraint != null) {
- // go through all pending messages, they should be counted but not throttled
- for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
- Participant liveParticipant = cluster.getLiveParticipantMap().get(participantId);
- throttle(throttleCounterMap, constraint, new ArrayList<Message>(liveParticipant
- .getMessageMap().values()), false);
- }
- }
-
- // go through all new messages, throttle if necessary
- // assume messages should be sorted by state transition priority in messageSelection stage
- for (ResourceId resourceId : resourceMap.keySet()) {
- ResourceConfig resource = resourceMap.get(resourceId);
- // TODO fix it
- for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
- List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
- if (constraint != null && messages != null && messages.size() > 0) {
- messages = throttle(throttleCounterMap, constraint, messages, true);
- }
- output.setMessages(resourceId, partitionId, messages);
- }
- }
-
- event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
- }
-
- private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,
- List<Message> messages, final boolean needThrottle) {
-
- List<Message> throttleOutputMsgs = new ArrayList<Message>();
- for (Message message : messages) {
- Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
-
- Set<ConstraintItem> matches = constraint.match(msgAttr);
- matches = selectConstraints(matches, msgAttr);
-
- boolean msgThrottled = false;
- for (ConstraintItem item : matches) {
- String key = item.filter(msgAttr).toString();
- if (!throttleMap.containsKey(key)) {
- throttleMap.put(key, valueOf(item.getConstraintValue()));
- }
- int value = throttleMap.get(key);
- throttleMap.put(key, --value);
-
- if (needThrottle && value < 0) {
- msgThrottled = true;
-
- if (LOG.isDebugEnabled()) {
- // TODO: printout constraint item that throttles the message
- LOG.debug("message: " + message + " is throttled by constraint: " + item);
- }
- }
- }
-
- if (!msgThrottled) {
- throttleOutputMsgs.add(message);
- }
- }
-
- return throttleOutputMsgs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
deleted file mode 100644
index 26050f8..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.accessor.ClusterAccessor;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.log4j.Logger;
-
-public class NewReadClusterDataStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(NewReadClusterDataStage.class.getName());
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
- LOG.info("START ReadClusterDataStage.process()");
-
- HelixManager manager = event.getAttribute("helixmanager");
- if (manager == null) {
- throw new StageException("HelixManager attribute value is null");
- }
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- ClusterId clusterId = ClusterId.from(manager.getClusterName());
- ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
-
- Cluster cluster = clusterAccessor.readCluster();
-
- ClusterStatusMonitor clusterStatusMonitor =
- (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- if (clusterStatusMonitor != null) {
- // TODO fix it
- // int disabledInstances = 0;
- // int disabledPartitions = 0;
- // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
- // if (config.getInstanceEnabled() == false) {
- // disabledInstances++;
- // }
- // if (config.getDisabledPartitions() != null) {
- // disabledPartitions += config.getDisabledPartitions().size();
- // }
- // }
- // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
- // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
- }
-
- event.addAttribute("ClusterDataCache", cluster);
-
- long endTime = System.currentTimeMillis();
- LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
deleted file mode 100644
index b531bd7..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.CurrentState;
-import org.apache.log4j.Logger;
-
-/**
- * This stage computes all the resources in a cluster. The resources are
- * computed from IdealStates -> this gives all the resources currently active
- * CurrentState for liveInstance-> Helps in finding resources that are inactive
- * and needs to be dropped
- */
-public class NewResourceComputationStage extends AbstractBaseStage {
- private static Logger LOG = Logger.getLogger(NewResourceComputationStage.class);
-
- @Override
- public void process(ClusterEvent event) throws StageException {
- Cluster cluster = event.getAttribute("ClusterDataCache");
- if (cluster == null) {
- throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
- }
-
- Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
- Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
-
- // ideal-state may be removed, add all resource config in current-state but not in ideal-state
- for (ResourceId resourceId : csResCfgMap.keySet()) {
- if (!cluster.getResourceMap().keySet().contains(resourceId)) {
- resCfgMap.put(resourceId, csResCfgMap.get(resourceId));
- }
- }
-
- for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
- Resource resource = cluster.getResource(resourceId);
- RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
-
- ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
- resCfgBuilder.bucketSize(resource.getBucketSize());
- resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
- resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
- resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
- resCfgMap.put(resourceId, resCfgBuilder.build());
- }
-
- event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
- }
-
- /**
- * Get resource config's from current-state
- * @param cluster
- * @return resource config map or empty map if not available
- * @throws StageException
- */
- Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
- Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
- new HashMap<ResourceId, ResourceConfig.Builder>();
-
- Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
- new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
-
- for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
- for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
- CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
-
- if (currentState.getStateModelDefRef() == null) {
- LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
- + ", partitions: " + currentState.getPartitionStateMap().keySet()
- + ", states: " + currentState.getPartitionStateMap().values());
- throw new StageException("State model def is null for resource:"
- + currentState.getResourceId());
- }
-
- if (!resCfgBuilderMap.containsKey(resourceId)) {
- PartitionedRebalancerContext.Builder rebCtxBuilder =
- new PartitionedRebalancerContext.Builder(resourceId);
- rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
- rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
- .getStateModelFactoryName()));
- rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
-
- ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
- resCfgBuilder.bucketSize(currentState.getBucketSize());
- resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
- resCfgBuilderMap.put(resourceId, resCfgBuilder);
- }
-
- PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
- for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
- rebCtxBuilder.addPartition(new Partition(partitionId));
- }
- }
- }
-
- Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
- for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
- ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
- PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
- resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
- resCfgMap.put(resourceId, resCfgBuilder.build());
- }
-
- return resCfgMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
deleted file mode 100644
index 51c9284..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public class NewTaskAssignmentStage extends AbstractBaseStage {
- private static Logger logger = Logger.getLogger(NewTaskAssignmentStage.class);
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
- logger.info("START TaskAssignmentStage.process()");
-
- HelixManager manager = event.getAttribute("helixmanager");
- Map<ResourceId, ResourceConfig> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
- NewMessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
- Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
-
- if (manager == null || resourceMap == null || messageOutput == null || cluster == null
- || liveParticipantMap == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
- }
-
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
- List<Message> messagesToSend = new ArrayList<Message>();
- for (ResourceId resourceId : resourceMap.keySet()) {
- ResourceConfig resource = resourceMap.get(resourceId);
- for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
- List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
- messagesToSend.addAll(messages);
- }
- }
-
- List<Message> outputMessages =
- batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
- manager.getProperties());
- sendMessages(dataAccessor, outputMessages);
-
- long endTime = System.currentTimeMillis();
- logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
-
- }
-
- List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
- Map<ResourceId, ResourceConfig> resourceMap,
- Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
- // group messages by its CurrentState path + "/" + fromState + "/" + toState
- Map<String, Message> batchMessages = new HashMap<String, Message>();
- List<Message> outputMessages = new ArrayList<Message>();
-
- Iterator<Message> iter = messages.iterator();
- while (iter.hasNext()) {
- Message message = iter.next();
- ResourceId resourceId = message.getResourceId();
- ResourceConfig resource = resourceMap.get(resourceId);
-
- ParticipantId participantId = ParticipantId.from(message.getTgtName());
- Participant liveParticipant = liveParticipantMap.get(participantId);
- String participantVersion = null;
- if (liveParticipant != null) {
- participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
- }
-
- if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
- || !properties.isFeatureSupported("batch_message", participantVersion)) {
- outputMessages.add(message);
- continue;
- }
-
- String key =
- keyBuilder.currentState(message.getTgtName(), message.getTypedTgtSessionId().stringify(),
- message.getResourceId().stringify()).getPath()
- + "/" + message.getTypedFromState() + "/" + message.getTypedToState();
-
- if (!batchMessages.containsKey(key)) {
- Message batchMessage = new Message(message.getRecord());
- batchMessage.setBatchMessageMode(true);
- outputMessages.add(batchMessage);
- batchMessages.put(key, batchMessage);
- }
- batchMessages.get(key).addPartitionName(message.getPartitionId().stringify());
- }
-
- return outputMessages;
- }
-
- protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
- if (messages == null || messages.isEmpty()) {
- return;
- }
-
- Builder keyBuilder = dataAccessor.keyBuilder();
-
- List<PropertyKey> keys = new ArrayList<PropertyKey>();
- for (Message message : messages) {
- logger.info("Sending Message " + message.getMessageId() + " to " + message.getTgtName()
- + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
- + message.getTypedFromState() + " to:" + message.getTypedToState());
-
- // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
- // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
- // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
- // + message.getToState());
-
- keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
- }
-
- dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index be0b7f0..31dbb08 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -35,7 +35,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
HelixManager helixManager = event.getAttribute("helixmanager");
HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
ResourceAccessor resourceAccessor = new ResourceAccessor(accessor);
- NewBestPossibleStateOutput assignments =
+ BestPossibleStateOutput assignments =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (ResourceId resourceId : assignments.getAssignedResources()) {
ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index ce81f1f..44fddb6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -21,53 +21,53 @@ package org.apache.helix.controller.stages;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.InstanceConfig;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
-@Deprecated
public class ReadClusterDataStage extends AbstractBaseStage {
- private static final Logger logger = Logger.getLogger(ReadClusterDataStage.class.getName());
- ClusterDataCache _cache;
-
- public ReadClusterDataStage() {
- _cache = new ClusterDataCache();
- }
+ private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
- logger.info("START ReadClusterDataStage.process()");
+ LOG.info("START ReadClusterDataStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
if (manager == null) {
throw new StageException("HelixManager attribute value is null");
}
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
- _cache.refresh(dataAccessor);
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ ClusterId clusterId = ClusterId.from(manager.getClusterName());
+ ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
+
+ Cluster cluster = clusterAccessor.readCluster();
ClusterStatusMonitor clusterStatusMonitor =
(ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
if (clusterStatusMonitor != null) {
- int disabledInstances = 0;
- int disabledPartitions = 0;
- for (InstanceConfig config : _cache._instanceConfigMap.values()) {
- if (config.getInstanceEnabled() == false) {
- disabledInstances++;
- }
- if (config.getDisabledPartitions() != null) {
- disabledPartitions += config.getDisabledPartitions().size();
- }
- }
- clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
- _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
+ // TODO fix it
+ // int disabledInstances = 0;
+ // int disabledPartitions = 0;
+ // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
+ // if (config.getInstanceEnabled() == false) {
+ // disabledInstances++;
+ // }
+ // if (config.getDisabledPartitions() != null) {
+ // disabledPartitions += config.getDisabledPartitions().size();
+ // }
+ // }
+ // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+ // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
}
- event.addAttribute("ClusterDataCache", _cache);
+ event.addAttribute("ClusterDataCache", cluster);
long endTime = System.currentTimeMillis();
- logger.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
+ LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
index ae873c7..859c1d0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
@@ -23,10 +23,8 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.log4j.Logger;
public class ReadHealthDataStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(ReadHealthDataStage.class.getName());
HealthDataCache _cache;
public ReadHealthDataStage() {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index da38ee2..dc56b89 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -19,16 +19,23 @@ package org.apache.helix.controller.stages;
* under the License.
*/
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
+import org.apache.helix.api.rebalancer.RebalancerConfig;
+import org.apache.helix.api.rebalancer.RebalancerContext;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
import org.apache.log4j.Logger;
/**
@@ -37,102 +44,95 @@ import org.apache.log4j.Logger;
* CurrentState for liveInstance-> Helps in finding resources that are inactive
* and needs to be dropped
*/
-@Deprecated
public class ResourceComputationStage extends AbstractBaseStage {
private static Logger LOG = Logger.getLogger(ResourceComputationStage.class);
@Override
- public void process(ClusterEvent event) throws Exception {
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- if (cache == null) {
- throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+ public void process(ClusterEvent event) throws StageException {
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ if (cluster == null) {
+ throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
}
- Map<String, IdealState> idealStates = cache.getIdealStates();
+ Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+ Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
- Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
+ // ideal-state may be removed, add all resource config in current-state but not in ideal-state
+ for (ResourceId resourceId : csResCfgMap.keySet()) {
+ if (!cluster.getResourceMap().keySet().contains(resourceId)) {
+ resCfgMap.put(resourceId, csResCfgMap.get(resourceId));
+ }
+ }
- if (idealStates != null && idealStates.size() > 0) {
- for (IdealState idealState : idealStates.values()) {
- Set<String> partitionSet = idealState.getPartitionSet();
- String resourceName = idealState.getResourceName();
+ for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+ Resource resource = cluster.getResource(resourceId);
+ RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
- for (String partition : partitionSet) {
- addPartition(partition, resourceName, resourceMap);
- Resource resource = resourceMap.get(resourceName);
- resource.setStateModelDefRef(idealState.getStateModelDefRef());
- resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
- resource.setBucketSize(idealState.getBucketSize());
- resource.setBatchMessageMode(idealState.getBatchMessageMode());
- }
- }
+ ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+ resCfgBuilder.bucketSize(resource.getBucketSize());
+ resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
+ resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
+ resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
+ resCfgMap.put(resourceId, resCfgBuilder.build());
}
- // It's important to get partitions from CurrentState as well since the
- // idealState might be removed.
- Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
+ event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
+ }
- if (availableInstances != null && availableInstances.size() > 0) {
- for (LiveInstance instance : availableInstances.values()) {
- String instanceName = instance.getInstanceName();
- String clientSessionId = instance.getTypedSessionId().stringify();
+ /**
+ * Get resource config's from current-state
+ * @param cluster
+ * @return resource config map or empty map if not available
+ * @throws StageException
+ */
+ Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
+ Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
+ new HashMap<ResourceId, ResourceConfig.Builder>();
+
+ Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
+ new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
+
+ for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+ for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
+ CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
+
+ if (currentState.getStateModelDefRef() == null) {
+ LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
+ + ", partitions: " + currentState.getPartitionStateMap().keySet()
+ + ", states: " + currentState.getPartitionStateMap().values());
+ throw new StageException("State model def is null for resource:"
+ + currentState.getResourceId());
+ }
- Map<String, CurrentState> currentStateMap =
- cache.getCurrentState(instanceName, clientSessionId);
- if (currentStateMap == null || currentStateMap.size() == 0) {
- continue;
+ if (!resCfgBuilderMap.containsKey(resourceId)) {
+ PartitionedRebalancerContext.Builder rebCtxBuilder =
+ new PartitionedRebalancerContext.Builder(resourceId);
+ rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
+ rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
+ .getStateModelFactoryName()));
+ rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
+
+ ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+ resCfgBuilder.bucketSize(currentState.getBucketSize());
+ resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
+ resCfgBuilderMap.put(resourceId, resCfgBuilder);
}
- for (CurrentState currentState : currentStateMap.values()) {
-
- String resourceName = currentState.getResourceName();
- Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
-
- // don't overwrite ideal state settings
- if (!resourceMap.containsKey(resourceName)) {
- addResource(resourceName, resourceMap);
- Resource resource = resourceMap.get(resourceName);
- resource.setStateModelDefRef(currentState.getStateModelDefRef());
- resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
- resource.setBucketSize(currentState.getBucketSize());
- resource.setBatchMessageMode(currentState.getBatchMessageMode());
- }
-
- if (currentState.getStateModelDefRef() == null) {
- LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
- + ", partitions: " + currentState.getPartitionStateMap().keySet()
- + ", states: " + currentState.getPartitionStateMap().values());
- throw new StageException("State model def is null for resource:"
- + currentState.getResourceName());
- }
-
- for (String partition : resourceStateMap.keySet()) {
- addPartition(partition, resourceName, resourceMap);
- }
+
+ PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+ for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
+ rebCtxBuilder.addPartition(new Partition(partitionId));
}
}
}
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- }
-
- private void addResource(String resource, Map<String, Resource> resourceMap) {
- if (resource == null || resourceMap == null) {
- return;
- }
- if (!resourceMap.containsKey(resource)) {
- resourceMap.put(resource, new Resource(resource));
- }
- }
-
- private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap) {
- if (resourceName == null || partition == null || resourceMap == null) {
- return;
- }
- if (!resourceMap.containsKey(resourceName)) {
- resourceMap.put(resourceName, new Resource(resourceName));
+ Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+ for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
+ ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
+ PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+ resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
+ resCfgMap.put(resourceId, resCfgBuilder.build());
}
- Resource resource = resourceMap.get(resourceName);
- resource.addPartition(partition);
+ return resCfgMap;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c942db9..02188be 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -30,16 +30,17 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.apache.log4j.Logger;
-@Deprecated
public class TaskAssignmentStage extends AbstractBaseStage {
private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
@@ -49,30 +50,30 @@ public class TaskAssignmentStage extends AbstractBaseStage {
logger.info("START TaskAssignmentStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- MessageThrottleStageOutput messageOutput =
- event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
-
- if (manager == null || resourceMap == null || messageOutput == null || cache == null
- || liveInstanceMap == null) {
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
+
+ if (manager == null || resourceMap == null || messageOutput == null || cluster == null
+ || liveParticipantMap == null) {
throw new StageException("Missing attributes in event:" + event
+ ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
}
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
List<Message> messagesToSend = new ArrayList<Message>();
- for (String resourceName : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceName);
- for (Partition partition : resource.getPartitions()) {
- List<Message> messages = messageOutput.getMessages(resourceName, partition);
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ ResourceConfig resource = resourceMap.get(resourceId);
+ for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+ List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
messagesToSend.addAll(messages);
}
}
List<Message> outputMessages =
- batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
+ batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
manager.getProperties());
sendMessages(dataAccessor, outputMessages);
@@ -82,8 +83,8 @@ public class TaskAssignmentStage extends AbstractBaseStage {
}
List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
- Map<String, Resource> resourceMap, Map<String, LiveInstance> liveInstanceMap,
- HelixManagerProperties properties) {
+ Map<ResourceId, ResourceConfig> resourceMap,
+ Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
// group messages by its CurrentState path + "/" + fromState + "/" + toState
Map<String, Message> batchMessages = new HashMap<String, Message>();
List<Message> outputMessages = new ArrayList<Message>();
@@ -92,13 +93,13 @@ public class TaskAssignmentStage extends AbstractBaseStage {
while (iter.hasNext()) {
Message message = iter.next();
ResourceId resourceId = message.getResourceId();
- Resource resource = resourceMap.get(resourceId.stringify());
+ ResourceConfig resource = resourceMap.get(resourceId);
- String instanceName = message.getTgtName();
- LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+ ParticipantId participantId = ParticipantId.from(message.getTgtName());
+ Participant liveParticipant = liveParticipantMap.get(participantId);
String participantVersion = null;
- if (liveInstance != null) {
- participantVersion = liveInstance.getTypedHelixVersion().toString();
+ if (liveParticipant != null) {
+ participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
}
if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
@@ -137,10 +138,10 @@ public class TaskAssignmentStage extends AbstractBaseStage {
+ " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
+ message.getTypedFromState() + " to:" + message.getTypedToState());
- // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
- // message.getTgtName()
- // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
- // + " from: " + message.getFromState() + " to: " + message.getToState());
+ // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
+ // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
+ // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
+ // + message.getToState());
keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 7d84258..805f6bf 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -38,7 +38,7 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.context.RebalancerRef;
+import org.apache.helix.api.rebalancer.RebalancerRef;
import org.apache.log4j.Logger;
import com.google.common.base.Function;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index f591a24..1563769 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -55,10 +55,10 @@ import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
-import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
-import org.apache.helix.controller.stages.NewResourceComputationStage;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
@@ -250,7 +250,7 @@ public class ClusterStateVerifier {
ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
Cluster cluster = clusterAccessor.readCluster();
// calculate best possible state
- NewBestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
+ BestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
// set error states
if (errStates != null) {
@@ -416,19 +416,19 @@ public class ClusterStateVerifier {
* @throws Exception
*/
- static NewBestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
+ static BestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
ClusterEvent event = new ClusterEvent("sampleEvent");
event.addAttribute("ClusterDataCache", cluster);
- NewResourceComputationStage rcState = new NewResourceComputationStage();
- NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();
- NewBestPossibleStateCalcStage bpStage = new NewBestPossibleStateCalcStage();
+ ResourceComputationStage rcState = new ResourceComputationStage();
+ CurrentStateComputationStage csStage = new CurrentStateComputationStage();
+ BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
runStage(event, rcState);
runStage(event, csStage);
runStage(event, bpStage);
- NewBestPossibleStateOutput output =
+ BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
return output;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
index 85330be..f48ebbc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -45,10 +45,10 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.api.rebalancer.CustomRebalancerContext;
+import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
+import org.apache.helix.api.rebalancer.RebalancerContext;
+import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index c478bbb..7fe3314 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -33,11 +33,11 @@ import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -130,13 +130,13 @@ public class TestNewStages extends ZkUnitTestBase {
// Run the stage
try {
- new NewBestPossibleStateCalcStage().process(event);
+ new BestPossibleStateCalcStage().process(event);
} catch (Exception e) {
Assert.fail(e.toString());
}
// Verify the result
- NewBestPossibleStateOutput bestPossibleStateOutput =
+ BestPossibleStateOutput bestPossibleStateOutput =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
Assert.assertNotNull(bestPossibleStateOutput);
ResourceId resourceId = ResourceId.from("TestDB0");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
index 74781cd..0a578c1 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
@@ -9,8 +9,8 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.api.rebalancer.FullAutoRebalancerContext;
+import org.apache.helix.api.rebalancer.SemiAutoRebalancerContext;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
index 5bbe54f..8650475 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
@@ -8,6 +8,9 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.rebalancer.CustomRebalancerContext;
+import org.apache.helix.api.rebalancer.RebalancerConfig;
+import org.apache.helix.api.rebalancer.RebalancerContext;
import org.apache.helix.model.ResourceConfiguration;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index ecb8151..6279087 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -39,10 +39,10 @@ import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.rebalancer.PartitionedRebalancerContext;
+import org.apache.helix.api.rebalancer.RebalancerContext;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 18e8f4d..cb60691 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -62,12 +62,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+ ReadClusterDataStage stage1 = new ReadClusterDataStage();
runStage(event, stage1);
- NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+ BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
runStage(event, stage2);
- NewBestPossibleStateOutput output =
+ BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
Map<ParticipantId, State> replicaMap =
@@ -100,12 +100,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+ ReadClusterDataStage stage1 = new ReadClusterDataStage();
runStage(event, stage1);
- NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+ BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
runStage(event, stage2);
- NewBestPossibleStateOutput output =
+ BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
Map<ParticipantId, State> replicaMap =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index d3f348e..d116182 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -52,12 +52,12 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+ ReadClusterDataStage stage1 = new ReadClusterDataStage();
runStage(event, stage1);
- NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+ BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
runStage(event, stage2);
- NewBestPossibleStateOutput output =
+ BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
Map<ParticipantId, State> replicaMap =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
index fb113b9..9d1dd04 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
@@ -78,13 +78,13 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
.put("minimum_supported_version.participant", minSupportedParticipantVersion);
}
event.addAttribute("helixmanager", manager);
- runStage(event, new NewReadClusterDataStage());
+ runStage(event, new ReadClusterDataStage());
}
@Test
public void testCompatible() {
prepare("0.4.0", "0.4.0");
- NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
+ CompatibilityCheckStage stage = new CompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
@@ -99,7 +99,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
@Test
public void testNullParticipantVersion() {
prepare("0.4.0", null);
- NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
+ CompatibilityCheckStage stage = new CompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
@@ -115,7 +115,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
@Test
public void testNullControllerVersion() {
prepare(null, "0.4.0");
- NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
+ CompatibilityCheckStage stage = new CompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
@@ -131,7 +131,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
@Test
public void testIncompatible() {
prepare("0.6.1-incubating-SNAPSHOT", "0.3.4", "0.4");
- NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
+ CompatibilityCheckStage stage = new CompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 3412e0a..65d551d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -48,8 +48,8 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
- runStage(event, new NewReadClusterDataStage());
+ CurrentStateComputationStage stage = new CurrentStateComputationStage();
+ runStage(event, new ReadClusterDataStage());
runStage(event, stage);
ResourceCurrentState output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
AssertJUnit.assertEquals(
@@ -69,8 +69,8 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
setupLiveInstances(5);
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
- runStage(event, new NewReadClusterDataStage());
+ CurrentStateComputationStage stage = new CurrentStateComputationStage();
+ runStage(event, new ReadClusterDataStage());
runStage(event, stage);
ResourceCurrentState output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
AssertJUnit.assertEquals(
@@ -89,7 +89,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
- runStage(event, new NewReadClusterDataStage());
+ runStage(event, new ReadClusterDataStage());
runStage(event, stage);
ResourceCurrentState output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
State pendingState =
@@ -114,7 +114,7 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
accessor.setProperty(
keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
stateWithDeadSession);
- runStage(event, new NewReadClusterDataStage());
+ runStage(event, new ReadClusterDataStage());
runStage(event, stage);
ResourceCurrentState output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
State currentState =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9f229c80/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 0bd8795..ba61361 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -78,7 +78,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
ClusterEvent event = new ClusterEvent("testEvent");
event.addAttribute("helixmanager", manager);
- NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
+ MessageThrottleStage throttleStage = new MessageThrottleStage();
try {
runStage(event, throttleStage);
Assert.fail("Should throw exception since DATA_CACHE is null");
@@ -87,7 +87,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
}
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new NewReadClusterDataStage());
+ dataRefresh.addStage(new ReadClusterDataStage());
runPipeline(event, dataRefresh);
try {
@@ -96,7 +96,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
} catch (Exception e) {
// OK
}
- runStage(event, new NewResourceComputationStage());
+ runStage(event, new ResourceComputationStage());
try {
runStage(event, throttleStage);
@@ -104,7 +104,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
} catch (Exception e) {
// OK
}
- NewMessageOutput msgSelectOutput = new NewMessageOutput();
+ MessageOutput msgSelectOutput = new MessageOutput();
List<Message> selectMessages = new ArrayList<Message>();
Message msg =
createMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-001"), "OFFLINE",
@@ -117,7 +117,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
runStage(event, throttleStage);
- NewMessageOutput msgThrottleOutput =
+ MessageOutput msgThrottleOutput =
event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
Assert.assertEquals(
msgThrottleOutput.getMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_0"))
@@ -221,7 +221,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
ClusterConstraints constraint =
accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
- NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
+ MessageThrottleStage throttleStage = new MessageThrottleStage();
// test constraintSelection
// message1: hit contraintSelection rule1 and rule2
@@ -271,10 +271,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
event.addAttribute("helixmanager", manager);
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new NewReadClusterDataStage());
+ dataRefresh.addStage(new ReadClusterDataStage());
runPipeline(event, dataRefresh);
- runStage(event, new NewResourceComputationStage());
- NewMessageOutput msgSelectOutput = new NewMessageOutput();
+ runStage(event, new ResourceComputationStage());
+ MessageOutput msgSelectOutput = new MessageOutput();
Message msg3 =
createMessage(MessageType.STATE_TRANSITION, MessageId.from("msgId-003"), "OFFLINE",
@@ -306,7 +306,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
runStage(event, throttleStage);
- NewMessageOutput msgThrottleOutput =
+ MessageOutput msgThrottleOutput =
event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
List<Message> throttleMessages =
msgThrottleOutput.getMessages(ResourceId.from("TestDB"), PartitionId.from("TestDB_0"));