You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:53 UTC
[45/53] [abbrv] [HELIX-209] Shuffling around rebalancer code to allow
for compatibility
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
deleted file mode 100644
index dfea7fc..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ /dev/null
@@ -1,198 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
-import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.ClusterConstraints.ConstraintValue;
-import org.apache.helix.model.ConstraintItem;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public class NewMessageThrottleStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(NewMessageThrottleStage.class.getName());
-
- int valueOf(String valueStr) {
- int value = Integer.MAX_VALUE;
-
- try {
- ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
- switch (valueToken) {
- case ANY:
- value = Integer.MAX_VALUE;
- break;
- default:
- LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
- + Integer.MAX_VALUE);
- break;
- }
- } catch (Exception e) {
- try {
- value = Integer.parseInt(valueStr);
- } catch (NumberFormatException ne) {
- LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
- + Integer.MAX_VALUE);
- }
- }
- return value;
- }
-
- /**
- * constraints are selected in the order of the following rules: 1) don't select
- * constraints with CONSTRAINT_VALUE=ANY; 2) if one constraint is more specific than the
- * other, select the most specific one 3) if a message matches multiple constraints of
- * incomparable specificity, select the one with the minimum value 4) if a message
- * matches multiple constraints of incomparable specificity, and they all have the same
- * value, select the first in alphabetic order
- */
- Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
- Map<ConstraintAttribute, String> attributes) {
- Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
- for (ConstraintItem item : items) {
- // don't select constraints with CONSTRAINT_VALUE=ANY
- if (item.getConstraintValue().equals(ConstraintValue.ANY.toString())) {
- continue;
- }
-
- String key = item.filter(attributes).toString();
- if (!selectedItems.containsKey(key)) {
- selectedItems.put(key, item);
- } else {
- ConstraintItem existingItem = selectedItems.get(key);
- if (existingItem.match(item.getAttributes())) {
- // item is more specific than existingItem
- selectedItems.put(key, item);
- } else if (!item.match(existingItem.getAttributes())) {
- // existingItem and item are of incomparable specificity
- int value = valueOf(item.getConstraintValue());
- int existingValue = valueOf(existingItem.getConstraintValue());
- if (value < existingValue) {
- // item's constraint value is less than that of existingItem
- selectedItems.put(key, item);
- } else if (value == existingValue) {
- if (item.toString().compareTo(existingItem.toString()) < 0) {
- // item is ahead of existingItem in alphabetic order
- selectedItems.put(key, item);
- }
- }
- }
- }
- }
- return new HashSet<ConstraintItem>(selectedItems.values());
- }
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
- NewMessageOutput msgSelectionOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- Map<ResourceId, ResourceConfig> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
-
- if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
- throw new StageException("Missing attributes in event: " + event
- + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
- }
-
- NewMessageOutput output = new NewMessageOutput();
-
- // TODO fix it
- ClusterConstraints constraint = cluster.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
- Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
-
- if (constraint != null) {
- // go through all pending messages, they should be counted but not throttled
- for (ParticipantId participantId : cluster.getLiveParticipantMap().keySet()) {
- Participant liveParticipant = cluster.getLiveParticipantMap().get(participantId);
- throttle(throttleCounterMap, constraint, new ArrayList<Message>(liveParticipant
- .getMessageMap().values()), false);
- }
- }
-
- // go through all new messages, throttle if necessary
- // assume messages should be sorted by state transition priority in messageSelection stage
- for (ResourceId resourceId : resourceMap.keySet()) {
- ResourceConfig resource = resourceMap.get(resourceId);
- // TODO fix it
- for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
- List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
- if (constraint != null && messages != null && messages.size() > 0) {
- messages = throttle(throttleCounterMap, constraint, messages, true);
- }
- output.setMessages(resourceId, partitionId, messages);
- }
- }
-
- event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
- }
-
- private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,
- List<Message> messages, final boolean needThrottle) {
-
- List<Message> throttleOutputMsgs = new ArrayList<Message>();
- for (Message message : messages) {
- Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
-
- Set<ConstraintItem> matches = constraint.match(msgAttr);
- matches = selectConstraints(matches, msgAttr);
-
- boolean msgThrottled = false;
- for (ConstraintItem item : matches) {
- String key = item.filter(msgAttr).toString();
- if (!throttleMap.containsKey(key)) {
- throttleMap.put(key, valueOf(item.getConstraintValue()));
- }
- int value = throttleMap.get(key);
- throttleMap.put(key, --value);
-
- if (needThrottle && value < 0) {
- msgThrottled = true;
-
- if (LOG.isDebugEnabled()) {
- // TODO: printout constraint item that throttles the message
- LOG.debug("message: " + message + " is throttled by constraint: " + item);
- }
- }
- }
-
- if (!msgThrottled) {
- throttleOutputMsgs.add(message);
- }
- }
-
- return throttleOutputMsgs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
deleted file mode 100644
index 26050f8..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewReadClusterDataStage.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.accessor.ClusterAccessor;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.log4j.Logger;
-
-public class NewReadClusterDataStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(NewReadClusterDataStage.class.getName());
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
- LOG.info("START ReadClusterDataStage.process()");
-
- HelixManager manager = event.getAttribute("helixmanager");
- if (manager == null) {
- throw new StageException("HelixManager attribute value is null");
- }
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- ClusterId clusterId = ClusterId.from(manager.getClusterName());
- ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
-
- Cluster cluster = clusterAccessor.readCluster();
-
- ClusterStatusMonitor clusterStatusMonitor =
- (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- if (clusterStatusMonitor != null) {
- // TODO fix it
- // int disabledInstances = 0;
- // int disabledPartitions = 0;
- // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
- // if (config.getInstanceEnabled() == false) {
- // disabledInstances++;
- // }
- // if (config.getDisabledPartitions() != null) {
- // disabledPartitions += config.getDisabledPartitions().size();
- // }
- // }
- // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
- // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
- }
-
- event.addAttribute("ClusterDataCache", cluster);
-
- long endTime = System.currentTimeMillis();
- LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
deleted file mode 100644
index b531bd7..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.CurrentState;
-import org.apache.log4j.Logger;
-
-/**
- * This stage computes all the resources in a cluster. The resources are
- * computed from IdealStates -> this gives all the resources currently active
- * CurrentState for liveInstance-> Helps in finding resources that are inactive
- * and needs to be dropped
- */
-public class NewResourceComputationStage extends AbstractBaseStage {
- private static Logger LOG = Logger.getLogger(NewResourceComputationStage.class);
-
- @Override
- public void process(ClusterEvent event) throws StageException {
- Cluster cluster = event.getAttribute("ClusterDataCache");
- if (cluster == null) {
- throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
- }
-
- Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
- Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
-
- // ideal-state may be removed, add all resource config in current-state but not in ideal-state
- for (ResourceId resourceId : csResCfgMap.keySet()) {
- if (!cluster.getResourceMap().keySet().contains(resourceId)) {
- resCfgMap.put(resourceId, csResCfgMap.get(resourceId));
- }
- }
-
- for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
- Resource resource = cluster.getResource(resourceId);
- RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
-
- ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
- resCfgBuilder.bucketSize(resource.getBucketSize());
- resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
- resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
- resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
- resCfgMap.put(resourceId, resCfgBuilder.build());
- }
-
- event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
- }
-
- /**
- * Get resource config's from current-state
- * @param cluster
- * @return resource config map or empty map if not available
- * @throws StageException
- */
- Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
- Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
- new HashMap<ResourceId, ResourceConfig.Builder>();
-
- Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
- new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
-
- for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
- for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
- CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
-
- if (currentState.getStateModelDefRef() == null) {
- LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
- + ", partitions: " + currentState.getPartitionStateMap().keySet()
- + ", states: " + currentState.getPartitionStateMap().values());
- throw new StageException("State model def is null for resource:"
- + currentState.getResourceId());
- }
-
- if (!resCfgBuilderMap.containsKey(resourceId)) {
- PartitionedRebalancerContext.Builder rebCtxBuilder =
- new PartitionedRebalancerContext.Builder(resourceId);
- rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
- rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
- .getStateModelFactoryName()));
- rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
-
- ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
- resCfgBuilder.bucketSize(currentState.getBucketSize());
- resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
- resCfgBuilderMap.put(resourceId, resCfgBuilder);
- }
-
- PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
- for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
- rebCtxBuilder.addPartition(new Partition(partitionId));
- }
- }
- }
-
- Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
- for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
- ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
- PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
- resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
- resCfgMap.put(resourceId, resCfgBuilder.build());
- }
-
- return resCfgMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
deleted file mode 100644
index 51c9284..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-public class NewTaskAssignmentStage extends AbstractBaseStage {
- private static Logger logger = Logger.getLogger(NewTaskAssignmentStage.class);
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- long startTime = System.currentTimeMillis();
- logger.info("START TaskAssignmentStage.process()");
-
- HelixManager manager = event.getAttribute("helixmanager");
- Map<ResourceId, ResourceConfig> resourceMap =
- event.getAttribute(AttributeName.RESOURCES.toString());
- NewMessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
- Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
-
- if (manager == null || resourceMap == null || messageOutput == null || cluster == null
- || liveParticipantMap == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
- }
-
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
- List<Message> messagesToSend = new ArrayList<Message>();
- for (ResourceId resourceId : resourceMap.keySet()) {
- ResourceConfig resource = resourceMap.get(resourceId);
- for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
- List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
- messagesToSend.addAll(messages);
- }
- }
-
- List<Message> outputMessages =
- batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
- manager.getProperties());
- sendMessages(dataAccessor, outputMessages);
-
- long endTime = System.currentTimeMillis();
- logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
-
- }
-
- List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
- Map<ResourceId, ResourceConfig> resourceMap,
- Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
- // group messages by its CurrentState path + "/" + fromState + "/" + toState
- Map<String, Message> batchMessages = new HashMap<String, Message>();
- List<Message> outputMessages = new ArrayList<Message>();
-
- Iterator<Message> iter = messages.iterator();
- while (iter.hasNext()) {
- Message message = iter.next();
- ResourceId resourceId = message.getResourceId();
- ResourceConfig resource = resourceMap.get(resourceId);
-
- ParticipantId participantId = ParticipantId.from(message.getTgtName());
- Participant liveParticipant = liveParticipantMap.get(participantId);
- String participantVersion = null;
- if (liveParticipant != null) {
- participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
- }
-
- if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
- || !properties.isFeatureSupported("batch_message", participantVersion)) {
- outputMessages.add(message);
- continue;
- }
-
- String key =
- keyBuilder.currentState(message.getTgtName(), message.getTypedTgtSessionId().stringify(),
- message.getResourceId().stringify()).getPath()
- + "/" + message.getTypedFromState() + "/" + message.getTypedToState();
-
- if (!batchMessages.containsKey(key)) {
- Message batchMessage = new Message(message.getRecord());
- batchMessage.setBatchMessageMode(true);
- outputMessages.add(batchMessage);
- batchMessages.put(key, batchMessage);
- }
- batchMessages.get(key).addPartitionName(message.getPartitionId().stringify());
- }
-
- return outputMessages;
- }
-
- protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
- if (messages == null || messages.isEmpty()) {
- return;
- }
-
- Builder keyBuilder = dataAccessor.keyBuilder();
-
- List<PropertyKey> keys = new ArrayList<PropertyKey>();
- for (Message message : messages) {
- logger.info("Sending Message " + message.getMessageId() + " to " + message.getTgtName()
- + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
- + message.getTypedFromState() + " to:" + message.getTypedToState());
-
- // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
- // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
- // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
- // + message.getToState());
-
- keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
- }
-
- dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index be0b7f0..31dbb08 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -35,7 +35,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
HelixManager helixManager = event.getAttribute("helixmanager");
HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
ResourceAccessor resourceAccessor = new ResourceAccessor(accessor);
- NewBestPossibleStateOutput assignments =
+ BestPossibleStateOutput assignments =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (ResourceId resourceId : assignments.getAssignedResources()) {
ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index ce81f1f..44fddb6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -21,53 +21,53 @@ package org.apache.helix.controller.stages;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.InstanceConfig;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
-@Deprecated
public class ReadClusterDataStage extends AbstractBaseStage {
- private static final Logger logger = Logger.getLogger(ReadClusterDataStage.class.getName());
- ClusterDataCache _cache;
-
- public ReadClusterDataStage() {
- _cache = new ClusterDataCache();
- }
+ private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
- logger.info("START ReadClusterDataStage.process()");
+ LOG.info("START ReadClusterDataStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
if (manager == null) {
throw new StageException("HelixManager attribute value is null");
}
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
- _cache.refresh(dataAccessor);
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ ClusterId clusterId = ClusterId.from(manager.getClusterName());
+ ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
+
+ Cluster cluster = clusterAccessor.readCluster();
ClusterStatusMonitor clusterStatusMonitor =
(ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
if (clusterStatusMonitor != null) {
- int disabledInstances = 0;
- int disabledPartitions = 0;
- for (InstanceConfig config : _cache._instanceConfigMap.values()) {
- if (config.getInstanceEnabled() == false) {
- disabledInstances++;
- }
- if (config.getDisabledPartitions() != null) {
- disabledPartitions += config.getDisabledPartitions().size();
- }
- }
- clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
- _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
+ // TODO fix it
+ // int disabledInstances = 0;
+ // int disabledPartitions = 0;
+ // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
+ // if (config.getInstanceEnabled() == false) {
+ // disabledInstances++;
+ // }
+ // if (config.getDisabledPartitions() != null) {
+ // disabledPartitions += config.getDisabledPartitions().size();
+ // }
+ // }
+ // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
+ // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
}
- event.addAttribute("ClusterDataCache", _cache);
+ event.addAttribute("ClusterDataCache", cluster);
long endTime = System.currentTimeMillis();
- logger.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
+ LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
index ae873c7..859c1d0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
@@ -23,10 +23,8 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.log4j.Logger;
public class ReadHealthDataStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(ReadHealthDataStage.class.getName());
HealthDataCache _cache;
public ReadHealthDataStage() {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
deleted file mode 100644
index 949cfca..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Check and invoke custom implementation idealstate rebalancers.<br/>
- * If the resourceConfig has specified className of the customized rebalancer, <br/>
- * the rebalancer will be invoked to re-write the idealstate of the resource<br/>
- */
-@Deprecated
-public class RebalanceIdealStateStage extends AbstractBaseStage {
- private static final Logger LOG = Logger.getLogger(RebalanceIdealStateStage.class.getName());
-
- @Override
- public void process(ClusterEvent event) throws Exception {
- HelixManager manager = event.getAttribute("helixmanager");
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, IdealState> idealStateMap = cache.getIdealStates();
- CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.toString());
-
- Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
- for (String resourceName : idealStateMap.keySet()) {
- IdealState currentIdealState = idealStateMap.get(resourceName);
- if (currentIdealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
- && currentIdealState.getRebalancerRef() != null) {
- String rebalancerClassName = currentIdealState.getRebalancerRef().toString();
- LOG.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
- try {
- Rebalancer balancer =
- (Rebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
- balancer.init(manager);
- Resource resource = new Resource(resourceName);
- for (String partitionName : currentIdealState.getPartitionSet()) {
- resource.addPartition(partitionName);
- }
- ResourceAssignment resourceAssignment =
- balancer.computeResourceMapping(resource, currentIdealState, currentStateOutput,
- cache);
- StateModelDefinition stateModelDef =
- cache.getStateModelDef(currentIdealState.getStateModelDefRef());
- currentIdealState.updateFromAssignment(resourceAssignment, stateModelDef);
- updatedIdealStates.put(resourceName, currentIdealState);
- } catch (Exception e) {
- LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
- }
- }
- }
- if (updatedIdealStates.size() > 0) {
- cache.getIdealStates().putAll(updatedIdealStates);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index da38ee2..1fdd892 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -19,16 +19,23 @@ package org.apache.helix.controller.stages;
* under the License.
*/
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
import org.apache.log4j.Logger;
/**
@@ -37,102 +44,95 @@ import org.apache.log4j.Logger;
* CurrentState for liveInstance-> Helps in finding resources that are inactive
* and needs to be dropped
*/
-@Deprecated
public class ResourceComputationStage extends AbstractBaseStage {
private static Logger LOG = Logger.getLogger(ResourceComputationStage.class);
@Override
- public void process(ClusterEvent event) throws Exception {
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- if (cache == null) {
- throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+ public void process(ClusterEvent event) throws StageException {
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ if (cluster == null) {
+ throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
}
- Map<String, IdealState> idealStates = cache.getIdealStates();
+ Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+ Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
- Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
+ // ideal-state may be removed, add all resource config in current-state but not in ideal-state
+ for (ResourceId resourceId : csResCfgMap.keySet()) {
+ if (!cluster.getResourceMap().keySet().contains(resourceId)) {
+ resCfgMap.put(resourceId, csResCfgMap.get(resourceId));
+ }
+ }
- if (idealStates != null && idealStates.size() > 0) {
- for (IdealState idealState : idealStates.values()) {
- Set<String> partitionSet = idealState.getPartitionSet();
- String resourceName = idealState.getResourceName();
+ for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+ Resource resource = cluster.getResource(resourceId);
+ RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
- for (String partition : partitionSet) {
- addPartition(partition, resourceName, resourceMap);
- Resource resource = resourceMap.get(resourceName);
- resource.setStateModelDefRef(idealState.getStateModelDefRef());
- resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
- resource.setBucketSize(idealState.getBucketSize());
- resource.setBatchMessageMode(idealState.getBatchMessageMode());
- }
- }
+ ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+ resCfgBuilder.bucketSize(resource.getBucketSize());
+ resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
+ resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
+ resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
+ resCfgMap.put(resourceId, resCfgBuilder.build());
}
- // It's important to get partitions from CurrentState as well since the
- // idealState might be removed.
- Map<String, LiveInstance> availableInstances = cache.getLiveInstances();
+ event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
+ }
- if (availableInstances != null && availableInstances.size() > 0) {
- for (LiveInstance instance : availableInstances.values()) {
- String instanceName = instance.getInstanceName();
- String clientSessionId = instance.getTypedSessionId().stringify();
+ /**
+ * Get resource configs from current-state
+ * @param cluster
+ * @return resource config map or empty map if not available
+ * @throws StageException
+ */
+ Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
+ Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
+ new HashMap<ResourceId, ResourceConfig.Builder>();
+
+ Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
+ new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
+
+ for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+ for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
+ CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
+
+ if (currentState.getStateModelDefRef() == null) {
+ LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
+ + ", partitions: " + currentState.getPartitionStateMap().keySet()
+ + ", states: " + currentState.getPartitionStateMap().values());
+ throw new StageException("State model def is null for resource:"
+ + currentState.getResourceId());
+ }
- Map<String, CurrentState> currentStateMap =
- cache.getCurrentState(instanceName, clientSessionId);
- if (currentStateMap == null || currentStateMap.size() == 0) {
- continue;
+ if (!resCfgBuilderMap.containsKey(resourceId)) {
+ PartitionedRebalancerContext.Builder rebCtxBuilder =
+ new PartitionedRebalancerContext.Builder(resourceId);
+ rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
+ rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
+ .getStateModelFactoryName()));
+ rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
+
+ ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+ resCfgBuilder.bucketSize(currentState.getBucketSize());
+ resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
+ resCfgBuilderMap.put(resourceId, resCfgBuilder);
}
- for (CurrentState currentState : currentStateMap.values()) {
-
- String resourceName = currentState.getResourceName();
- Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
-
- // don't overwrite ideal state settings
- if (!resourceMap.containsKey(resourceName)) {
- addResource(resourceName, resourceMap);
- Resource resource = resourceMap.get(resourceName);
- resource.setStateModelDefRef(currentState.getStateModelDefRef());
- resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
- resource.setBucketSize(currentState.getBucketSize());
- resource.setBatchMessageMode(currentState.getBatchMessageMode());
- }
-
- if (currentState.getStateModelDefRef() == null) {
- LOG.error("state model def is null." + "resource:" + currentState.getResourceName()
- + ", partitions: " + currentState.getPartitionStateMap().keySet()
- + ", states: " + currentState.getPartitionStateMap().values());
- throw new StageException("State model def is null for resource:"
- + currentState.getResourceName());
- }
-
- for (String partition : resourceStateMap.keySet()) {
- addPartition(partition, resourceName, resourceMap);
- }
+
+ PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+ for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
+ rebCtxBuilder.addPartition(new Partition(partitionId));
}
}
}
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- }
-
- private void addResource(String resource, Map<String, Resource> resourceMap) {
- if (resource == null || resourceMap == null) {
- return;
- }
- if (!resourceMap.containsKey(resource)) {
- resourceMap.put(resource, new Resource(resource));
- }
- }
-
- private void addPartition(String partition, String resourceName, Map<String, Resource> resourceMap) {
- if (resourceName == null || partition == null || resourceMap == null) {
- return;
- }
- if (!resourceMap.containsKey(resourceName)) {
- resourceMap.put(resourceName, new Resource(resourceName));
+ Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+ for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
+ ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
+ PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+ resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
+ resCfgMap.put(resourceId, resCfgBuilder.build());
}
- Resource resource = resourceMap.get(resourceName);
- resource.addPartition(partition);
+ return resCfgMap;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index c942db9..02188be 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -30,16 +30,17 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.apache.log4j.Logger;
-@Deprecated
public class TaskAssignmentStage extends AbstractBaseStage {
private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
@@ -49,30 +50,30 @@ public class TaskAssignmentStage extends AbstractBaseStage {
logger.info("START TaskAssignmentStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- MessageThrottleStageOutput messageOutput =
- event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
- ClusterDataCache cache = event.getAttribute("ClusterDataCache");
- Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
-
- if (manager == null || resourceMap == null || messageOutput == null || cache == null
- || liveInstanceMap == null) {
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
+
+ if (manager == null || resourceMap == null || messageOutput == null || cluster == null
+ || liveParticipantMap == null) {
throw new StageException("Missing attributes in event:" + event
+ ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
}
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
List<Message> messagesToSend = new ArrayList<Message>();
- for (String resourceName : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceName);
- for (Partition partition : resource.getPartitions()) {
- List<Message> messages = messageOutput.getMessages(resourceName, partition);
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ ResourceConfig resource = resourceMap.get(resourceId);
+ for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+ List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
messagesToSend.addAll(messages);
}
}
List<Message> outputMessages =
- batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
+ batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
manager.getProperties());
sendMessages(dataAccessor, outputMessages);
@@ -82,8 +83,8 @@ public class TaskAssignmentStage extends AbstractBaseStage {
}
List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
- Map<String, Resource> resourceMap, Map<String, LiveInstance> liveInstanceMap,
- HelixManagerProperties properties) {
+ Map<ResourceId, ResourceConfig> resourceMap,
+ Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
// group messages by its CurrentState path + "/" + fromState + "/" + toState
Map<String, Message> batchMessages = new HashMap<String, Message>();
List<Message> outputMessages = new ArrayList<Message>();
@@ -92,13 +93,13 @@ public class TaskAssignmentStage extends AbstractBaseStage {
while (iter.hasNext()) {
Message message = iter.next();
ResourceId resourceId = message.getResourceId();
- Resource resource = resourceMap.get(resourceId.stringify());
+ ResourceConfig resource = resourceMap.get(resourceId);
- String instanceName = message.getTgtName();
- LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+ ParticipantId participantId = ParticipantId.from(message.getTgtName());
+ Participant liveParticipant = liveParticipantMap.get(participantId);
String participantVersion = null;
- if (liveInstance != null) {
- participantVersion = liveInstance.getTypedHelixVersion().toString();
+ if (liveParticipant != null) {
+ participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
}
if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
@@ -137,10 +138,10 @@ public class TaskAssignmentStage extends AbstractBaseStage {
+ " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
+ message.getTypedFromState() + " to:" + message.getTypedToState());
- // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
- // message.getTgtName()
- // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
- // + " from: " + message.getFromState() + " to: " + message.getToState());
+ // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
+ // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
+ // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
+ // + message.getToState());
keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 7d84258..09aed50 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -38,7 +38,8 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.context.RebalancerRef;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.log4j.Logger;
import com.google.common.base.Function;
@@ -169,7 +170,7 @@ public class IdealState extends HelixProperty {
}
/**
- * Define a custom rebalancer that implements {@link Rebalancer}
+ * Define a custom rebalancer that implements {@link HelixRebalancer}
* @param rebalancerClassName the name of the custom rebalancing class
*/
public void setRebalancerClassName(String rebalancerClassName) {
@@ -310,6 +311,15 @@ public class IdealState extends HelixProperty {
/**
* Set the current mapping of a partition
+ * @param partitionName the partition to set
+ * @param instanceStateMap (participant name, state) pairs
+ */
+ public void setInstanceStateMap(String partitionName, Map<String, String> instanceStateMap) {
+ _record.setMapField(partitionName, instanceStateMap);
+ }
+
+ /**
+ * Set the current mapping of a partition
* @param partitionId the partition to set
* @param participantStateMap (participant id, state) pairs
*/
@@ -385,6 +395,15 @@ public class IdealState extends HelixProperty {
/**
* Set the preference list of a partition
+ * @param partitionName the name of the partition to set
+ * @param preferenceList a list of participants that can serve replicas of the partition
+ */
+ public void setPreferenceList(String partitionName, List<String> preferenceList) {
+ _record.setListField(partitionName, preferenceList);
+ }
+
+ /**
+ * Set the preference list of a partition
* @param partitionId the id of the partition to set
* @param preferenceList a list of participants that can serve replicas of the partition
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 2bd313a..d465a80 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -910,7 +910,7 @@ public class Message extends HelixProperty {
* Get controller message id, used for scheduler-task-queue state model only
* @return controller message id
*/
- public String getControllerMessagId() {
+ public String getControllerMessageId() {
return _record.getSimpleField(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 96d0ca7..8672e7e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -33,7 +33,6 @@ import org.apache.helix.api.id.ResourceId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -48,10 +47,9 @@ import com.google.common.collect.Maps;
* can be in s1.
*/
public class ResourceAssignment extends HelixProperty {
-
/**
* Initialize an empty mapping
- * @param resourceName the resource being mapped
+ * @param resourceId the resource being mapped
*/
public ResourceAssignment(ResourceId resourceId) {
super(resourceId.stringify());
@@ -86,14 +84,6 @@ public class ResourceAssignment extends HelixProperty {
}
/**
- * Get the currently mapped partitions
- * @return list of Partition objects (immutable)
- */
- public List<String> getMappedPartitions() {
- return Lists.newArrayList(_record.getMapFields().keySet());
- }
-
- /**
* Get the entire map of a resource
* @return map of partition to participant to state
*/
@@ -121,19 +111,6 @@ public class ResourceAssignment extends HelixProperty {
}
/**
- * Get the participant, state pairs for a partition
- * @param partition the Partition to look up
- * @return map of (participant id, state)
- */
- public Map<String, String> getReplicaMap(String partitionId) {
- Map<String, String> rawReplicaMap = _record.getMapField(partitionId);
- if (rawReplicaMap == null) {
- return Collections.emptyMap();
- }
- return rawReplicaMap;
- }
-
- /**
* Add participant, state pairs for a partition
* @param partitionId the partition to set
* @param replicaMap map of (participant name, state)
@@ -147,15 +124,6 @@ public class ResourceAssignment extends HelixProperty {
}
/**
- * Add participant, state pairs for a partition
- * @param partitionId the partition to set
- * @param replicaMap map of (participant name, state)
- */
- public void addReplicaMap(String partitionId, Map<String, String> replicaMap) {
- _record.setMapField(partitionId, replicaMap);
- }
-
- /**
* Helper for converting a map of strings to a concrete replica map
* @param rawMap map of participant name to state name
* @return map of participant id to state
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index a9a6e49..2e759e6 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -153,7 +153,7 @@ public class StateModelDefinition extends HelixProperty {
* Get an ordered priority list of transitions
* @return transitions in the form SRC-DEST, the first of which is highest priority
*/
- public List<String> getStateTransitionPriorityStringList() {
+ public List<String> getStateTransitionPriorityList() {
return _stateTransitionPriorityList;
}
@@ -161,9 +161,9 @@ public class StateModelDefinition extends HelixProperty {
* Get an ordered priority list of transitions
* @return Transition objects, the first of which is highest priority (immutable)
*/
- public List<Transition> getStateTransitionPriorityList() {
+ public List<Transition> getTypedStateTransitionPriorityList() {
ImmutableList.Builder<Transition> builder = new ImmutableList.Builder<Transition>();
- for (String transition : getStateTransitionPriorityStringList()) {
+ for (String transition : getStateTransitionPriorityList()) {
String fromState = transition.substring(0, transition.indexOf('-'));
String toState = transition.substring(transition.indexOf('-') + 1);
builder.add(Transition.from(State.from(fromState), State.from(toState)));
@@ -283,8 +283,8 @@ public class StateModelDefinition extends HelixProperty {
Map<String, String> stateConstraintMap;
/**
- * Start building a state model with a name
- * @param name state model name
+ * Start building a state model with an id
+ * @param stateModelDefId state model id
*/
public Builder(StateModelDefId stateModelDefId) {
this._statemodelName = stateModelDefId.stringify();
@@ -294,9 +294,17 @@ public class StateModelDefinition extends HelixProperty {
}
/**
+ * Start building a state model with a name
+ * @param stateModelName state model name
+ */
+ public Builder(String stateModelName) {
+ this(StateModelDefId.from(stateModelName));
+ }
+
+ /**
* initial state of a replica when it starts, most commonly used initial
* state is OFFLINE
- * @param state
+ * @param initialState
*/
public Builder initialState(State initialState) {
return initialState(initialState.toString());
@@ -305,7 +313,7 @@ public class StateModelDefinition extends HelixProperty {
/**
* initial state of a replica when it starts, most commonly used initial
* state is OFFLINE
- * @param state
+ * @param initialState
*/
public Builder initialState(String initialState) {
this.initialState = initialState;
@@ -318,7 +326,8 @@ public class StateModelDefinition extends HelixProperty {
* STATE2 has a constraint of 3 but only one node is up then Helix will uses
* the priority to see STATE constraint has to be given higher preference <br/>
* Use -1 to indicates states with no constraints, like OFFLINE
- * @param states
+ * @param state the state to add
+ * @param priority the state priority, lower number is higher priority
*/
public Builder addState(State state, int priority) {
return addState(state.toString(), priority);
@@ -330,7 +339,8 @@ public class StateModelDefinition extends HelixProperty {
* STATE2 has a constraint of 3 but only one node is up then Helix will uses
* the priority to see STATE constraint has to be given higher preference <br/>
* Use -1 to indicates states with no constraints, like OFFLINE
- * @param states
+ * @param state the state to add
+ * @param priority the state priority, lower number is higher priority
*/
public Builder addState(String state, int priority) {
statesMap.put(state, priority);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java
index 72b2bc9..cda2f9e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/AutoModeISBuilder.java
@@ -22,24 +22,84 @@ package org.apache.helix.model.builder;
import java.util.ArrayList;
import java.util.Arrays;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.IdealState.RebalanceMode;
+/**
+ * IdealState builder for SEMI_AUTO mode
+ */
public class AutoModeISBuilder extends IdealStateBuilder {
+ /**
+ * Start building a SEMI_AUTO IdealState
+ * @param resourceName the resource
+ */
public AutoModeISBuilder(String resourceName) {
super(resourceName);
setRebalancerMode(RebalanceMode.SEMI_AUTO);
}
- public void add(String partitionName) {
+ /**
+ * Start building a SEMI_AUTO IdealState
+ * @param resourceId the resource
+ */
+ public AutoModeISBuilder(ResourceId resourceId) {
+ this(resourceId.stringify());
+ }
+
+ /**
+ * Add a partition; Helix will assign replicas of the partition according to preference lists
+ * @param partitionName the name of the new partition
+ * @return AutoModeISBuilder
+ */
+ public AutoModeISBuilder add(String partitionName) {
if (_record.getListField(partitionName) == null) {
_record.setListField(partitionName, new ArrayList<String>());
}
+ return this;
+ }
+
+ /**
+ * Add a partition; Helix will assign replicas of the partition according to preference lists
+ * @param partitionId the id of the new partition
+ * @return AutoModeISBuilder
+ */
+ public AutoModeISBuilder add(PartitionId partitionId) {
+ if (partitionId != null) {
+ add(partitionId.stringify());
+ }
+ return this;
}
+ /**
+ * Define where replicas of a partition should live
+ * @param partitionName the partition
+ * @param instanceNames ordered list of participant names
+ * @return AutoModeISBuilder
+ */
public AutoModeISBuilder assignPreferenceList(String partitionName, String... instanceNames) {
add(partitionName);
_record.getListField(partitionName).addAll(Arrays.asList(instanceNames));
return this;
}
+ /**
+ * Define where replicas of a partition should live
+ * @param partitionId the partition
+ * @param participantIds ordered list of participant ids
+ * @return AutoModeISBuilder
+ */
+ public AutoModeISBuilder assignPreferenceList(PartitionId partitionId,
+ ParticipantId... participantIds) {
+ if (partitionId != null) {
+ String[] participantNames = new String[participantIds.length];
+ for (int i = 0; i < participantIds.length; i++) {
+ participantNames[i] = participantIds[i].stringify();
+ }
+ assignPreferenceList(partitionId.stringify(), participantNames);
+ }
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java
index bfb958d..8ac3b82 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/AutoRebalanceModeISBuilder.java
@@ -21,24 +21,54 @@ package org.apache.helix.model.builder;
import java.util.ArrayList;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.IdealState.RebalanceMode;
+/**
+ * IdealState builder for FULL_AUTO mode
+ */
public class AutoRebalanceModeISBuilder extends IdealStateBuilder {
+ /**
+ * Start building a FULL_AUTO IdealState
+ * @param resourceName the resource
+ */
public AutoRebalanceModeISBuilder(String resourceName) {
super(resourceName);
setRebalancerMode(RebalanceMode.FULL_AUTO);
}
/**
+ * Start building a FULL_AUTO IdealState
+ * @param resourceId the resource
+ */
+ public AutoRebalanceModeISBuilder(ResourceId resourceId) {
+ this(resourceId.stringify());
+ }
+
+ /**
* Add a partition, Helix will automatically assign the placement and state
* for this partition at runtime.
- * @param partitionName
+ * @param partitionName the partition to add
+ * @return AutoRebalanceModeISBuilder
*/
public AutoRebalanceModeISBuilder add(String partitionName) {
if (_record.getListField(partitionName) == null) {
_record.setListField(partitionName, new ArrayList<String>());
}
+ return this;
+ }
+ /**
+ * Add a partition, Helix will automatically assign the placement and state
+ * for this partition at runtime.
+ * @param partitionId the partition to add
+ * @return AutoRebalanceModeISBuilder
+ */
+ public AutoRebalanceModeISBuilder add(PartitionId partitionId) {
+ if (partitionId != null) {
+ add(partitionId.stringify());
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java
index f329daa..13b2a7e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/ClusterConstraintsBuilder.java
@@ -25,11 +25,8 @@ import java.util.Map;
import org.apache.helix.api.id.ConstraintId;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.log4j.Logger;
public class ClusterConstraintsBuilder {
- private static Logger LOG = Logger.getLogger(ClusterConstraintsBuilder.class);
-
final private ConstraintType _constraintType;
/**
@@ -61,6 +58,11 @@ public class ClusterConstraintsBuilder {
return this;
}
+ public ClusterConstraintsBuilder addConstraintAttribute(String constraintId, String attribute,
+ String value) {
+ return addConstraintAttribute(ConstraintId.from(constraintId), attribute, value);
+ }
+
public ClusterConstraints build() {
ClusterConstraints constraints = new ClusterConstraints(_constraintType);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java
index 0519979..fb6235f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/CurrentStateBuilder.java
@@ -9,6 +9,7 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.CurrentState.CurrentStateProperty;
@@ -39,7 +40,7 @@ public class CurrentStateBuilder {
private final Map<PartitionId, State> _partitionStateMap;
private SessionId _sessionId;
private StateModelDefId _stateModelDefId;
- private String _stateModelFactoryName;
+ private StateModelFactoryId _stateModelFactoryId;
/**
* Build a current state for a given resource
@@ -93,11 +94,11 @@ public class CurrentStateBuilder {
/**
* Set the name of the state model factory
- * @param stateModelFactoryName state model factory identifier
+ * @param stateModelFactoryId state model factory identifier
* @return CurrentStateBuilder
*/
- public CurrentStateBuilder stateModelFactory(String stateModelFactoryName) {
- _stateModelFactoryName = stateModelFactoryName;
+ public CurrentStateBuilder stateModelFactory(StateModelFactoryId stateModelFactoryId) {
+ _stateModelFactoryId = stateModelFactoryId;
return this;
}
@@ -117,7 +118,7 @@ public class CurrentStateBuilder {
record.setSimpleField(CurrentStateProperty.STATE_MODEL_DEF.toString(),
_stateModelDefId.toString());
record.setSimpleField(CurrentStateProperty.STATE_MODEL_FACTORY_NAME.toString(),
- _stateModelFactoryName);
+ _stateModelFactoryId.toString());
return new CurrentState(record);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java
index 65c40a0..566452a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/CustomModeISBuilder.java
@@ -22,31 +22,63 @@ package org.apache.helix.model.builder;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.IdealState.RebalanceMode;
+/**
+ * IdealState builder for CUSTOMIZED mode
+ */
public class CustomModeISBuilder extends IdealStateBuilder {
-
+ /**
+ * Start building a CUSTOMIZED IdealState
+ * @param resourceName the resource
+ */
public CustomModeISBuilder(String resourceName) {
super(resourceName);
setRebalancerMode(RebalanceMode.CUSTOMIZED);
}
/**
+ * Start building a CUSTOMIZED IdealState
+ * @param resourceId the resource
+ */
+ public CustomModeISBuilder(ResourceId resourceId) {
+ this(resourceId.stringify());
+ }
+
+ /**
* Add a sub-resource
- * @param partitionName
+ * @param partitionName partition to add
+ * @return CustomModeISBuilder
*/
- public void add(String partitionName) {
+ public CustomModeISBuilder add(String partitionName) {
if (_record.getMapField(partitionName) == null) {
_record.setMapField(partitionName, new TreeMap<String, String>());
}
+ return this;
+ }
+
+ /**
+ * Add a sub-resource
+ * @param partitionId partition to add
+ * @return CustomModeISBuilder
+ */
+ public CustomModeISBuilder add(PartitionId partitionId) {
+ if (partitionId != null) {
+ add(partitionId.stringify());
+ }
+ return this;
}
/**
* add an instance->state assignment
- * @param partitionName
- * @param instanceName
- * @param state
- * @return
+ * @param partitionName partition to update
+ * @param instanceName participant name
+ * @param state state the replica should be in
+ * @return CustomModeISBuilder
*/
public CustomModeISBuilder assignInstanceAndState(String partitionName, String instanceName,
String state) {
@@ -56,4 +88,19 @@ public class CustomModeISBuilder extends IdealStateBuilder {
return this;
}
+ /**
+ * add an instance->state assignment
+ * @param partitionId partition to update
+ * @param participantId participant to assign to
+ * @param state state the replica should be in
+ * @return CustomModeISBuilder
+ */
+ public CustomModeISBuilder assignParticipantAndState(PartitionId partitionId,
+ ParticipantId participantId, State state) {
+ if (partitionId != null && participantId != null && state != null) {
+ assignInstanceAndState(partitionId.stringify(), participantId.stringify(), state.toString());
+ }
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/model/builder/ExternalViewBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/ExternalViewBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/ExternalViewBuilder.java
deleted file mode 100644
index 5fe099f..0000000
--- a/helix-core/src/main/java/org/apache/helix/model/builder/ExternalViewBuilder.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.model.builder;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class ExternalViewBuilder {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index f591a24..1563769 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -55,10 +55,10 @@ import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
-import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
-import org.apache.helix.controller.stages.NewResourceComputationStage;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
@@ -250,7 +250,7 @@ public class ClusterStateVerifier {
ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
Cluster cluster = clusterAccessor.readCluster();
// calculate best possible state
- NewBestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
+ BestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cluster);
// set error states
if (errStates != null) {
@@ -416,19 +416,19 @@ public class ClusterStateVerifier {
* @throws Exception
*/
- static NewBestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
+ static BestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
ClusterEvent event = new ClusterEvent("sampleEvent");
event.addAttribute("ClusterDataCache", cluster);
- NewResourceComputationStage rcState = new NewResourceComputationStage();
- NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();
- NewBestPossibleStateCalcStage bpStage = new NewBestPossibleStateCalcStage();
+ ResourceComputationStage rcState = new ResourceComputationStage();
+ CurrentStateComputationStage csStage = new CurrentStateComputationStage();
+ BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
runStage(event, rcState);
runStage(event, csStage);
runStage(event, bpStage);
- NewBestPossibleStateOutput output =
+ BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
return output;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index c478bbb..369ad68 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -36,8 +36,8 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -130,13 +130,13 @@ public class TestNewStages extends ZkUnitTestBase {
// Run the stage
try {
- new NewBestPossibleStateCalcStage().process(event);
+ new BestPossibleStateCalcStage().process(event);
} catch (Exception e) {
Assert.fail(e.toString());
}
// Verify the result
- NewBestPossibleStateOutput bestPossibleStateOutput =
+ BestPossibleStateOutput bestPossibleStateOutput =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
Assert.assertNotNull(bestPossibleStateOutput);
ResourceId resourceId = ResourceId.from("TestDB0");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 18e8f4d..cb60691 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -62,12 +62,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+ ReadClusterDataStage stage1 = new ReadClusterDataStage();
runStage(event, stage1);
- NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+ BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
runStage(event, stage2);
- NewBestPossibleStateOutput output =
+ BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
Map<ParticipantId, State> replicaMap =
@@ -100,12 +100,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+ ReadClusterDataStage stage1 = new ReadClusterDataStage();
runStage(event, stage1);
- NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+ BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
runStage(event, stage2);
- NewBestPossibleStateOutput output =
+ BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
Map<ParticipantId, State> replicaMap =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5405df1e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index d3f348e..d116182 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -52,12 +52,12 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
- NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
+ ReadClusterDataStage stage1 = new ReadClusterDataStage();
runStage(event, stage1);
- NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
+ BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
runStage(event, stage2);
- NewBestPossibleStateOutput output =
+ BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
Map<ParticipantId, State> replicaMap =