You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/09/05 22:11:52 UTC
[1/2] helix rebalancer refactor using logical models
Updated Branches:
refs/heads/helix-logical-model 9c7de4c33 -> 5d0e048e1
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 4038c69..9745c64 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -27,10 +27,14 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.IdealState;
@@ -75,15 +79,22 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
public int getUpperBound() {
return upper;
}
+
+ @Override
+ public String toString() {
+ return String.format("%d-%d", lower, upper);
+ }
}
@Override
public void process(ClusterEvent event) throws Exception {
Cluster cluster = event.getAttribute("ClusterDataCache");
+ Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+ event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
NewCurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
- MessageGenerationOutput messageGenOutput =
+ NewMessageOutput messageGenOutput =
event.getAttribute(AttributeName.MESSAGES_ALL.toString());
if (cluster == null || resourceMap == null || currentStateOutput == null
|| messageGenOutput == null) {
@@ -91,29 +102,28 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
+ ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
}
- MessageSelectionStageOutput output = new MessageSelectionStageOutput();
+ NewMessageOutput output = new NewMessageOutput();
for (ResourceId resourceId : resourceMap.keySet()) {
Resource resource = resourceMap.get(resourceId);
- // TODO fix it
- StateModelDefinition stateModelDef = null;
- // cache.getStateModelDef(resource.getStateModelDefRef());
+ StateModelDefinition stateModelDef =
+ stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
+ // TODO have a logical model for transition
Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
- // IdealState idealState = cache.getIdealState(resourceName);
- Map<String, Bounds> stateConstraints =
+ Map<State, Bounds> stateConstraints =
computeStateConstraints(stateModelDef, resource.getRebalancerConfig(), cluster);
// TODO fix it
- // for (Partition partition : resource.getPartitions()) {
- // List<Message> messages = messageGenOutput.getMessages(resourceId.stringify(), partition);
- // List<Message> selectedMessages =
- // selectMessages(cache.getLiveInstances(),
- // currentStateOutput.getCurrentStateMap(resourceName, partition),
- // currentStateOutput.getPendingStateMap(resourceName, partition), messages,
- // stateConstraints, stateTransitionPriorities, stateModelDef.getInitialStateString());
- // output.addMessages(resourceId.stringify(), partition, selectedMessages);
- // }
+ for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+ List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
+ List<Message> selectedMessages =
+ selectMessages(cluster.getLiveParticipantMap(),
+ currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+ currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
+ stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
+ output.setMessages(resourceId, partitionId, selectedMessages);
+ }
}
event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
}
@@ -137,22 +147,22 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
* : FROME_STATE-TO_STATE -> priority
* @return: selected messages
*/
- List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
- Map<String, String> currentStates, Map<String, String> pendingStates, List<Message> messages,
- Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
- String initialState) {
+ List<Message> selectMessages(Map<ParticipantId, Participant> liveParticipants,
+ Map<ParticipantId, State> currentStates, Map<ParticipantId, State> pendingStates,
+ List<Message> messages, Map<State, Bounds> stateConstraints,
+ final Map<String, Integer> stateTransitionPriorities, State initialState) {
if (messages == null || messages.isEmpty()) {
return Collections.emptyList();
}
List<Message> selectedMessages = new ArrayList<Message>();
- Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+ Map<State, Bounds> bounds = new HashMap<State, Bounds>();
// count currentState, if no currentState, count as in initialState
- for (String instance : liveInstances.keySet()) {
- String state = initialState;
- if (currentStates.containsKey(instance)) {
- state = currentStates.get(instance);
+ for (ParticipantId liveParticipantId : liveParticipants.keySet()) {
+ State state = initialState;
+ if (currentStates.containsKey(liveParticipantId)) {
+ state = currentStates.get(liveParticipantId);
}
if (!bounds.containsKey(state)) {
@@ -163,8 +173,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
}
// count pendingStates
- for (String instance : pendingStates.keySet()) {
- String state = pendingStates.get(instance);
+ for (ParticipantId participantId : pendingStates.keySet()) {
+ State state = pendingStates.get(participantId);
if (!bounds.containsKey(state)) {
bounds.put(state, new Bounds(0, 0));
}
@@ -178,7 +188,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
for (Message message : messages) {
State fromState = message.getFromState();
State toState = message.getToState();
- String transition = fromState + "-" + toState;
+ String transition = fromState.toString() + "-" + toState.toString();
int priority = Integer.MAX_VALUE;
if (stateTransitionPriorities.containsKey(transition)) {
@@ -203,7 +213,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
}
if (!bounds.containsKey(toState)) {
- bounds.put(toState.toString(), new Bounds(0, 0));
+ bounds.put(toState, new Bounds(0, 0));
}
// check lower bound of fromState
@@ -243,13 +253,13 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
* beginning and compute the stateConstraint instance once and re use at other places.
* Each IdealState must have a constraint object associated with it
*/
- private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
+ private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
RebalancerConfig rebalancerConfig, Cluster cluster) {
- Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
+ Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
- List<String> statePriorityList = stateModelDefinition.getStatesPriorityStringList();
- for (String state : statePriorityList) {
- String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
+ List<State> statePriorityList = stateModelDefinition.getStatesPriorityList();
+ for (State state : statePriorityList) {
+ String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state.toString());
int max = -1;
if ("N".equals(numInstancesPerState)) {
max = cluster.getLiveParticipantMap().size();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
index e45cd38..5bea5b4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -27,6 +27,9 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -115,7 +118,7 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
Cluster cluster = event.getAttribute("ClusterDataCache");
- MessageSelectionStageOutput msgSelectionOutput =
+ NewMessageOutput msgSelectionOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
@@ -124,34 +127,33 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
+ ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
}
- MessageThrottleStageOutput output = new MessageThrottleStageOutput();
+ NewMessageOutput output = new NewMessageOutput();
// TODO fix it
- ClusterConstraints constraint = null;
- // cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+ ClusterConstraints constraint = cluster.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
- // TODO fix it
- // if (constraint != null) {
- // // go through all pending messages, they should be counted but not throttled
- // for (String instance : cache.getLiveInstances().keySet()) {
- // throttle(throttleCounterMap, constraint, new ArrayList<Message>(cache.getMessages(instance)
- // .values()), false);
- // }
- // }
+ if (constraint != null) {
+ // go through all pending messages, they should be counted but not throttled
+ for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
+ Participant liveParticipant = cluster.getLiveParticipantMap().get(participantId);
+ throttle(throttleCounterMap, constraint, new ArrayList<Message>(liveParticipant
+ .getMessageMap().values()), false);
+ }
+ }
// go through all new messages, throttle if necessary
// assume messages should be sorted by state transition priority in messageSelection stage
for (ResourceId resourceId : resourceMap.keySet()) {
Resource resource = resourceMap.get(resourceId);
// TODO fix it
- // for (Partition partition : resource.getPartitions()) {
- // List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
- // if (constraint != null && messages != null && messages.size() > 0) {
- // messages = throttle(throttleCounterMap, constraint, messages, true);
- // }
- // output.addMessages(resourceName, partition, messages);
- // }
+ for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+ List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
+ if (constraint != null && messages != null && messages.size() > 0) {
+ messages = throttle(throttleCounterMap, constraint, messages, true);
+ }
+ output.setMessages(resourceId, partitionId, messages);
+ }
}
event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
new file mode 100644
index 0000000..ed487a1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
@@ -0,0 +1,85 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ClusterAccessor;
+import org.apache.helix.api.ClusterId;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelDefinitionAccessor;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.log4j.Logger;
+
+public class NewReadClusterDataStage extends AbstractBaseStage {
+ private static final Logger LOG = Logger.getLogger(NewReadClusterDataStage.class.getName());
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ long startTime = System.currentTimeMillis();
+ LOG.info("START ReadClusterDataStage.process()");
+
+ HelixManager manager = event.getAttribute("helixmanager");
+ if (manager == null) {
+ throw new StageException("HelixManager attribute value is null");
+ }
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ ClusterId clusterId = Id.cluster(manager.getClusterName());
+ ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
+ StateModelDefinitionAccessor stateModelDefAccessor =
+ new StateModelDefinitionAccessor(clusterId, accessor);
+
+ Cluster cluster = clusterAccessor.readCluster();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+ stateModelDefAccessor.readStateModelDefinitions();
+
+ ClusterStatusMonitor clusterStatusMonitor =
+ (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+ if (clusterStatusMonitor != null) {
+ // TODO re-enable the cluster status counters below; the code still reads the old _cache fields
+ // int disabledInstances = 0;
+ // int disabledPartitions = 0;
+ // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
+ // if (config.getInstanceEnabled() == false) {
+ // disabledInstances++;
+ // }
+ // if (config.getDisabledPartitions() != null) {
+ // disabledPartitions += config.getDisabledPartitions().size();
+ // }
+ // }
+ // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+ // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
+ }
+
+ event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefMap);
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
deleted file mode 100644
index 3359b50..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.rebalancer.NewRebalancer;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Check and invoke custom implementation idealstate rebalancers.<br/>
- * If the resourceConfig has specified className of the customized rebalancer, <br/>
- * the rebalancer will be invoked to re-write the idealstate of the resource<br/>
- */
-public class NewRebalanceIdealStateStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(NewRebalanceIdealStateStage.class.getName());
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
- NewCurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
- // Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
- for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
- // IdealState currentIdealState = idealStateMap.get(resourceName);
- Resource resource = cluster.getResource(resourceId);
- RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
- if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
- && rebalancerConfig.getRebalancerClassName() != null) {
- String rebalancerClassName = rebalancerConfig.getRebalancerClassName();
- LOG.info("resource " + resourceId + " use idealStateRebalancer " + rebalancerClassName);
- try {
- NewRebalancer balancer =
- (NewRebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
-
- // TODO add state model def
- ResourceAssignment resourceAssignment =
- balancer.computeResourceMapping(resource, cluster, null);
-
- // TODO impl this
- // currentIdealState.updateFromAssignment(resourceAssignment);
- // updatedIdealStates.put(resourceName, currentIdealState);
- } catch (Exception e) {
- LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
- }
- }
- }
-
- // TODO
- // if (updatedIdealStates.size() > 0) {
- // cache.getIdealStates().putAll(updatedIdealStates);
- // }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index b8c1ecf..af23eb2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -21,7 +21,6 @@ package org.apache.helix.controller.stages;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
@@ -67,7 +66,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
// include all partitions from CurrentState as well since idealState might be removed
for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
- for ( ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
+ for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
if (currentState.getStateModelDefRef() == null) {
@@ -80,13 +79,15 @@ public class NewResourceComputationStage extends AbstractBaseStage {
// don't overwrite ideal state configs
if (!resourceBuilderMap.containsKey(resourceId)) {
- RebalancerConfig.Builder rebalancerConfigBuilder = new RebalancerConfig.Builder();
+ RebalancerConfig.Builder rebalancerConfigBuilder =
+ new RebalancerConfig.Builder(resourceId);
rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
- rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState.getStateModelFactoryName()));
+ rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState
+ .getStateModelFactoryName()));
rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
- org.apache.helix.api.Resource.Builder resourceBuilder = new org.apache.helix.api.Resource.Builder(resourceId);
+ Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
resourceBuilderMap.put(resourceId, resourceBuilder);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 95862ae..2b8a0c8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -31,8 +31,10 @@ import org.apache.helix.HelixManagerProperties;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -52,7 +54,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
HelixManager manager = event.getAttribute("helixmanager");
Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- MessageThrottleStageOutput messageOutput =
+ NewMessageOutput messageOutput =
event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
Cluster cluster = event.getAttribute("ClusterDataCache");
Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
@@ -67,11 +69,10 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
List<Message> messagesToSend = new ArrayList<Message>();
for (ResourceId resourceId : resourceMap.keySet()) {
Resource resource = resourceMap.get(resourceId);
- // TODO fix it
- // for (Partition partition : resource.getPartitions()) {
- // List<Message> messages = messageOutput.getMessages(resourceName, partition);
- // messagesToSend.addAll(messages);
- // }
+ for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+ List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
+ messagesToSend.addAll(messages);
+ }
}
List<Message> outputMessages =
@@ -95,9 +96,9 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
while (iter.hasNext()) {
Message message = iter.next();
ResourceId resourceId = message.getResourceId();
- Resource resource = resourceMap.get(resourceId.stringify());
+ Resource resource = resourceMap.get(resourceId);
- String participantId = message.getTgtName();
+ ParticipantId participantId = Id.participant(message.getTgtName());
Participant liveParticipant = liveParticipantMap.get(participantId);
String participantVersion = null;
if (liveParticipant != null) {
@@ -141,10 +142,10 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
+ " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
+ message.getFromState() + " to:" + message.getToState());
- // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
- // message.getTgtName()
- // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
- // + " from: " + message.getFromState() + " to: " + message.getToState());
+ // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
+ // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
+ // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
+ // + message.getToState());
keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
index f16bb39..b6facea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -29,6 +29,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
@@ -66,7 +67,9 @@ public class RebalanceIdealStateStage extends AbstractBaseStage {
ResourceAssignment resourceAssignment =
balancer.computeResourceMapping(resource, currentIdealState, currentStateOutput,
cache);
- currentIdealState.updateFromAssignment(resourceAssignment);
+ StateModelDefinition stateModelDef =
+ cache.getStateModelDef(currentIdealState.getStateModelDefRef());
+ currentIdealState.updateFromAssignment(resourceAssignment, stateModelDef);
updatedIdealStates.put(resourceName, currentIdealState);
} catch (Exception e) {
LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 087d2fb..835af6e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -410,6 +410,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
switch (type) {
case EXTERNALVIEW:
if (value.getBucketSize() == 0) {
+ System.out.println("set: " + value.getRecord());
records.add(value.getRecord());
} else {
_baseDataAccessor.remove(path, options);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 0f690db..7fb641f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -712,6 +712,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
for (int i = 0; i < paths.size(); i++) {
success[i] = (results.get(i)._retCode == RetCode.OK);
}
+
+ for (int i = 0; i < paths.size(); i++) {
+ String path = paths.get(i);
+ T record = records.get(i);
+ if (path.indexOf("EXTERNALVIEW") != -1) {
+ System.out.println("path: " + path + ", record: " + record + ", success: " + success[i]);
+ }
+ }
return success;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ffff483..24ec7c9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Id;
@@ -38,14 +39,18 @@ import org.apache.helix.api.RebalancerRef;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.log4j.Logger;
import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
/**
* The ideal states of all partitions in a resource
@@ -459,6 +464,14 @@ public class IdealState extends HelixProperty {
}
/**
+ * Set the state model associated with this resource
+ * @param stateModelDefId state model definition identifier
+ */
+ public void setStateModelDefId(StateModelDefId stateModelDefId) {
+ setStateModelDefRef(stateModelDefId.stringify());
+ }
+
+ /**
* Set the number of partitions of this resource
* @param numPartitions the number of partitions
*/
@@ -540,6 +553,14 @@ public class IdealState extends HelixProperty {
}
/**
+ * Set the state model factory associated with this resource
+ * @param stateModelFactoryId state model factory id
+ */
+ public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+ setStateModelFactoryName(stateModelFactoryId.stringify());
+ }
+
+ /**
* Get the state model factory associated with this resource
* @return state model factory name
*/
@@ -549,6 +570,14 @@ public class IdealState extends HelixProperty {
}
/**
+ * Get the state model factory associated with this resource
+ * @return state model factory id
+ */
+ public StateModelFactoryId getStateModelFactoryId() {
+ return Id.stateModelFactory(getStateModelFactoryName());
+ }
+
+ /**
* Set the frequency with which to rebalance
* @return the rebalancing timer period
*/
@@ -613,13 +642,39 @@ public class IdealState extends HelixProperty {
return _record.getSimpleField(IdealStateProperty.INSTANCE_GROUP_TAG.toString());
}
- public void updateFromAssignment(ResourceAssignment assignment) {
+ /**
+ * Update the ideal state from a ResourceAssignment computed during a rebalance
+ * @param assignment the new resource assignment
+ * @param stateModelDef state model of the resource
+ */
+ public void updateFromAssignment(ResourceAssignment assignment, StateModelDefinition stateModelDef) {
+ // clear all preference lists and maps
_record.getMapFields().clear();
_record.getListFields().clear();
+
+ // assign a partition at a time
for (PartitionId partition : assignment.getMappedPartitions()) {
+ List<ParticipantId> preferenceList = new ArrayList<ParticipantId>();
+ Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
+
+ // invert the map to get in state order
Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partition);
- setParticipantStateMap(partition, replicaMap);
- setPreferenceList(partition, new ArrayList<ParticipantId>(replicaMap.keySet()));
+ ListMultimap<State, ParticipantId> inverseMap = ArrayListMultimap.create();
+ Multimaps.invertFrom(Multimaps.forMap(replicaMap), inverseMap);
+
+ // update the ideal state in order of state priorities
+ for (State state : stateModelDef.getStatesPriorityList()) {
+ if (!state.equals(State.from(HelixDefinedState.DROPPED))
+ && !state.equals(State.from(HelixDefinedState.ERROR))) {
+ List<ParticipantId> stateParticipants = inverseMap.get(state);
+ for (ParticipantId participant : stateParticipants) {
+ preferenceList.add(participant);
+ participantStateMap.put(participant, state);
+ }
+ }
+ }
+ setPreferenceList(partition, preferenceList);
+ setParticipantStateMap(partition, participantStateMap);
}
}
@@ -674,12 +729,13 @@ public class IdealState extends HelixProperty {
if (rawPreferenceList == null) {
return Collections.emptyList();
}
- return Lists.transform(rawPreferenceList, new Function<String, ParticipantId>() {
- @Override
- public ParticipantId apply(String participantName) {
- return Id.participant(participantName);
- }
- });
+ return Lists.transform(new ArrayList<String>(rawPreferenceList),
+ new Function<String, ParticipantId>() {
+ @Override
+ public ParticipantId apply(String participantName) {
+ return Id.participant(participantName);
+ }
+ });
}
/**
@@ -710,12 +766,13 @@ public class IdealState extends HelixProperty {
if (preferenceList == null) {
return Collections.emptyList();
}
- return Lists.transform(preferenceList, new Function<ParticipantId, String>() {
- @Override
- public String apply(ParticipantId participantId) {
- return participantId.stringify();
- }
- });
+ return Lists.transform(new ArrayList<ParticipantId>(preferenceList),
+ new Function<ParticipantId, String>() {
+ @Override
+ public String apply(ParticipantId participantId) {
+ return participantId.stringify();
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/test/java/org/apache/helix/api/TestId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestId.java b/helix-core/src/test/java/org/apache/helix/api/TestId.java
index 05da8a3..57c01e7 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestId.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestId.java
@@ -41,6 +41,7 @@ public class TestId {
final String sessionName = "Session";
final String processName = "Process";
final String stateModelName = "StateModel";
+ final String stateModelFactoryName = "StateModelFactory";
final String messageName = "Message";
Assert.assertEquals(Id.resource(resourceName).stringify(), resourceName);
Assert.assertEquals(Id.cluster(clusterName).stringify(), clusterName);
@@ -48,6 +49,8 @@ public class TestId {
Assert.assertEquals(Id.session(sessionName).stringify(), sessionName);
Assert.assertEquals(Id.process(processName).stringify(), processName);
Assert.assertEquals(Id.stateModelDef(stateModelName).stringify(), stateModelName);
+ Assert.assertEquals(Id.stateModelFactory(stateModelFactoryName).stringify(),
+ stateModelFactoryName);
Assert.assertEquals(Id.message(messageName).stringify(), messageName);
}
@@ -72,6 +75,7 @@ public class TestId {
Assert.assertNull(Id.session(null));
Assert.assertNull(Id.process(null));
Assert.assertNull(Id.stateModelDef(null));
+ Assert.assertNull(Id.stateModelFactory(null));
Assert.assertNull(Id.message(null));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 98ae60b..cc26596 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -20,20 +20,33 @@ package org.apache.helix.api;
*/
import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
+import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
+import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.controller.ClusterController;
import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -42,6 +55,7 @@ import org.testng.annotations.Test;
public class TestNewStages extends ZkUnitTestBase {
final int n = 2;
final int p = 8;
+ final int r = 2;
MockParticipant[] _participants = new MockParticipant[n];
ClusterController _controller;
@@ -88,6 +102,146 @@ public class TestNewStages extends ZkUnitTestBase {
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}
+ @Test
+ public void testBasicBestPossibleStateCalcStage() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up the event
+ ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
+ Cluster cluster = clusterAccessor.readCluster();
+ ClusterEvent event = new ClusterEvent(testName);
+ event.addAttribute(AttributeName.CURRENT_STATE.toString(), new NewCurrentStateOutput());
+ event.addAttribute(AttributeName.RESOURCES.toString(), cluster.getResourceMap());
+ event.addAttribute("ClusterDataCache", cluster);
+ Map<StateModelDefId, StateModelDefinition> stateModelMap =
+ new HashMap<StateModelDefId, StateModelDefinition>();
+ stateModelMap.put(Id.stateModelDef("MasterSlave"), new StateModelDefinition(
+ StateModelConfigGenerator.generateConfigForMasterSlave()));
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelMap);
+
+ // Run the stage
+ try {
+ new NewBestPossibleStateCalcStage().process(event);
+ } catch (Exception e) {
+ Assert.fail(e.toString());
+ }
+
+ // Verify the result
+ NewBestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ Assert.assertNotNull(bestPossibleStateOutput);
+ ResourceId resourceId = new ResourceId("TestDB0");
+ ResourceAssignment assignment = bestPossibleStateOutput.getResourceAssignment(resourceId);
+ Assert.assertNotNull(assignment);
+ Resource resource = cluster.getResource(resourceId);
+ verifySemiAutoRebalance(resource, assignment);
+
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testClusterRebalancers() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
+ Cluster cluster = clusterAccessor.readCluster();
+
+ ResourceId resourceId = new ResourceId("TestDB0");
+ Resource resource = cluster.getResource(resourceId);
+ StateModelDefinition masterSlave =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+ ResourceAssignment fullAutoResult =
+ new NewAutoRebalancer().computeResourceMapping(resource, cluster, masterSlave,
+ currentStateOutput);
+ verifyFullAutoRebalance(resource, fullAutoResult);
+ ResourceAssignment semiAutoResult =
+ new NewSemiAutoRebalancer().computeResourceMapping(resource, cluster, masterSlave,
+ currentStateOutput);
+ verifySemiAutoRebalance(resource, semiAutoResult);
+ ResourceAssignment customResult =
+ new NewCustomRebalancer().computeResourceMapping(resource, cluster, masterSlave,
+ currentStateOutput);
+ verifyCustomRebalance(resource, customResult);
+
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+ * Check that a full auto rebalance is run, and at least one replica per partition is mapped
+ * @param resource the resource to verify
+ * @param assignment the assignment to verify
+ */
+ private void verifyFullAutoRebalance(Resource resource, ResourceAssignment assignment) {
+ Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
+ for (PartitionId partitionId : assignment.getMappedPartitions()) {
+ Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+ Assert.assertTrue(replicaMap.size() <= r);
+ Assert.assertTrue(replicaMap.size() > 0);
+ boolean hasMaster = false;
+ for (State state : replicaMap.values()) {
+ if (state.equals(State.from("MASTER"))) {
+ Assert.assertFalse(hasMaster);
+ hasMaster = true;
+ }
+ }
+ Assert.assertTrue(hasMaster);
+ }
+ }
+
+ /**
+ * Check that a semi auto rebalance is run, and all partitions are mapped by preference list
+ * @param resource the resource to verify
+ * @param assignment the assignment to verify
+ */
+ private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
+ Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
+ RebalancerConfig config = resource.getRebalancerConfig();
+ for (PartitionId partitionId : assignment.getMappedPartitions()) {
+ List<ParticipantId> preferenceList = config.getPreferenceList(partitionId);
+ Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+ Assert.assertEquals(replicaMap.size(), preferenceList.size());
+ Assert.assertEquals(replicaMap.size(), r);
+ boolean hasMaster = false;
+ for (ParticipantId participant : preferenceList) {
+ Assert.assertTrue(replicaMap.containsKey(participant));
+ State state = replicaMap.get(participant);
+ if (state.equals(State.from("MASTER"))) {
+ Assert.assertFalse(hasMaster);
+ hasMaster = true;
+ }
+ }
+ Assert.assertEquals(replicaMap.get(preferenceList.get(0)), State.from("MASTER"));
+ }
+ }
+
+ /**
+ * For vanilla customized rebalancing, the resource assignment should match the preference map
+ * @param resource the resource to verify
+ * @param assignment the assignment to verify
+ */
+ private void verifyCustomRebalance(Resource resource, ResourceAssignment assignment) {
+ Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
+ RebalancerConfig config = resource.getRebalancerConfig();
+ for (PartitionId partitionId : assignment.getMappedPartitions()) {
+ Map<ParticipantId, State> preferenceMap = config.getPreferenceMap(partitionId);
+ Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
+ Assert.assertEquals(replicaMap.size(), preferenceMap.size());
+ Assert.assertEquals(replicaMap.size(), r);
+ for (ParticipantId participant : preferenceMap.keySet()) {
+ Assert.assertTrue(replicaMap.containsKey(participant));
+ Assert.assertEquals(replicaMap.get(participant), preferenceMap.get(participant));
+ }
+ }
+ }
@BeforeClass
public void beforeClass() throws Exception {
@@ -106,7 +260,7 @@ public class TestNewStages extends ZkUnitTestBase {
1, // resources
p, // partitions per resource
n, // number of nodes
- 2, // replicas
+ r, // replicas
"MasterSlave", true); // do rebalance
_controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
[2/2] git commit: helix rebalancer refactor using logical models
Posted by zz...@apache.org.
helix rebalancer refactor using logical models
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5d0e048e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5d0e048e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5d0e048e
Branch: refs/heads/helix-logical-model
Commit: 5d0e048e134a6f21200a67618fdbe852ca1d7592
Parents: 9c7de4c
Author: zzhang <zz...@apache.org>
Authored: Thu Sep 5 13:11:36 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Thu Sep 5 13:11:36 2013 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/api/Cluster.java | 23 +-
.../org/apache/helix/api/ClusterAccessor.java | 20 +-
.../apache/helix/api/ParticipantAccessor.java | 8 +-
.../org/apache/helix/api/RebalancerConfig.java | 117 +++++++---
.../org/apache/helix/api/RebalancerRef.java | 20 +-
.../java/org/apache/helix/api/Resource.java | 70 +++---
.../apache/helix/api/SchedulerTaskConfig.java | 47 ++++
.../controller/GenericHelixController.java | 31 ++-
.../rebalancer/NewAutoRebalancer.java | 35 ++-
.../rebalancer/NewCustomRebalancer.java | 13 +-
.../controller/rebalancer/NewRebalancer.java | 4 +-
.../rebalancer/NewSemiAutoRebalancer.java | 5 +-
.../util/NewConstraintBasedAssignment.java | 13 +-
.../helix/controller/stages/AttributeName.java | 3 +-
.../stages/NewBestPossibleStateCalcStage.java | 104 +++++----
.../stages/NewBestPossibleStateOutput.java | 19 +-
.../stages/NewCompatibilityCheckStage.java | 68 ++++++
.../stages/NewCurrentStateOutput.java | 19 +-
.../stages/NewMessageGenerationPhase.java | 233 -------------------
.../stages/NewMessageGenerationStage.java | 211 +++++++++++++++++
.../controller/stages/NewMessageOutput.java | 75 ++++++
.../stages/NewMessageSelectionStage.java | 78 ++++---
.../stages/NewMessageThrottleStage.java | 40 ++--
.../stages/NewReadClusterDataStage.java | 85 +++++++
.../stages/NewRebalanceIdealStateStage.java | 84 -------
.../stages/NewResourceComputationStage.java | 11 +-
.../stages/NewTaskAssignmentStage.java | 25 +-
.../stages/RebalanceIdealStateStage.java | 5 +-
.../helix/manager/zk/ZKHelixDataAccessor.java | 1 +
.../helix/manager/zk/ZkBaseDataAccessor.java | 8 +
.../java/org/apache/helix/model/IdealState.java | 87 +++++--
.../test/java/org/apache/helix/api/TestId.java | 4 +
.../org/apache/helix/api/TestNewStages.java | 156 ++++++++++++-
33 files changed, 1158 insertions(+), 564 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 193b238..e890fb4 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -22,6 +22,9 @@ package org.apache.helix.api;
import java.util.Collections;
import java.util.Map;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+
import com.google.common.collect.ImmutableMap;
/**
@@ -59,6 +62,8 @@ public class Cluster {
private final ClusterConfig _config = null;
+ private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+
/**
* construct a cluster
* @param id
@@ -69,7 +74,7 @@ public class Cluster {
*/
public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
- ControllerId leaderId) {
+ ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap) {
_id = id;
@@ -89,6 +94,8 @@ public class Cluster {
_leaderId = leaderId;
+ _constraintMap = ImmutableMap.copyOf(constraintMap);
+
// TODO impl this when we persist controllers and spectators on zookeeper
_controllerMap = ImmutableMap.copyOf(controllerMap);
_spectatorMap = Collections.emptyMap();
@@ -159,4 +166,18 @@ public class Cluster {
return _spectatorMap;
}
+ /**
+ * @return map of constraint type to the cluster constraints of that type
+ */
+ public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
+ return _constraintMap;
+ }
+
+ /**
+ * @param type the constraint type to look up
+ * @return the cluster constraints stored for that type
+ */
+ public ClusterConstraints getConstraint(ConstraintType type) {
+ return _constraintMap.get(type);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 5902a24..04d5831 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -27,6 +27,8 @@ import java.util.Map;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -139,12 +141,18 @@ public class ClusterAccessor {
LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+ /**
+ * map of constraint-type to constraints
+ */
+ Map<String, ClusterConstraints> constraintMap =
+ _accessor.getChildValuesMap(_keyBuilder.constraints());
+
Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
for (String resourceName : idealStateMap.keySet()) {
IdealState idealState = idealStateMap.get(resourceName);
// TODO pass resource assignment
- ResourceId resourceId = new ResourceId(resourceName);
+ ResourceId resourceId = Id.resource(resourceName);
resourceMap.put(resourceId, new Resource(resourceId, idealState, null));
}
@@ -167,7 +175,15 @@ public class ClusterAccessor {
controllerMap.put(leaderId, new Controller(leaderId, leader, true));
}
- return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId);
+ Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
+ new HashMap<ConstraintType, ClusterConstraints>();
+ for (String constraintType : constraintMap.keySet()) {
+ clusterConstraintMap.put(ConstraintType.valueOf(constraintType),
+ constraintMap.get(constraintType));
+ }
+
+ return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
+ clusterConstraintMap);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index d2ae927..da2c433 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -307,9 +307,11 @@ public class ParticipantAccessor {
}
Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
- for (String msgId : instanceMsgMap.keySet()) {
- Message message = instanceMsgMap.get(msgId);
- msgMap.put(new MessageId(msgId), message);
+ if (instanceMsgMap != null) {
+ for (String msgId : instanceMsgMap.keySet()) {
+ Message message = instanceMsgMap.get(msgId);
+ msgMap.put(new MessageId(msgId), message);
+ }
}
Map<ResourceId, CurrentState> curStateMap = new HashMap<ResourceId, CurrentState>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 2baf63b..4ac254d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,18 +19,25 @@ package org.apache.helix.api;
* under the License.
*/
-import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Captures the configuration properties necessary for rebalancing
+ */
public class RebalancerConfig {
private final RebalanceMode _rebalancerMode;
private final RebalancerRef _rebalancerRef;
private final StateModelDefId _stateModelDefId;
private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+ private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
private final ResourceAssignment _resourceAssignment;
private final int _replicaCount;
private final String _participantGroupTag;
@@ -39,20 +46,38 @@ public class RebalancerConfig {
private final boolean _batchMessageMode;
private final StateModelFactoryId _stateModelFactoryId;
- public RebalancerConfig(RebalanceMode mode, RebalancerRef rebalancerRef,
- StateModelDefId stateModelDefId, ResourceAssignment resourceAssignment, int bucketSize,
- boolean batchMessageMode, StateModelFactoryId stateModelFactoryId) {
- _rebalancerMode = mode;
- _rebalancerRef = rebalancerRef;
- _stateModelDefId = stateModelDefId;
+ /**
+ * Instantiate the configuration of a rebalance task
+ * @param idealState the physical ideal state
+ * @param resourceAssignment last mapping of a resource
+ */
+ public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment) {
+ _rebalancerMode = idealState.getRebalanceMode();
+ _rebalancerRef = idealState.getRebalancerRef();
+ _stateModelDefId = idealState.getStateModelDefId();
+ _replicaCount = Integer.parseInt(idealState.getReplicas());
+ _participantGroupTag = idealState.getInstanceGroupTag();
+ _maxPartitionsPerParticipant = idealState.getMaxPartitionsPerInstance();
+ _bucketSize = idealState.getBucketSize();
+ _batchMessageMode = idealState.getBatchMessageMode();
+ _stateModelFactoryId = idealState.getStateModelFactoryId();
+
+ // Build preference lists and maps
+ ImmutableMap.Builder<PartitionId, List<ParticipantId>> preferenceLists =
+ new ImmutableMap.Builder<PartitionId, List<ParticipantId>>();
+ ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>> preferenceMaps =
+ new ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>>();
+ for (PartitionId partitionId : idealState.getPartitionSet()) {
+ preferenceLists.put(partitionId,
+ ImmutableList.copyOf(idealState.getPreferenceList(partitionId)));
+ preferenceMaps.put(partitionId,
+ ImmutableMap.copyOf(idealState.getParticipantStateMap(partitionId)));
+ }
+ _preferenceLists = preferenceLists.build();
+ _preferenceMaps = preferenceMaps.build();
+
+ // Leave the resource assignment as is
_resourceAssignment = resourceAssignment;
- _preferenceLists = Collections.emptyMap(); // TODO: stub
- _replicaCount = 0; // TODO: stub
- _participantGroupTag = null; // TODO: stub
- _maxPartitionsPerParticipant = Integer.MAX_VALUE; // TODO: stub
- _bucketSize = bucketSize;
- _batchMessageMode = batchMessageMode;
- _stateModelFactoryId = stateModelFactoryId;
}
/**
@@ -97,6 +122,15 @@ public class RebalancerConfig {
}
/**
+ * Get the preference map of participants and states for a given partition
+ * @param partitionId the partition to look up
+ * @return a mapping of participant to state for each replica
+ */
+ public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+ return _preferenceMaps.get(partitionId);
+ }
+
+ /**
* Get the number of replicas each partition should have
* @return replica count
*/
@@ -144,29 +178,27 @@ public class RebalancerConfig {
return _stateModelFactoryId;
}
- // TODO impl this
- public String getRebalancerClassName() {
- throw new UnsupportedOperationException("impl this");
- }
-
/**
* Assembles a RebalancerConfig
*/
public static class Builder {
- private RebalanceMode _mode = RebalanceMode.NONE;
- private RebalancerRef _rebalancerRef;
- private StateModelDefId _stateModelDefId;
+ private final IdealState _idealState;
private ResourceAssignment _resourceAssignment;
- private int _bucketSize;
- private boolean _batchMessageMode;
- private StateModelFactoryId _stateModelFactoryId;
+
+ /**
+ * Configure the rebalancer for a resource
+ * @param resourceId the resource to rebalance
+ */
+ public Builder(ResourceId resourceId) {
+ _idealState = new IdealState(resourceId);
+ }
/**
* Set the rebalancer mode
* @param mode {@link RebalanceMode}
*/
public Builder rebalancerMode(RebalanceMode mode) {
- _mode = mode;
+ _idealState.setRebalanceMode(mode);
return this;
}
@@ -176,7 +208,7 @@ public class RebalancerConfig {
* @return Builder
*/
public Builder rebalancer(RebalancerRef rebalancerRef) {
- _rebalancerRef = rebalancerRef;
+ _idealState.setRebalancerRef(rebalancerRef);
return this;
}
@@ -186,7 +218,7 @@ public class RebalancerConfig {
* @return Builder
*/
public Builder stateModelDef(StateModelDefId stateModelDefId) {
- _stateModelDefId = stateModelDefId;
+ _idealState.setStateModelDefId(stateModelDefId);
return this;
}
@@ -206,7 +238,7 @@ public class RebalancerConfig {
* @return Builder
*/
public Builder bucketSize(int bucketSize) {
- _bucketSize = bucketSize;
+ _idealState.setBucketSize(bucketSize);
return this;
}
@@ -216,7 +248,27 @@ public class RebalancerConfig {
* @return Builder
*/
public Builder batchMessageMode(boolean batchMessageMode) {
- _batchMessageMode = batchMessageMode;
+ _idealState.setBatchMessageMode(batchMessageMode);
+ return this;
+ }
+
+ /**
+ * Set the number of replicas
+ * @param replicaCount number of replicas
+ * @return Builder
+ */
+ public Builder replicaCount(int replicaCount) {
+ _idealState.setReplicas(Integer.toString(replicaCount));
+ return this;
+ }
+
+ /**
+ * Set the maximum number of partitions to assign to any participant
+ * @param maxPartitions maximum number of partitions per participant
+ * @return Builder
+ */
+ public Builder maxPartitionsPerParticipant(int maxPartitions) {
+ _idealState.setMaxPartitionsPerInstance(maxPartitions);
return this;
}
@@ -226,7 +278,7 @@ public class RebalancerConfig {
* @return Builder
*/
public Builder stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
- _stateModelFactoryId = stateModelFactoryId;
+ _idealState.setStateModelFactoryId(stateModelFactoryId);
return this;
}
@@ -235,8 +287,7 @@ public class RebalancerConfig {
* @return a fully defined rebalancer configuration
*/
public RebalancerConfig build() {
- return new RebalancerConfig(_mode, _rebalancerRef, _stateModelDefId, _resourceAssignment,
- _bucketSize, _batchMessageMode, _stateModelFactoryId);
+ return new RebalancerConfig(_idealState, _resourceAssignment);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
index 7f33be7..5f22898 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
@@ -19,10 +19,13 @@ package org.apache.helix.api;
* under the License.
*/
-import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.NewRebalancer;
import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
public class RebalancerRef {
+ private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
+
private final String _rebalancerClassName;
public RebalancerRef(String rebalancerClassName) {
@@ -32,18 +35,11 @@ public class RebalancerRef {
/**
* @return
*/
- public Rebalancer getRebalancer() {
+ public NewRebalancer getRebalancer() {
try {
- return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
- } catch (InstantiationException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IllegalAccessException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ return (NewRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+ } catch (Exception e) {
+ LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
}
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index f976fad..0c8b730 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceAssignment;
import com.google.common.collect.ImmutableMap;
@@ -37,11 +38,11 @@ import com.google.common.collect.ImmutableSet;
public class Resource {
private final ResourceId _id;
private final RebalancerConfig _rebalancerConfig;
+ private final SchedulerTaskConfig _schedulerTaskConfig;
private final Map<PartitionId, Partition> _partitionMap;
private final ExternalView _externalView;
- private final ExternalView _pendingExternalView;
/**
* Construct a resource
@@ -50,19 +51,37 @@ public class Resource {
*/
public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment) {
_id = id;
- _rebalancerConfig = new RebalancerConfig(idealState.getRebalanceMode(), idealState.getRebalancerRef(),
- idealState.getStateModelDefId(), resourceAssignment, idealState.getBucketSize(),
- idealState.getBatchMessageMode(), Id.stateModelFactory(
- idealState.getStateModelFactoryName()));
+ _rebalancerConfig = new RebalancerConfig(idealState, resourceAssignment);
Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
+ Map<PartitionId, Map<String, String>> schedulerTaskConfig =
+ new HashMap<PartitionId, Map<String, String>>();
+ Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
for (PartitionId partitionId : idealState.getPartitionSet()) {
partitionMap.put(partitionId, new Partition(partitionId));
+
+ // TODO refactor it
+ // A map field stored under the partition's name, when present, is kept as that
+ // partition's scheduler task config.
+ Map<String, String> taskConfigMap = idealState.getRecord().getMapField(partitionId.stringify());
+ if (taskConfigMap != null) {
+ schedulerTaskConfig.put(partitionId, taskConfigMap);
+ }
}
+
+ // TODO refactor it
+ // Transition timeouts come from the record's simple fields and do not depend on the
+ // partition, so they are collected once for the whole resource rather than inside the
+ // partition loop; this way they are also found when the partition set is empty.
+ String timeoutMarker = "_" + Message.Attributes.TIMEOUT;
+ for (String simpleKey : idealState.getRecord().getSimpleFields().keySet()) {
+ if (simpleKey.contains(timeoutMarker)) {
+ try {
+ int timeout = Integer.parseInt(idealState.getRecord().getSimpleField(simpleKey));
+ transitionTimeoutMap.put(simpleKey, timeout);
+ } catch (NumberFormatException e) {
+ // value is not an integer; leave this key out of the timeout map
+ }
+ }
+ }
_partitionMap = ImmutableMap.copyOf(partitionMap);
+ _schedulerTaskConfig = new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfig);
_externalView = null;
- _pendingExternalView = null; // TODO: stub
}
/**
@@ -75,12 +94,12 @@ public class Resource {
*/
public Resource(ResourceId id, Map<PartitionId, Partition> partitionMap,
ExternalView externalView,
- ExternalView pendingExternalView, RebalancerConfig rebalancerConfig) {
+ RebalancerConfig rebalancerConfig, SchedulerTaskConfig schedulerTaskConfig) {
_id = id;
+ // snapshot the partition map so later changes to the caller's map do not show up here
_partitionMap = ImmutableMap.copyOf(partitionMap);
_externalView = externalView;
- _pendingExternalView = pendingExternalView;
_rebalancerConfig = rebalancerConfig;
+ // stored as given; no null check or copy is made in this constructor
+ _schedulerTaskConfig = schedulerTaskConfig;
}
/**
@@ -116,14 +135,6 @@ public class Resource {
return _externalView;
}
- /**
- * Get the pending external view of the resource based on unprocessed messages
- * @return the external view of the resource
- */
- public ExternalView getPendingExternalView() {
- return _pendingExternalView;
- }
-
public RebalancerConfig getRebalancerConfig() {
return _rebalancerConfig;
}
@@ -132,6 +143,10 @@ public class Resource {
return _id;
}
+ /**
+ * Get the scheduler task configuration of the resource
+ * @return the SchedulerTaskConfig this resource was constructed with
+ */
+ public SchedulerTaskConfig getSchedulerTaskConfig() {
+ return _schedulerTaskConfig;
+ }
+
/**
* Assembles a Resource
*/
@@ -139,8 +154,8 @@ public class Resource {
private final ResourceId _id;
private final Map<PartitionId, Partition> _partitionMap;
private ExternalView _externalView;
- private ExternalView _pendingExternalView;
private RebalancerConfig _rebalancerConfig;
+ private SchedulerTaskConfig _schedulerTaskConfig;
/**
* Build a Resource with an id
@@ -184,22 +199,21 @@ public class Resource {
}
/**
- * Set the pending external view of this resource
- * @param extView replica placements as a result of pending messages
+ * Set the rebalancer configuration
+ * @param rebalancerConfig properties of interest for rebalancing
* @return Builder
*/
- public Builder pendingExternalView(ExternalView pendingExtView) {
- _pendingExternalView = pendingExtView;
+ public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+ _rebalancerConfig = rebalancerConfig;
return this;
}
/**
- * Set the rebalancer configuration
- * @param rebalancerConfig properties of interest for rebalancing
- * @return Builder
+ * Set the scheduler task configuration
+ * @param schedulerTaskConfig scheduler task configuration to build the resource with
+ * @return Builder
*/
- public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
- _rebalancerConfig = rebalancerConfig;
+ public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
+ _schedulerTaskConfig = schedulerTaskConfig;
return this;
}
@@ -208,8 +222,8 @@ public class Resource {
* @return instantiated Resource
*/
public Resource build() {
- return new Resource(_id, _partitionMap, _externalView, _pendingExternalView,
- _rebalancerConfig);
+ return new Resource(_id, _partitionMap, _externalView, _rebalancerConfig,
+ _schedulerTaskConfig);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java b/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
new file mode 100644
index 0000000..ac7cb3a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/SchedulerTaskConfig.java
@@ -0,0 +1,95 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.model.Message;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Scheduler-task settings of a resource: timeouts keyed by transition and raw task config maps
+ * keyed by partition.
+ */
+public class SchedulerTaskConfig {
+ // TODO refactor using Transition logical model
+ private final Map<String, Integer> _transitionTimeoutMap;
+
+ // TODO refactor this when understand inner message format
+ private final Map<PartitionId, Map<String, String>> _schedulerTaskConfig;
+
+ /**
+ * Instantiate from transition timeouts and per-partition task configs
+ * @param transitionTimeoutMap timeout value for each transition key
+ * @param schedulerTaskConfig task config key-value pairs for each partition
+ */
+ public SchedulerTaskConfig(Map<String, Integer> transitionTimeoutMap,
+ Map<PartitionId, Map<String, String>> schedulerTaskConfig) {
+ // the outer maps are snapshotted; the nested per-partition maps are not copied
+ _transitionTimeoutMap = ImmutableMap.copyOf(transitionTimeoutMap);
+ _schedulerTaskConfig = ImmutableMap.copyOf(schedulerTaskConfig);
+ }
+
+ /**
+ * Get the task config of a partition
+ * @param partitionId the partition to look up
+ * @return the partition's config map, or null if none was supplied for it
+ */
+ public Map<String, String> getTaskConfig(PartitionId partitionId) {
+ return _schedulerTaskConfig.get(partitionId);
+ }
+
+ /**
+ * Get the timeout registered for a transition key
+ * @param transition the key to look up
+ * @return the timeout, or null if the key has none
+ */
+ public Integer getTransitionTimeout(String transition) {
+ return _transitionTimeoutMap.get(transition);
+ }
+
+ /**
+ * Get the timeout for a transition, falling back to the partition's task config
+ * @param transition key tried first against the transition timeouts
+ * @param partitionId partition whose task config is consulted when the transition has no timeout
+ * @return the resolved timeout, or null if neither source yields an integer
+ */
+ public Integer getTimeout(String transition, PartitionId partitionId) {
+ Integer timeout = getTransitionTimeout(transition);
+ if (timeout != null) {
+ return timeout;
+ }
+ Map<String, String> taskConfig = getTaskConfig(partitionId);
+ if (taskConfig == null) {
+ return null;
+ }
+ String timeoutStr = taskConfig.get(Message.Attributes.TIMEOUT.toString());
+ if (timeoutStr == null) {
+ return null;
+ }
+ try {
+ return Integer.parseInt(timeoutStr);
+ } catch (NumberFormatException e) {
+ // an unparsable value in the task config is treated the same as no timeout
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 314733f..6570dc4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -52,6 +52,16 @@ import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.NewCompatibilityCheckStage;
+import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewExternalViewComputeStage;
+import org.apache.helix.controller.stages.NewMessageGenerationStage;
+import org.apache.helix.controller.stages.NewMessageSelectionStage;
+import org.apache.helix.controller.stages.NewMessageThrottleStage;
+import org.apache.helix.controller.stages.NewReadClusterDataStage;
+import org.apache.helix.controller.stages.NewResourceComputationStage;
+import org.apache.helix.controller.stages.NewTaskAssignmentStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.RebalanceIdealStateStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
@@ -175,23 +185,22 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
// cluster data cache refresh
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new CompatibilityCheckStage());
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new RebalanceIdealStateStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
+ rebalancePipeline.addStage(new NewCompatibilityCheckStage());
+ rebalancePipeline.addStage(new NewResourceComputationStage());
+ rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+ rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new NewMessageGenerationStage());
+ rebalancePipeline.addStage(new NewMessageSelectionStage());
+ rebalancePipeline.addStage(new NewMessageThrottleStage());
+ rebalancePipeline.addStage(new NewTaskAssignmentStage());
// external view generation
Pipeline externalViewPipeline = new Pipeline();
- externalViewPipeline.addStage(new ExternalViewComputeStage());
+ externalViewPipeline.addStage(new NewExternalViewComputeStage());
registry.register("idealStateChange", dataRefresh, rebalancePipeline);
registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 563b7e2..8821082 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -38,6 +38,7 @@ import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
@@ -66,7 +67,7 @@ public class NewAutoRebalancer implements NewRebalancer {
@Override
public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
- StateModelDefinition stateModelDef) {
+ StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
// Compute a preference list based on the current ideal state
List<Partition> partitions = new ArrayList<Partition>(resource.getPartitionSet());
List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
@@ -75,15 +76,15 @@ public class NewAutoRebalancer implements NewRebalancer {
Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
int replicas = config.getReplicaCount();
- LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
- stateCountMap =
+ LinkedHashMap<String, Integer> stateCountMap =
ConstraintBasedAssignment.stateCount(stateModelDef, liveParticipants.size(), replicas);
List<ParticipantId> liveParticipantList =
new ArrayList<ParticipantId>(liveParticipants.keySet());
List<ParticipantId> allParticipantList =
new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
- Map<PartitionId, Map<ParticipantId, State>> currentMapping = currentMapping(resource);
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping =
+ currentMapping(resource, currentStateOutput, stateCountMap);
// If there are nodes tagged with resource, use only those nodes
Set<String> taggedNodes = new HashSet<String>();
@@ -136,21 +137,37 @@ public class NewAutoRebalancer implements NewRebalancer {
Map<ParticipantId, State> bestStateForPartition =
NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
stateModelDef, preferenceList,
- resource.getExternalView().getStateMap(partition.getId()),
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId()),
disabledParticipantsForPartition);
partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
}
return partitionMapping;
}
- private Map<PartitionId, Map<ParticipantId, State>> currentMapping(Resource resource) {
+ private Map<PartitionId, Map<ParticipantId, State>> currentMapping(Resource resource,
+ NewCurrentStateOutput currentStateOutput, Map<String, Integer> stateCountMap) {
Map<PartitionId, Map<ParticipantId, State>> map =
new HashMap<PartitionId, Map<ParticipantId, State>>();
for (Partition partition : resource.getPartitionSet()) {
- Map<ParticipantId, State> stateMap = new HashMap<ParticipantId, State>();
- stateMap.putAll(resource.getExternalView().getStateMap(partition.getId()));
- stateMap.putAll(resource.getPendingExternalView().getStateMap(partition.getId()));
+ Map<ParticipantId, State> replicaStates = new HashMap<ParticipantId, State>();
+ map.put(partition.getId(), replicaStates);
+
+ // stateCountMap is keyed by String, so states are looked up by their string form; handing
+ // the State object itself to a String-keyed map relies on State's equals/hashCode being
+ // String-compatible.
+ // NOTE(review): assumes State.toString() yields the name used as stateCountMap keys - confirm
+ Map<ParticipantId, State> curStateMap =
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+ for (Map.Entry<ParticipantId, State> current : curStateMap.entrySet()) {
+ State state = current.getValue();
+ if (state != null && stateCountMap.containsKey(state.toString())) {
+ replicaStates.put(current.getKey(), state);
+ }
+ }
+
+ // pending states are applied second, so they replace the current state of the same node
+ Map<ParticipantId, State> pendingStateMap =
+ currentStateOutput.getPendingStateMap(resource.getId(), partition.getId());
+ for (Map.Entry<ParticipantId, State> pending : pendingStateMap.entrySet()) {
+ State state = pending.getValue();
+ if (state != null && stateCountMap.containsKey(state.toString())) {
+ replicaStates.put(pending.getKey(), state);
+ }
+ }
}
return map;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
index 600d848..8d000f5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
@@ -32,6 +32,7 @@ import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -51,7 +52,7 @@ public class NewCustomRebalancer implements NewRebalancer {
@Override
public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
- StateModelDefinition stateModelDef) {
+ StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing resource:" + resource.getId());
}
@@ -59,13 +60,13 @@ public class NewCustomRebalancer implements NewRebalancer {
RebalancerConfig config = resource.getRebalancerConfig();
for (Partition partition : resource.getPartitionSet()) {
Map<ParticipantId, State> currentStateMap =
- resource.getExternalView().getStateMap(partition.getId());
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
Set<ParticipantId> disabledInstancesForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
partition.getId());
Map<ParticipantId, State> bestStateForPartition =
computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
- config.getResourceAssignment().getReplicaMap(partition.getId()), currentStateMap,
+ config.getPreferenceMap(partition.getId()), currentStateMap,
disabledInstancesForPartition);
partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
}
@@ -74,11 +75,11 @@ public class NewCustomRebalancer implements NewRebalancer {
/**
* compute best state for resource in CUSTOMIZED rebalancer mode
- * @param cache
+ * @param liveParticipantMap
* @param stateModelDef
* @param idealStateMap
* @param currentStateMap
- * @param disabledInstancesForPartition
+ * @param disabledParticipantsForPartition
* @return
*/
private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
@@ -87,7 +88,7 @@ public class NewCustomRebalancer implements NewRebalancer {
Set<ParticipantId> disabledParticipantsForPartition) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
- // if the ideal state is deleted, idealStateMap will be null/empty and
+ // if the resource is deleted, idealStateMap will be null/empty and
// we should drop all resources.
if (currentStateMap != null) {
for (ParticipantId participantId : currentStateMap.keySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
index 253723f..70c9ca7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewRebalancer.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.rebalancer;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Resource;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
@@ -37,7 +38,8 @@ public interface NewRebalancer {
* @param resource the resource for which a mapping will be computed
* @param cluster a snapshot of the entire cluster state
* @param stateModelDef the state model for which to rebalance the resource
+ * @param currentStateOutput a combination of the current states and pending current states
*/
ResourceAssignment computeResourceMapping(final Resource resource, final Cluster cluster,
- final StateModelDefinition stateModelDef);
+ final StateModelDefinition stateModelDef, final NewCurrentStateOutput currentStateOutput);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index 472e7d3..27bb513 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -30,6 +30,7 @@ import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -49,7 +50,7 @@ public class NewSemiAutoRebalancer implements NewRebalancer {
@Override
public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
- StateModelDefinition stateModelDef) {
+ StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing resource:" + resource.getId());
}
@@ -57,7 +58,7 @@ public class NewSemiAutoRebalancer implements NewRebalancer {
RebalancerConfig config = resource.getRebalancerConfig();
for (Partition partition : resource.getPartitionSet()) {
Map<ParticipantId, State> currentStateMap =
- resource.getExternalView().getStateMap(partition.getId());
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
Set<ParticipantId> disabledInstancesForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
partition.getId());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index feb3214..224853b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -43,8 +43,8 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
/**
- * Collection of functions that will compute the best possible states given the live instances and
- * an ideal state.
+ * Collection of functions that will compute the best possible state based on the participants and
+ * the rebalancer configuration of a resource.
*/
public class NewConstraintBasedAssignment {
private static Logger logger = Logger.getLogger(NewConstraintBasedAssignment.class);
@@ -57,8 +57,9 @@ public class NewConstraintBasedAssignment {
*/
public static Set<ParticipantId> getDisabledParticipants(
final Map<ParticipantId, Participant> participantMap, final PartitionId partitionId) {
+ Set<ParticipantId> participantSet = new HashSet<ParticipantId>(participantMap.keySet());
Set<ParticipantId> disabledParticipantsForPartition =
- Sets.filter(participantMap.keySet(), new Predicate<ParticipantId>() {
+ Sets.filter(participantSet, new Predicate<ParticipantId>() {
@Override
public boolean apply(ParticipantId participantId) {
return participantMap.get(participantId).getDisablePartitionIds().contains(partitionId);
@@ -87,7 +88,7 @@ public class NewConstraintBasedAssignment {
}
/**
- * compute best state for resource in AUTO ideal state mode
+ * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
* @param liveParticipantMap map of id to live participants
* @param stateModelDef
* @param participantPreferenceList
@@ -102,7 +103,7 @@ public class NewConstraintBasedAssignment {
Set<ParticipantId> disabledParticipantsForPartition) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
- // if the ideal state is deleted, instancePreferenceList will be empty and
+ // if the resource is deleted, instancePreferenceList will be empty and
// we should drop all resources.
if (currentStateMap != null) {
for (ParticipantId participantId : currentStateMap.keySet()) {
@@ -119,7 +120,7 @@ public class NewConstraintBasedAssignment {
}
}
- // ideal state is deleted
+ // resource is deleted
if (participantPreferenceList == null) {
return participantStateMap;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index ae0278b..9abf67c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -26,5 +26,6 @@ public enum AttributeName {
MESSAGES_ALL,
MESSAGES_SELECTED,
MESSAGES_THROTTLE,
- LOCAL_STATE
+ LOCAL_STATE,
+ STATE_MODEL_DEFINITIONS
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index 995bb74..bc14297 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -20,30 +20,26 @@ package org.apache.helix.controller.stages;
*/
import java.util.Map;
+import java.util.Set;
-import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Id;
import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.AutoRebalancer;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
import org.apache.helix.controller.rebalancer.NewRebalancer;
import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.model.IdealState;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.util.HelixUtil;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
/**
@@ -51,13 +47,14 @@ import org.apache.log4j.Logger;
* IdealState,StateModel,LiveInstance
*/
public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(NewBestPossibleStateCalcStage.class
- .getName());
+ private static final Logger LOG = Logger.getLogger(NewBestPossibleStateCalcStage.class.getName());
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
- LOG.info("START BestPossibleStateCalcStage.process()");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("START BestPossibleStateCalcStage.process()");
+ }
NewCurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
@@ -74,49 +71,70 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
long endTime = System.currentTimeMillis();
- LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+ }
+ }
+
+ /**
+ * Build the assignment for a resource that no longer exists in the cluster but still has
+ * current state reported for it
+ * @param cluster cluster snapshot
+ * @param resourceId the dropped resource
+ * @param currentStateOutput full snapshot of the current state
+ * @param stateModelDef state model the resource follows
+ * @return assignment covering every partition that still has current state; empty if none
+ */
+ private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
+ NewCurrentStateOutput currentStateOutput, StateModelDefinition stateModelDef) {
+ ResourceAssignment droppedAssignment = new ResourceAssignment(resourceId);
+ Set<PartitionId> partitionsWithState =
+ currentStateOutput.getCurrentStateMappedPartitions(resourceId);
+ if (partitionsWithState != null) {
+ for (PartitionId partitionId : partitionsWithState) {
+ Set<ParticipantId> disabledParticipants =
+ NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+ partitionId);
+ // the preference list is passed as null because the resource has been dropped
+ droppedAssignment.addReplicaMap(partitionId, NewConstraintBasedAssignment
+ .computeAutoBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef, null,
+ currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+ disabledParticipants));
+ }
+ }
+ return droppedAssignment;
}
// TODO check this
private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
Map<ResourceId, Resource> resourceMap, NewCurrentStateOutput currentStateOutput) {
- // for each ideal state
- // read the state model def
- // for each resource
- // get the preference list
- // for each instanceName check if its alive then assign a state
- // ClusterDataCache cache = event.getAttribute("ClusterDataCache");
-
NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs =
+ event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
for (ResourceId resourceId : resourceMap.keySet()) {
LOG.debug("Processing resource:" + resourceId);
-
- Resource resource = resourceMap.get(resourceId);
- // Ideal state may be gone. In that case we need to get the state model name
+ // Resource may be gone. In that case we need to get the state model name
// from the current state
- // IdealState idealState = cache.getIdealState(resourceName);
-
Resource existResource = cluster.getResource(resourceId);
if (existResource == null) {
- // if ideal state is deleted, use an empty one
- LOG.info("resource:" + resourceId + " does not exist anymore");
- // TODO
- // existResource = new Resource();
+ // if resource is deleted, then we do not know which rebalancer to use
+ // instead, just mark all partitions of the resource as dropped
+ if (LOG.isInfoEnabled()) {
+ LOG.info("resource:" + resourceId + " does not exist anymore");
+ }
+ StateModelDefinition stateModelDef =
+ stateModelDefs.get(currentStateOutput.getResourceStateModelDef(resourceId));
+ ResourceAssignment droppedAssignment =
+ mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
+ output.setResourceAssignment(resourceId, droppedAssignment);
+ continue;
}
RebalancerConfig rebalancerConfig = existResource.getRebalancerConfig();
NewRebalancer rebalancer = null;
if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
- && rebalancerConfig.getRebalancerClassName() != null) {
- String rebalancerClassName = rebalancerConfig.getRebalancerClassName();
- LOG.info("resource " + resourceId + " use idealStateRebalancer " + rebalancerClassName);
- try {
- rebalancer =
- (NewRebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
- } catch (Exception e) {
- LOG.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
- }
+ && rebalancerConfig.getRebalancerRef() != null) {
+ rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
}
if (rebalancer == null) {
if (rebalancerConfig.getRebalancerMode() == RebalanceMode.FULL_AUTO) {
@@ -128,9 +146,15 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
}
}
- // TODO pass state model definition
+ StateModelDefinition stateModelDef =
+ stateModelDefs.get(rebalancerConfig.getStateModelDefId());
ResourceAssignment resourceAssignment =
- rebalancer.computeResourceMapping(resource, cluster, null);
+ rebalancer.computeResourceMapping(existResource, cluster, stateModelDef,
+ currentStateOutput);
+ if (resourceAssignment == null) {
+ resourceAssignment =
+ mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
+ }
output.setResourceAssignment(resourceId, resourceAssignment);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
index 474f463..d5ee850 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -1,5 +1,6 @@
package org.apache.helix.controller.stages;
+import java.util.HashMap;
import java.util.Map;
import org.apache.helix.api.ResourceId;
@@ -9,11 +10,25 @@ public class NewBestPossibleStateOutput {
Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
+  /**
+   * Create an output that holds no resource assignments yet
+   */
+  public NewBestPossibleStateOutput() {
+    this._resourceAssignmentMap = new HashMap<ResourceId, ResourceAssignment>();
+  }
+
/**
- * @param resourceId
- * @param resourceAssignment
+ * Set the computed resource assignment for a resource
+ * @param resourceId the resource to set
+ * @param resourceAssignment the computed assignment
*/
public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
_resourceAssignmentMap.put(resourceId, resourceAssignment);
}
+
+ /**
+ * Get the resource assignment computed for a resource
+ * @param resourceId resource to look up
+ * @return ResourceAssignment computed by the best possible state calculation
+ */
+ public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+ return _resourceAssignmentMap.get(resourceId);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
new file mode 100644
index 0000000..7478609
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCompatibilityCheckStage.java
@@ -0,0 +1,68 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.HelixVersion;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.log4j.Logger;
+
+/**
+ * controller checks if participant version is compatible
+ */
+public class NewCompatibilityCheckStage extends AbstractBaseStage {
+ private static final Logger LOG = Logger.getLogger(NewCompatibilityCheckStage.class.getName());
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ HelixManager manager = event.getAttribute("helixmanager");
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ if (manager == null || cluster == null) {
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires HelixManager | DataCache");
+ }
+
+ HelixManagerProperties properties = manager.getProperties();
+ // Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
+ Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
+ for (Participant liveParticipant : liveParticipants.values()) {
+ HelixVersion version = liveParticipant.getRunningInstance().getVersion();
+ String participantVersion = (version != null) ? version.toString() : null;
+ if (!properties.isParticipantCompatible(participantVersion)) {
+ String errorMsg =
+ "incompatible participant. pipeline will not continue. " + "controller: "
+ + manager.getInstanceName() + ", controllerVersion: " + properties.getVersion()
+ + ", minimumSupportedParticipantVersion: "
+ + properties.getProperty("minimum_supported_version.participant")
+ + ", participant: " + liveParticipant.getId() + ", participantVersion: "
+ + participantVersion;
+ LOG.error(errorMsg);
+ throw new StageException(errorMsg);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
index 417512e..d8bbfe3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.stages;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
@@ -29,7 +30,6 @@ import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.Partition;
public class NewCurrentStateOutput {
/**
@@ -135,7 +135,7 @@ public class NewCurrentStateOutput {
* @return state
*/
static State getState(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
- ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
+ ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
if (map != null) {
Map<ParticipantId, State> instanceStateMap = map.get(partitionId);
@@ -221,12 +221,25 @@ public class NewCurrentStateOutput {
}
/**
+   * Get the partitions mapped in the current state
+   * @param resourceId resource to look up
+   * @return read-only view of the mapped partitions, or empty set if there are none
+   */
+  public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
+    Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
+    if (currentStateMap == null) {
+      return Collections.emptySet();
+    }
+    // keySet() is a live view backed by the internal map; wrap it so that callers cannot remove
+    // current-state entries through the returned set
+    return Collections.unmodifiableSet(currentStateMap.keySet());
+  }
+
+ /**
* @param resourceId
* @param partitionId
* @return
*/
public Map<ParticipantId, State> getPendingStateMap(ResourceId resourceId, PartitionId partitionId) {
- return getStateMap(_currentStateMap, resourceId, partitionId);
+ return getStateMap(_pendingStateMap, resourceId, partitionId);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
deleted file mode 100644
index 0fdfe56..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
+++ /dev/null
@@ -1,233 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Id;
-import org.apache.helix.api.MessageId;
-import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.SessionId;
-import org.apache.helix.api.State;
-import org.apache.helix.api.StateModelDefId;
-import org.apache.helix.api.StateModelFactoryId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
-
-/**
- * Compares the currentState, pendingState with IdealState and generate messages
- */
-public class NewMessageGenerationPhase extends AbstractBaseStage {
- private static Logger LOG = Logger.getLogger(NewMessageGenerationPhase.class);
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- HelixManager manager = event.getAttribute("helixmanager");
- Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- NewCurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
- BestPossibleStateOutput bestPossibleStateOutput =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
- || bestPossibleStateOutput == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
- }
-
- // Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- // Map<String, String> sessionIdMap = new HashMap<String, String>();
-
- // for (LiveInstance liveInstance : liveInstances.values()) {
- // sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId().stringify());
- // }
- MessageGenerationOutput output = new MessageGenerationOutput();
-
- for (ResourceId resourceId : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceId);
- int bucketSize = resource.getRebalancerConfig().getBucketSize();
-
- // TODO fix it
- StateModelDefinition stateModelDef = null;
- // cache.getStateModelDef(resource.getStateModelDefRef());
-
- for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
- // TODO fix it
- Map<ParticipantId, State> instanceStateMap = null;
- // bestPossibleStateOutput.getInstanceStateMap(resourceId, partition);
-
- // we should generate message based on the desired-state priority
- // so keep generated messages in a temp map keyed by state
- // desired-state->list of generated-messages
- Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
-
- for (ParticipantId participantId : instanceStateMap.keySet()) {
- State desiredState = instanceStateMap.get(participantId);
-
- State currentState =
- currentStateOutput.getCurrentState(resourceId, partitionId, participantId);
- if (currentState == null) {
- // TODO fix it
- // currentState = stateModelDef.getInitialStateString();
- }
-
- if (desiredState.equals(currentState)) {
- continue;
- }
-
- State pendingState =
- currentStateOutput.getPendingState(resourceId, partitionId, participantId);
-
- // TODO fix it
- State nextState = new State("");
- // stateModelDef.getNextStateForTransition(currentState, desiredState);
- if (nextState == null) {
- LOG.error("Unable to find a next state for partition: " + partitionId
- + " from stateModelDefinition"
- + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
- continue;
- }
-
- if (pendingState != null) {
- if (nextState.equals(pendingState)) {
- LOG.debug("Message already exists for " + participantId + " to transit "
- + partitionId + " from " + currentState + " to " + nextState);
- } else if (currentState.equals(pendingState)) {
- LOG.info("Message hasn't been removed for " + participantId + " to transit"
- + partitionId + " to " + pendingState + ", desiredState: "
- + desiredState);
- } else {
- LOG.info("IdealState changed before state transition completes for " + partitionId
- + " on " + participantId + ", pendingState: "
- + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
- }
- } else {
- // TODO check if instance is alive
- SessionId sessionId =
- cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
- .getSessionId();
- Message message =
- createMessage(manager, resourceId, partitionId, participantId, currentState,
- nextState, sessionId, new StateModelDefId(stateModelDef.getId()), resource
- .getRebalancerConfig()
- .getStateModelFactoryId(), bucketSize);
-
- // TODO fix this
- // IdealState idealState = cache.getIdealState(resourceName);
- // if (idealState != null
- // && idealState.getStateModelDefRef().equalsIgnoreCase(
- // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- // if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
- // message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
- // idealState.getRecord().getMapField(partition.getPartitionName()));
- // }
- // }
- // Set timeout of needed
- // String stateTransition =
- // currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
- // if (idealState != null) {
- // String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
- // if (timeOutStr == null
- // && idealState.getStateModelDefRef().equalsIgnoreCase(
- // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- // // scheduled task queue
- // if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
- // timeOutStr =
- // idealState.getRecord().getMapField(partition.getPartitionName())
- // .get(Message.Attributes.TIMEOUT.toString());
- // }
- // }
- // if (timeOutStr != null) {
- // try {
- // int timeout = Integer.parseInt(timeOutStr);
- // if (timeout > 0) {
- // message.setExecutionTimeout(timeout);
- // }
- // } catch (Exception e) {
- // logger.error("", e);
- // }
- // }
- // }
- // message.getRecord().setSimpleField("ClusterEventName", event.getName());
-
- if (!messageMap.containsKey(desiredState)) {
- messageMap.put(desiredState, new ArrayList<Message>());
- }
- messageMap.get(desiredState).add(message);
- }
- }
-
- // add generated messages to output according to state priority
- List<String> statesPriorityList = stateModelDef.getStatesPriorityStringList();
- for (String state : statesPriorityList) {
- if (messageMap.containsKey(state)) {
- for (Message message : messageMap.get(state)) {
- // TODO fix it
- // output.addMessage(resourceId, partitionId, message);
- }
- }
- }
-
- } // end of for-each-partition
- }
- event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
- }
-
- private Message createMessage(HelixManager manager, ResourceId resourceId,
- PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
- SessionId sessionId, StateModelDefId stateModelDefId,
- StateModelFactoryId stateModelFactoryId, int bucketSize) {
- // MessageId uuid = Id.message(UUID.randomUUID().toString());
- // Message message = new Message(MessageType.STATE_TRANSITION, uuid);
- // message.setSrcName(manager.getInstanceName());
- // message.setTgtName(instanceName);
- // message.setMsgState(MessageState.NEW);
- // message.setPartitionId(Id.partition(partitionName));
- // message.setResourceId(Id.resource(resourceName));
- // message.setFromState(State.from(currentState));
- // message.setToState(State.from(nextState));
- // message.setTgtSessionId(Id.session(sessionId));
- // message.setSrcSessionId(Id.session(manager.getSessionId()));
- // message.setStateModelDef(Id.stateModelDef(stateModelDefName));
- // message.setStateModelFactoryName(stateModelFactoryName);
- // message.setBucketSize(bucketSize);
- //
- // return message;
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
new file mode 100644
index 0000000..0a72dc0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -0,0 +1,211 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SessionId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Compares the current and pending state of each replica with the best possible state and
+ * generates the state transition messages needed to move towards it
+ */
+public class NewMessageGenerationStage extends AbstractBaseStage {
+  private static final Logger LOG = Logger.getLogger(NewMessageGenerationStage.class);
+
+  /**
+   * Generate state transition messages for every partition of every resource in the event and
+   * store the result in the event under MESSAGES_ALL
+   * @param event cluster event carrying the manager, cluster snapshot, state model definitions,
+   *          resources, current state and best possible state
+   * @throws StageException if any of the required attributes is missing from the event
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    HelixManager manager = event.getAttribute("helixmanager");
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+        event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
+    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    NewCurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    NewBestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+    // the state model definitions are dereferenced below, so they are required as well
+    if (manager == null || cluster == null || stateModelDefMap == null || resourceMap == null
+        || currentStateOutput == null || bestPossibleStateOutput == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|DataCache|STATE_MODEL_DEFINITIONS|RESOURCES|CURRENT_STATE"
+          + "|BEST_POSSIBLE_STATE");
+    }
+
+    NewMessageOutput output = new NewMessageOutput();
+
+    for (Map.Entry<ResourceId, Resource> resourceEntry : resourceMap.entrySet()) {
+      ResourceId resourceId = resourceEntry.getKey();
+      Resource resource = resourceEntry.getValue();
+      RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+      int bucketSize = rebalancerConfig.getBucketSize();
+
+      StateModelDefinition stateModelDef =
+          stateModelDefMap.get(rebalancerConfig.getStateModelDefId());
+      if (stateModelDef == null) {
+        // without a definition neither initial states nor transitions can be resolved
+        LOG.error("State model definition " + rebalancerConfig.getStateModelDefId()
+            + " not found, skip generating messages for resource: " + resourceId);
+        continue;
+      }
+
+      ResourceAssignment resourceAssignment =
+          bestPossibleStateOutput.getResourceAssignment(resourceId);
+      if (resourceAssignment == null) {
+        // the lookup returns null when no assignment was stored for this resource
+        LOG.warn("No best possible assignment available, skip generating messages for resource: "
+            + resourceId);
+        continue;
+      }
+
+      for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+        Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(partitionId);
+
+        // we should generate message based on the desired-state priority
+        // so keep generated messages in a temp map keyed by state
+        // desired-state->list of generated-messages
+        Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
+
+        for (Map.Entry<ParticipantId, State> replica : instanceStateMap.entrySet()) {
+          ParticipantId participantId = replica.getKey();
+          State desiredState = replica.getValue();
+
+          State currentState =
+              currentStateOutput.getCurrentState(resourceId, partitionId, participantId);
+          if (currentState == null) {
+            currentState = stateModelDef.getInitialState();
+          }
+
+          if (desiredState.equals(currentState)) {
+            continue;
+          }
+
+          State pendingState =
+              currentStateOutput.getPendingState(resourceId, partitionId, participantId);
+
+          State nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
+          if (nextState == null) {
+            LOG.error("Unable to find a next state for partition: " + partitionId
+                + " from stateModelDefinition" + stateModelDef.getClass() + " from:" + currentState
+                + " to:" + desiredState);
+            continue;
+          }
+
+          if (pendingState != null) {
+            // a transition is already in flight for this replica; only report it
+            if (nextState.equals(pendingState)) {
+              LOG.debug("Message already exists for " + participantId + " to transit "
+                  + partitionId + " from " + currentState + " to " + nextState);
+            } else if (currentState.equals(pendingState)) {
+              LOG.info("Message hasn't been removed for " + participantId + " to transit"
+                  + partitionId + " to " + pendingState + ", desiredState: " + desiredState);
+            } else {
+              LOG.info("IdealState changed before state transition completes for " + partitionId
+                  + " on " + participantId + ", pendingState: " + pendingState + ", currentState: "
+                  + currentState + ", nextState: " + nextState);
+            }
+            continue;
+          }
+
+          // a message needs the target's session id, which only a live participant has
+          if (!cluster.getLiveParticipantMap().containsKey(participantId)) {
+            LOG.warn("Participant " + participantId
+                + " is not live, skip generating message for partition: " + partitionId);
+            continue;
+          }
+          SessionId sessionId =
+              cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
+                  .getSessionId();
+          Message message =
+              createMessage(manager, resourceId, partitionId, participantId, currentState,
+                  nextState, sessionId, new StateModelDefId(stateModelDef.getId()),
+                  rebalancerConfig.getStateModelFactoryId(), bucketSize);
+
+          applySchedulerTaskConfig(message, resource, rebalancerConfig, partitionId,
+              currentState, nextState);
+          message.getRecord().setSimpleField("ClusterEventName", event.getName());
+
+          List<Message> messagesForState = messageMap.get(desiredState);
+          if (messagesForState == null) {
+            messagesForState = new ArrayList<Message>();
+            messageMap.put(desiredState, messagesForState);
+          }
+          messagesForState.add(message);
+        }
+
+        // add generated messages to output according to state priority
+        for (State state : stateModelDef.getStatesPriorityList()) {
+          List<Message> messagesForState = messageMap.get(state);
+          if (messagesForState != null) {
+            for (Message message : messagesForState) {
+              output.addMessage(resourceId, partitionId, message);
+            }
+          }
+        }
+      } // end of for-each-partition
+    }
+    event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+  }
+
+  /**
+   * Copy the scheduler task settings of a resource onto a newly created message: the inner
+   * message for scheduler task queue resources, and the execution timeout if one is configured.
+   * Does nothing when the resource has no scheduler task config.
+   * TODO refactor set timeout logic, it's really messy
+   * @param message message to update
+   * @param resource resource the message was generated for
+   * @param rebalancerConfig rebalancer config of the resource
+   * @param partitionId partition the message was generated for
+   * @param currentState state the transition starts from
+   * @param nextState state the transition leads to
+   */
+  private void applySchedulerTaskConfig(Message message, Resource resource,
+      RebalancerConfig rebalancerConfig, PartitionId partitionId, State currentState,
+      State nextState) {
+    if (resource.getSchedulerTaskConfig() == null) {
+      return;
+    }
+
+    boolean isSchedulerTaskQueue =
+        rebalancerConfig.getStateModelDefId().stringify()
+            .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE);
+    if (isSchedulerTaskQueue && resource.getPartitionSet().size() > 0) {
+      // TODO refactor it
+      message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+          resource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+    }
+
+    // Set timeout if needed
+    String stateTransition = currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
+    Integer timeout = resource.getSchedulerTaskConfig().getTimeout(stateTransition, partitionId);
+    if (timeout != null && timeout > 0) {
+      message.setExecutionTimeout(timeout);
+    }
+  }
+
+  /**
+   * Create a new state transition message from the controller to a participant
+   * @param manager the controller's manager, source of the message
+   * @param resourceId resource the transition applies to
+   * @param partitionId partition the transition applies to
+   * @param participantId target participant
+   * @param currentState state the transition starts from
+   * @param nextState state the transition leads to
+   * @param participantSessionId session of the target participant
+   * @param stateModelDefId state model definition of the resource
+   * @param stateModelFactoryId state model factory of the resource
+   * @param bucketSize bucket size of the resource
+   * @return the message, in state NEW and with a random UUID as its id
+   */
+  private Message createMessage(HelixManager manager, ResourceId resourceId,
+      PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
+      SessionId participantSessionId, StateModelDefId stateModelDefId,
+      StateModelFactoryId stateModelFactoryId, int bucketSize) {
+    MessageId uuid = Id.message(UUID.randomUUID().toString());
+    Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+    message.setSrcName(manager.getInstanceName());
+    message.setTgtName(participantId.stringify());
+    message.setMsgState(MessageState.NEW);
+    message.setPartitionId(partitionId);
+    message.setResourceId(resourceId);
+    message.setFromState(currentState);
+    message.setToState(nextState);
+    message.setTgtSessionId(participantSessionId);
+    message.setSrcSessionId(Id.session(manager.getSessionId()));
+    message.setStateModelDef(stateModelDefId);
+    message.setStateModelFactoryName(stateModelFactoryId.stringify());
+    message.setBucketSize(bucketSize);
+
+    return message;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5d0e048e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
new file mode 100644
index 0000000..3dd5211
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageOutput.java
@@ -0,0 +1,75 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.model.Message;
+
+ /**
+  * Holds messages grouped first by resource and then by partition.
+  */
+ public class NewMessageOutput {
+
+   /** resource id -> (partition id -> messages recorded for that partition) */
+   private final Map<ResourceId, Map<PartitionId, List<Message>>> _messagesMap;
+
+   /**
+    * Create an output that holds no messages.
+    */
+   public NewMessageOutput() {
+     _messagesMap = new HashMap<ResourceId, Map<PartitionId, List<Message>>>();
+   }
+
+   /**
+    * Append a message to the list kept for a partition of a resource, creating the list (and
+    * the per-resource map) when nothing has been recorded there yet.
+    * @param resourceId resource the message belongs to
+    * @param partitionId partition the message belongs to
+    * @param message message to append
+    */
+   public void addMessage(ResourceId resourceId, PartitionId partitionId, Message message) {
+     Map<PartitionId, List<Message>> partitionMap = getOrCreatePartitionMap(resourceId);
+     List<Message> messages = partitionMap.get(partitionId);
+     if (messages == null) {
+       // also covers a null list stored through setMessages, which previously caused an NPE
+       messages = new ArrayList<Message>();
+       partitionMap.put(partitionId, messages);
+     }
+     messages.add(message);
+   }
+
+   /**
+    * Replace whatever is recorded for a partition of a resource with the given list. The list
+    * is stored as-is, without copying.
+    * @param resourceId resource the messages belong to
+    * @param partitionId partition the messages belong to
+    * @param selectedMessages messages to record for the partition
+    */
+   public void setMessages(ResourceId resourceId, PartitionId partitionId,
+       List<Message> selectedMessages) {
+     getOrCreatePartitionMap(resourceId).put(partitionId, selectedMessages);
+   }
+
+   /**
+    * Get the messages recorded for a partition of a resource.
+    * @param resourceId resource to look up
+    * @param partitionId partition to look up
+    * @return the recorded list, or an immutable empty list when nothing is recorded for the
+    *         resource or for the partition; never null
+    */
+   public List<Message> getMessages(ResourceId resourceId, PartitionId partitionId) {
+     Map<PartitionId, List<Message>> partitionMap = _messagesMap.get(resourceId);
+     if (partitionMap != null) {
+       List<Message> messages = partitionMap.get(partitionId);
+       if (messages != null) {
+         return messages;
+       }
+     }
+     // same result for an unknown resource and an unknown partition, so callers can iterate
+     // the returned list without a null check
+     return Collections.emptyList();
+   }
+
+   /**
+    * @return the string form of the underlying resource -> partition -> messages map
+    */
+   @Override
+   public String toString() {
+     return _messagesMap.toString();
+   }
+
+   /**
+    * Look up the per-partition map of a resource, registering an empty one on first use.
+    */
+   private Map<PartitionId, List<Message>> getOrCreatePartitionMap(ResourceId resourceId) {
+     Map<PartitionId, List<Message>> partitionMap = _messagesMap.get(resourceId);
+     if (partitionMap == null) {
+       partitionMap = new HashMap<PartitionId, List<Message>>();
+       _messagesMap.put(resourceId, partitionMap);
+     }
+     return partitionMap;
+   }
+ }