You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/05 02:18:36 UTC
git commit: [HELIX-18] Added more functionality to the new cluster setup
Updated Branches:
refs/heads/helix-logical-model 129590d45 -> aa1bd8524
[HELIX-18] Added more functionality to the new cluster setup
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/aa1bd852
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/aa1bd852
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/aa1bd852
Branch: refs/heads/helix-logical-model
Commit: aa1bd8524c81982f71bcce8c986a9b72f87ae2f2
Parents: 129590d
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Oct 4 17:17:48 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Oct 4 17:17:48 2013 -0700
----------------------------------------------------------------------
.../helix/api/accessor/ClusterAccessor.java | 46 ++--
.../helix/api/accessor/ParticipantAccessor.java | 234 ++++++++++++++++++-
.../helix/api/accessor/ResourceAccessor.java | 91 ++++++++
.../rebalancer/context/CustomRebalancer.java | 26 +--
.../context/CustomRebalancerContext.java | 49 +++-
.../rebalancer/context/FullAutoRebalancer.java | 41 ++--
.../context/PartitionedRebalancerContext.java | 13 ++
.../rebalancer/context/RebalancerConfig.java | 9 +
.../rebalancer/context/SemiAutoRebalancer.java | 13 +-
.../context/SemiAutoRebalancerContext.java | 59 +++++
.../util/NewConstraintBasedAssignment.java | 50 ++--
.../stages/NewBestPossibleStateCalcStage.java | 9 +-
.../strategy/AutoRebalanceStrategy.java | 56 ++++-
.../org/apache/helix/tools/NewClusterSetup.java | 128 +++++++++-
.../strategy/TestAutoRebalanceStrategy.java | 37 +--
.../strategy/TestNewAutoRebalanceStrategy.java | 9 +-
16 files changed, 747 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 5c7df85..6061c47 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -39,6 +39,7 @@ import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ConstraintId;
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
@@ -351,6 +352,31 @@ public class ClusterAccessor {
}
/**
+ * Get cluster constraints of a given type
+ * @param type ConstraintType value
+ * @return ClusterConstraints, or null if none present
+ */
+ public ClusterConstraints readConstraints(ConstraintType type) {
+ return _accessor.getProperty(_keyBuilder.constraint(type.toString()));
+ }
+
+ /**
+ * Remove a constraint from the cluster
+ * @param type the constraint type
+ * @param constraintId the constraint id
+ * @return true if removed, false otherwise
+ */
+ public boolean removeConstraint(ConstraintType type, ConstraintId constraintId) {
+ ClusterConstraints constraints = _accessor.getProperty(_keyBuilder.constraint(type.toString()));
+ if (constraints == null || constraints.getConstraintItem(constraintId) == null) {
+ LOG.error("Constraint with id " + constraintId + " not present");
+ return false;
+ }
+ constraints.removeConstraintItem(constraintId);
+ return _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraints);
+ }
+
+ /**
* Read the user config of the cluster
* @return UserConfig, or null
*/
@@ -600,24 +626,8 @@ public class ClusterAccessor {
* @return true if participant dropped, false if there was an error
*/
public boolean dropParticipantFromCluster(ParticipantId participantId) {
- if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
- LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
- + _clusterId);
- return false;
- }
-
- if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
- LOG.error("Participant: " + participantId + " structure does NOT exist in cluster: "
- + _clusterId);
- return false;
- }
-
- // delete participant config path
- _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
-
- // delete participant path
- _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
- return true;
+ ParticipantAccessor accessor = new ParticipantAccessor(_accessor);
+ return accessor.dropParticipant(participantId);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index e139c2e..c53bcd8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -19,6 +19,8 @@ package org.apache.helix.api.accessor;
* under the License.
*/
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -26,17 +28,21 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
import org.apache.helix.api.RunningInstance;
import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.MessageId;
@@ -44,15 +50,27 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
public class ParticipantAccessor {
private static final Logger LOG = Logger.getLogger(ParticipantAccessor.class);
@@ -248,14 +266,131 @@ public class ParticipantAccessor {
}
/**
+ * Reset partitions assigned to a set of participants
+ * @param resetParticipantIdSet the participants to reset
+ * @return true if reset, false otherwise
+ */
+ public boolean resetParticipants(Set<ParticipantId> resetParticipantIdSet) {
+ List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
+ for (ParticipantId participantId : resetParticipantIdSet) {
+ for (ExternalView extView : extViews) {
+ Set<PartitionId> resetPartitionIdSet = Sets.newHashSet();
+ for (PartitionId partitionId : extView.getPartitionSet()) {
+ Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
+ if (stateMap.containsKey(participantId)
+ && stateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR))) {
+ resetPartitionIdSet.add(partitionId);
+ }
+ }
+ resetPartitionsForParticipant(participantId, extView.getResourceId(), resetPartitionIdSet);
+ }
+ }
+ return true;
+ }
+
+ /**
* reset partitions on a participant
* @param participantId
* @param resourceId
* @param resetPartitionIdSet
+ * @return true if partitions reset, false otherwise
*/
- public void resetPartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+ public boolean resetPartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
Set<PartitionId> resetPartitionIdSet) {
- // TODO impl this
+ // make sure the participant is running
+ Participant participant = readParticipant(participantId);
+ if (!participant.isAlive()) {
+ LOG.error("Cannot reset partitions because the participant is not running");
+ return false;
+ }
+ RunningInstance runningInstance = participant.getRunningInstance();
+
+ // check that the resource exists
+ ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+ Resource resource = resourceAccessor.readResource(resourceId);
+ if (resource == null || resource.getRebalancerConfig() == null) {
+ LOG.error("Cannot reset partitions because the resource is not present");
+ return false;
+ }
+
+ // need the rebalancer context for the resource
+ RebalancerContext context =
+ resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+ if (context == null) {
+ LOG.error("Rebalancer context for resource does not exist");
+ return false;
+ }
+
+ // ensure that all partitions to reset exist
+ Set<PartitionId> partitionSet = ImmutableSet.copyOf(context.getSubUnitIdSet());
+ if (!partitionSet.containsAll(resetPartitionIdSet)) {
+ LOG.error("Not all of the specified partitions to reset exist for the resource");
+ return false;
+ }
+
+ // check for a valid current state that has all specified partitions in ERROR state
+ CurrentState currentState = participant.getCurrentStateMap().get(resourceId);
+ if (currentState == null) {
+ LOG.error("The participant does not have a current state for the resource");
+ return false;
+ }
+ for (PartitionId partitionId : resetPartitionIdSet) {
+ if (!currentState.getState(partitionId).equals(State.from(HelixDefinedState.ERROR))) {
+ LOG.error("Partition " + partitionId + " is not in error state, aborting reset");
+ return false;
+ }
+ }
+
+ // make sure that there are no pending transition messages
+ for (Message message : participant.getMessageMap().values()) {
+ if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
+ || !runningInstance.getSessionId().equals(message.getTgtSessionId())
+ || !resourceId.equals(message.getResourceId())
+ || !resetPartitionIdSet.contains(message.getPartitionId())) {
+ continue;
+ }
+ LOG.error("Cannot reset partitions because of the following pending message: " + message);
+ return false;
+ }
+
+ // set up the source id
+ String adminName = null;
+ try {
+ adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
+ } catch (UnknownHostException e) {
+ // can ignore it
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
+ }
+ adminName = "UNKNOWN";
+ }
+
+ // build messages to signal the transition
+ StateModelDefId stateModelDefId = context.getStateModelDefId();
+ StateModelDefinition stateModelDef =
+ _accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
+ Map<MessageId, Message> messageMap = Maps.newHashMap();
+ for (PartitionId partitionId : resetPartitionIdSet) {
+ // send ERROR to initialState message
+ MessageId msgId = MessageId.from(UUID.randomUUID().toString());
+ Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+ message.setSrcName(adminName);
+ message.setTgtName(participantId.stringify());
+ message.setMsgState(MessageState.NEW);
+ message.setPartitionId(partitionId);
+ message.setResourceId(resourceId);
+ message.setTgtSessionId(runningInstance.getSessionId());
+ message.setStateModelDef(stateModelDefId);
+ message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
+ message.setToState(stateModelDef.getInitialState());
+ message.setStateModelFactoryId(context.getStateModelFactoryId());
+
+ messageMap.put(message.getMsgId(), message);
+ }
+
+ // send the messages
+ insertMessagesToParticipant(participantId, messageMap);
+ return true;
}
/**
@@ -474,4 +609,99 @@ public class ParticipantAccessor {
_accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
sessionId.stringify(), resourceId.stringify()));
}
+
+ /**
+ * drop a participant from cluster
+ * @param participantId
+ * @return true if participant dropped, false if there was an error
+ */
+ boolean dropParticipant(ParticipantId participantId) {
+ if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
+ LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
+ return false;
+ }
+
+ if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
+ LOG.error("Participant: " + participantId + " structure does NOT exist in cluster");
+ return false;
+ }
+
+ // delete participant config path
+ _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+
+ // delete participant path
+ _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
+ return true;
+ }
+
+ /**
+ * Let a new participant take the place of an existing participant
+ * @param oldParticipantId the participant to drop
+ * @param newParticipantId the participant that takes its place
+ * @return true if swap successful, false otherwise
+ */
+ public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) {
+ Participant oldParticipant = readParticipant(oldParticipantId);
+ if (oldParticipant == null) {
+ LOG.error("Could not swap participants because the old participant does not exist");
+ return false;
+ }
+ if (oldParticipant.isEnabled()) {
+ LOG.error("Could not swap participants because the old participant is still enabled");
+ return false;
+ }
+ if (oldParticipant.isAlive()) {
+ LOG.error("Could not swap participants because the old participant is still live");
+ return false;
+ }
+ Participant newParticipant = readParticipant(newParticipantId);
+ if (newParticipant == null) {
+ LOG.error("Could not swap participants because the new participant does not exist");
+ return false;
+ }
+ dropParticipant(oldParticipantId);
+ ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+ Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+ for (String resourceName : idealStateMap.keySet()) {
+ IdealState idealState = idealStateMap.get(resourceName);
+ swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
+ PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
+ resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
+ _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState);
+ }
+ return true;
+ }
+
+ /**
+ * Replace occurrences of participants in preference lists and maps
+ * @param idealState the current ideal state
+ * @param oldParticipantId the participant to drop
+ * @param newParticipantId the participant that replaces it
+ */
+ private void swapParticipantsInIdealState(IdealState idealState, ParticipantId oldParticipantId,
+ ParticipantId newParticipantId) {
+ for (PartitionId partitionId : idealState.getPartitionSet()) {
+ List<ParticipantId> oldPreferenceList = idealState.getPreferenceList(partitionId);
+ if (oldPreferenceList != null) {
+ List<ParticipantId> newPreferenceList = Lists.newArrayList();
+ for (ParticipantId participantId : oldPreferenceList) {
+ if (participantId.equals(oldParticipantId)) {
+ newPreferenceList.add(newParticipantId);
+ } else if (!participantId.equals(newParticipantId)) {
+ newPreferenceList.add(participantId);
+ }
+ }
+ idealState.setPreferenceList(partitionId, newPreferenceList);
+ }
+ Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
+ if (preferenceMap != null) {
+ if (preferenceMap.containsKey(oldParticipantId)) {
+ State state = preferenceMap.get(oldParticipantId);
+ preferenceMap.remove(oldParticipantId);
+ preferenceMap.put(newParticipantId, state);
+ }
+ idealState.setParticipantStateMap(partitionId, preferenceMap);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index e5c9443..c65cb44 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -20,11 +20,14 @@ package org.apache.helix.api.accessor;
*/
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.PropertyKey;
import org.apache.helix.api.Resource;
import org.apache.helix.api.Scope;
@@ -43,10 +46,15 @@ import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
public class ResourceAccessor {
private static final Logger LOG = Logger.getLogger(ResourceAccessor.class);
private final HelixDataAccessor _accessor;
@@ -144,6 +152,19 @@ public class ResourceAccessor {
RebalancerConfig config = new RebalancerConfig(context);
ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
resourceConfig.addNamespacedConfig(config.toNamespacedConfig());
+
+ // update the ideal state if applicable
+ IdealState oldIdealState =
+ _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
+ if (oldIdealState != null) {
+ IdealState idealState =
+ rebalancerConfigToIdealState(config, oldIdealState.getBucketSize(),
+ oldIdealState.getBatchMessageMode());
+ if (idealState != null) {
+ _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+ }
+ }
+
return _accessor.updateProperty(_keyBuilder.resourceConfig(resourceId.stringify()),
resourceConfig);
}
@@ -252,6 +273,76 @@ public class ResourceAccessor {
}
/**
+ * reset resources for all participants
+ * @param resetResourceIdSet the resources to reset
+ * @return true if they were reset, false otherwise
+ */
+ public boolean resetResources(Set<ResourceId> resetResourceIdSet) {
+ ParticipantAccessor accessor = new ParticipantAccessor(_accessor);
+ List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
+ for (ExternalView extView : extViews) {
+ if (!resetResourceIdSet.contains(extView.getResourceId())) {
+ continue;
+ }
+
+ Map<ParticipantId, Set<PartitionId>> resetPartitionIds = Maps.newHashMap();
+ for (PartitionId partitionId : extView.getPartitionSet()) {
+ Map<ParticipantId, State> stateMap = extView.getStateMap(partitionId);
+ for (ParticipantId participantId : stateMap.keySet()) {
+ State state = stateMap.get(participantId);
+ if (state.equals(State.from(HelixDefinedState.ERROR))) {
+ if (!resetPartitionIds.containsKey(participantId)) {
+ resetPartitionIds.put(participantId, new HashSet<PartitionId>());
+ }
+ resetPartitionIds.get(participantId).add(partitionId);
+ }
+ }
+ }
+ for (ParticipantId participantId : resetPartitionIds.keySet()) {
+ accessor.resetPartitionsForParticipant(participantId, extView.getResourceId(),
+ resetPartitionIds.get(participantId));
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Generate a default assignment for partitioned resources
+ * @param resourceId the resource to update
+ * @param replicaCount the new replica count (or -1 to use the existing one)
+ * @param participantGroupTag the new participant group tag (or null to use the existing one)
+ * @return true if assignment successful, false otherwise
+ */
+ public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
+ String participantGroupTag) {
+ Resource resource = readResource(resourceId);
+ RebalancerConfig config = resource.getRebalancerConfig();
+ PartitionedRebalancerContext context =
+ config.getRebalancerContext(PartitionedRebalancerContext.class);
+ if (context == null) {
+ LOG.error("Only partitioned resource types are supported");
+ return false;
+ }
+ if (replicaCount != -1) {
+ context.setReplicaCount(replicaCount);
+ }
+ if (participantGroupTag != null) {
+ context.setParticipantGroupTag(participantGroupTag);
+ }
+ StateModelDefinition stateModelDef =
+ _accessor.getProperty(_keyBuilder.stateModelDef(context.getStateModelDefId().stringify()));
+ List<InstanceConfig> participantConfigs =
+ _accessor.getChildValues(_keyBuilder.instanceConfigs());
+ Set<ParticipantId> participantSet = Sets.newHashSet();
+ for (InstanceConfig participantConfig : participantConfigs) {
+ participantSet.add(participantConfig.getParticipantId());
+ }
+ context.generateDefaultConfiguration(stateModelDef, participantSet);
+ setRebalancerContext(resourceId, context);
+ return true;
+ }
+
+ /**
* Get an ideal state from a rebalancer config if the resource is partitioned
* @param config RebalancerConfig instance
* @param bucketSize bucket size to use
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
index fb0c512..d245fae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancer.java
@@ -7,7 +7,6 @@ import java.util.Set;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Participant;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
@@ -46,8 +45,8 @@ public class CustomRebalancer implements Rebalancer {
}
@Override
- public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
- ResourceCurrentState currentState) {
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ Cluster cluster, ResourceCurrentState currentState) {
CustomRebalancerContext config =
rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
StateModelDefinition stateModelDef =
@@ -63,8 +62,9 @@ public class CustomRebalancer implements Rebalancer {
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
partition);
Map<ParticipantId, State> bestStateForPartition =
- computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
- config.getPreferenceMap(partition), currentStateMap, disabledInstancesForPartition);
+ computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap().keySet(),
+ stateModelDef, config.getPreferenceMap(partition), currentStateMap,
+ disabledInstancesForPartition);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
@@ -74,14 +74,14 @@ public class CustomRebalancer implements Rebalancer {
* compute best state for resource in CUSTOMIZED rebalancer mode
* @param liveParticipantMap
* @param stateModelDef
- * @param idealStateMap
+ * @param preferenceMap
* @param currentStateMap
* @param disabledParticipantsForPartition
* @return
*/
private Map<ParticipantId, State> computeCustomizedBestStateForPartition(
- Map<ParticipantId, Participant> liveParticipantMap, StateModelDefinition stateModelDef,
- Map<ParticipantId, State> idealStateMap, Map<ParticipantId, State> currentStateMap,
+ Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
+ Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
Set<ParticipantId> disabledParticipantsForPartition) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
@@ -89,7 +89,7 @@ public class CustomRebalancer implements Rebalancer {
// we should drop all resources.
if (currentStateMap != null) {
for (ParticipantId participantId : currentStateMap.keySet()) {
- if ((idealStateMap == null || !idealStateMap.containsKey(participantId))
+ if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
&& !disabledParticipantsForPartition.contains(participantId)) {
// if dropped and not disabled, transit to DROPPED
participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
@@ -103,18 +103,18 @@ public class CustomRebalancer implements Rebalancer {
}
// ideal state is deleted
- if (idealStateMap == null) {
+ if (preferenceMap == null) {
return participantStateMap;
}
- for (ParticipantId participantId : idealStateMap.keySet()) {
+ for (ParticipantId participantId : preferenceMap.keySet()) {
boolean notInErrorState =
currentStateMap == null || currentStateMap.get(participantId) == null
|| !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
- if (liveParticipantMap.containsKey(participantId) && notInErrorState
+ if (liveParticipantSet.contains(participantId) && notInErrorState
&& !disabledParticipantsForPartition.contains(participantId)) {
- participantStateMap.put(participantId, idealStateMap.get(participantId));
+ participantStateMap.put(participantId, preferenceMap.get(participantId));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
index 3ccce3d..904907e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
@@ -1,14 +1,25 @@
package org.apache.helix.controller.rebalancer.context;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
import org.codehaus.jackson.annotate.JsonIgnore;
-import org.testng.collections.Maps;
+
+import com.google.common.collect.Maps;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -72,6 +83,42 @@ public class CustomRebalancerContext extends PartitionedRebalancerContext {
}
/**
+ * Generate preference maps based on a default cluster setup
+ * @param stateModelDef the state model definition to follow
+ * @param participantSet the set of participant ids to configure for
+ */
+ @Override
+ @JsonIgnore
+ public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+ Set<ParticipantId> participantSet) {
+ // compute default upper bounds
+ Map<State, String> upperBounds = Maps.newHashMap();
+ for (State state : stateModelDef.getStatesPriorityList()) {
+ upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+ }
+
+ // determine the current mapping
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
+
+ // determine the preference maps
+ LinkedHashMap<State, Integer> stateCounts =
+ NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+ getReplicaCount());
+ ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+ List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+ List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+ AutoRebalanceStrategy strategy =
+ new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+ getMaxPartitionsPerParticipant(), placementScheme);
+ Map<String, Map<String, String>> rawPreferenceMaps =
+ strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+ .getMapFields();
+ Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
+ Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
+ setPreferenceMaps(preferenceMaps);
+ }
+
+ /**
* Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
*/
public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
index 189df64..521af5c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancer.java
@@ -26,7 +26,6 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
import com.google.common.base.Function;
-import com.google.common.base.Functions;
import com.google.common.collect.Lists;
/*
@@ -60,15 +59,14 @@ public class FullAutoRebalancer implements Rebalancer {
}
@Override
- public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
- ResourceCurrentState currentState) {
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ Cluster cluster, ResourceCurrentState currentState) {
FullAutoRebalancerContext config =
rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
// Compute a preference list based on the current ideal state
List<PartitionId> partitions = new ArrayList<PartitionId>(config.getPartitionSet());
- List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
int replicas = -1;
@@ -79,31 +77,29 @@ public class FullAutoRebalancer implements Rebalancer {
}
// count how many replicas should be in each state
+ Map<State, String> upperBounds =
+ NewConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+ cluster.getConfig());
LinkedHashMap<State, Integer> stateCountMap =
- NewConstraintBasedAssignment.stateCount(cluster.getConfig(), config.getResourceId(),
- stateModelDef, liveParticipants.size(), replicas);
- LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
- for (State state : stateCountMap.keySet()) {
- rawStateCountMap.put(state.toString(), stateCountMap.get(state));
- }
+ NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef,
+ liveParticipants.size(), replicas);
// get the participant lists
List<ParticipantId> liveParticipantList =
new ArrayList<ParticipantId>(liveParticipants.keySet());
List<ParticipantId> allParticipantList =
new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
- List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
// compute the current mapping from the current state
Map<PartitionId, Map<ParticipantId, State>> currentMapping =
currentMapping(config, currentState, stateCountMap);
// If there are nodes tagged with resource, use only those nodes
- Set<String> taggedNodes = new HashSet<String>();
+ Set<ParticipantId> taggedNodes = new HashSet<ParticipantId>();
if (config.getParticipantGroupTag() != null) {
for (ParticipantId participantId : liveParticipantList) {
if (liveParticipants.get(participantId).hasTag(config.getParticipantGroupTag())) {
- taggedNodes.add(participantId.stringify());
+ taggedNodes.add(participantId);
}
}
}
@@ -112,26 +108,25 @@ public class FullAutoRebalancer implements Rebalancer {
LOG.info("found the following instances with tag " + config.getResourceId() + " "
+ taggedNodes);
}
- liveNodes = new ArrayList<String>(taggedNodes);
+ liveParticipantList = new ArrayList<ParticipantId>(taggedNodes);
}
// determine which nodes the replicas should live on
- List<String> allNodes = Lists.transform(allParticipantList, Functions.toStringFunction());
int maxPartition = config.getMaxPartitionsPerParticipant();
if (LOG.isInfoEnabled()) {
LOG.info("currentMapping: " + currentMapping);
LOG.info("stateCountMap: " + stateCountMap);
- LOG.info("liveNodes: " + liveNodes);
- LOG.info("allNodes: " + allNodes);
+ LOG.info("liveNodes: " + liveParticipantList);
+ LOG.info("allNodes: " + allParticipantList);
LOG.info("maxPartition: " + maxPartition);
}
ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
_algorithm =
- new AutoRebalanceStrategy(config.getResourceId().stringify(), partitionNames,
- rawStateCountMap, maxPartition, placementScheme);
+ new AutoRebalanceStrategy(config.getResourceId(), partitions, stateCountMap, maxPartition,
+ placementScheme);
ZNRecord newMapping =
- _algorithm.computePartitionAssignment(liveNodes,
- ResourceAssignment.stringMapsFromReplicaMaps(currentMapping), allNodes);
+ _algorithm.typedComputePartitionAssignment(liveParticipantList, currentMapping,
+ allParticipantList);
if (LOG.isInfoEnabled()) {
LOG.info("newMapping: " + newMapping);
@@ -159,8 +154,8 @@ public class FullAutoRebalancer implements Rebalancer {
preferenceList =
NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
Map<ParticipantId, State> bestStateForPartition =
- NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
- config.getResourceId(), liveParticipants, stateModelDef, preferenceList,
+ NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+ liveParticipants.keySet(), stateModelDef, preferenceList,
currentState.getCurrentStateMap(config.getResourceId(), partition),
disabledParticipantsForPartition);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
index d202e82..decc78d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
@@ -7,10 +7,12 @@ import java.util.Set;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
@@ -151,6 +153,17 @@ public class PartitionedRebalancerContext extends BasicRebalancerContext impleme
}
/**
+ * Generate a default configuration given the state model and a set of participants.
+ * @param stateModelDef the state model definition to follow
+ * @param participantSet the set of participant ids to configure for
+ */
+ @JsonIgnore
+ public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+ Set<ParticipantId> participantSet) {
+ // the base context does not know enough to do anything here, so this is a no-op
+ }
+
+ /**
* Convert a physically-stored IdealState into a rebalancer context for a partitioned resource
* @param idealState populated IdealState
* @return PartitionedRebalancerContext
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
index c64941c..924b8a1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -165,4 +165,13 @@ public final class RebalancerConfig {
public static RebalancerConfig from(ResourceConfiguration resourceConfiguration) {
return new RebalancerConfig(resourceConfiguration);
}
+
+ /**
+ * Get a RebalancerConfig from a RebalancerContext
+ * @param context instantiated RebalancerContext
+ * @return RebalancerConfig
+ */
+ public static RebalancerConfig from(RebalancerContext context) {
+ return new RebalancerConfig(context);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
index c112fcf..3f0dd13 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
@@ -47,8 +47,8 @@ public class SemiAutoRebalancer implements Rebalancer {
}
@Override
- public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
- ResourceCurrentState currentState) {
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ Cluster cluster, ResourceCurrentState currentState) {
SemiAutoRebalancerContext config =
rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
StateModelDefinition stateModelDef =
@@ -66,10 +66,13 @@ public class SemiAutoRebalancer implements Rebalancer {
List<ParticipantId> preferenceList =
NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
config.getPreferenceList(partition));
+ Map<State, String> upperBounds =
+ NewConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
+ cluster.getConfig());
Map<ParticipantId, State> bestStateForPartition =
- NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
- config.getResourceId(), cluster.getLiveParticipantMap(), stateModelDef,
- preferenceList, currentStateMap, disabledInstancesForPartition);
+ NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
+ .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
+ disabledInstancesForPartition);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
index d6d163c..71b5076 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
@@ -1,12 +1,23 @@
package org.apache.helix.controller.rebalancer.context;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -75,6 +86,54 @@ public final class SemiAutoRebalancerContext extends PartitionedRebalancerContex
}
/**
+ * Generate preference lists based on a default cluster setup
+ * @param stateModelDef the state model definition to follow
+ * @param participantSet the set of participant ids to configure for
+ */
+ @Override
+ @JsonIgnore
+ public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+ Set<ParticipantId> participantSet) {
+ // compute default upper bounds
+ Map<State, String> upperBounds = Maps.newHashMap();
+ for (State state : stateModelDef.getStatesPriorityList()) {
+ upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+ }
+
+ // determine the current mapping
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+ for (PartitionId partitionId : getPartitionSet()) {
+ List<ParticipantId> preferenceList = getPreferenceList(partitionId);
+ if (preferenceList != null && !preferenceList.isEmpty()) {
+ Set<ParticipantId> disabledParticipants = Collections.emptySet();
+ Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
+ Map<ParticipantId, State> initialMap =
+ NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+ participantSet, stateModelDef, preferenceList, emptyCurrentState,
+ disabledParticipants);
+ currentMapping.put(partitionId, initialMap);
+ }
+ }
+
+ // determine the preference
+ LinkedHashMap<State, Integer> stateCounts =
+ NewConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+ getReplicaCount());
+ ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+ List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+ List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+ AutoRebalanceStrategy strategy =
+ new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+ getMaxPartitionsPerParticipant(), placementScheme);
+ Map<String, List<String>> rawPreferenceLists =
+ strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+ .getListFields();
+ Map<PartitionId, List<ParticipantId>> preferenceLists =
+ Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
+ setPreferenceLists(preferenceLists);
+ }
+
+ /**
* Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
*/
public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 7bc2769..d5531b1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -90,8 +91,28 @@ public class NewConstraintBasedAssignment {
}
/**
+ * Get a map of state to upper bound constraint given a cluster
+ * @param stateModelDef the state model definition to check
+ * @param resourceId the resource that is constrained
+ * @param cluster the configuration of the cluster the resource belongs to
+ * @return map of state to upper bound
+ */
+ public static Map<State, String> stateConstraints(StateModelDefinition stateModelDef,
+ ResourceId resourceId, ClusterConfig cluster) {
+ Map<State, String> stateMap = Maps.newHashMap();
+ for (State state : stateModelDef.getStatesPriorityList()) {
+ String num =
+ cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+ stateModelDef.getStateModelDefId(), state);
+ stateMap.put(state, num);
+ }
+ return stateMap;
+ }
+
+ /**
* compute best state for resource in SEMI_AUTO and FULL_AUTO modes
- * @param liveParticipantMap map of id to live participants
+ * @param upperBounds map of state to upper bound
+ * @param liveParticipantSet set of live participant ids
* @param stateModelDef
* @param participantPreferenceList
* @param currentStateMap
@@ -99,8 +120,8 @@ public class NewConstraintBasedAssignment {
* @param disabledParticipantsForPartition
* @return
*/
- public static Map<ParticipantId, State> computeAutoBestStateForPartition(ClusterConfig cluster,
- ResourceId resourceId, Map<ParticipantId, Participant> liveParticipantMap,
+ public static Map<ParticipantId, State> computeAutoBestStateForPartition(
+ Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
@@ -131,12 +152,10 @@ public class NewConstraintBasedAssignment {
boolean assigned[] = new boolean[participantPreferenceList.size()];
for (State state : statesPriorityList) {
- String num =
- cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
- stateModelDef.getStateModelDefId(), state);
+ String num = upperBounds.get(state);
int stateCount = -1;
if ("N".equals(num)) {
- Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantMap.keySet());
+ Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
liveAndEnabled.removeAll(disabledParticipantsForPartition);
stateCount = liveAndEnabled.size();
} else if ("R".equals(num)) {
@@ -159,7 +178,7 @@ public class NewConstraintBasedAssignment {
|| !currentStateMap.get(participantId)
.equals(State.from(HelixDefinedState.ERROR));
- if (liveParticipantMap.containsKey(participantId) && !assigned[i] && notInErrorState
+ if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
&& !disabledParticipantsForPartition.contains(participantId)) {
participantStateMap.put(participantId, state);
count = count + 1;
@@ -176,23 +195,20 @@ public class NewConstraintBasedAssignment {
/**
* Get the number of replicas that should be in each state for a partition
- * @param cluster cluster configuration
- * @param resourceId the resource for which to get the state count
+ * @param upperBounds map of state to upper bound
* @param stateModelDef StateModelDefinition object
* @param liveNodesNb number of live nodes
* @param total number of replicas
* @return state count map: state->count
*/
- public static LinkedHashMap<State, Integer> stateCount(ClusterConfig cluster,
- ResourceId resourceId, StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
+ public static LinkedHashMap<State, Integer> stateCount(Map<State, String> upperBounds,
+ StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
List<State> statesPriorityList = stateModelDef.getStatesPriorityList();
int replicas = totalReplicas;
for (State state : statesPriorityList) {
- String num =
- cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
- stateModelDef.getStateModelDefId(), state);
+ String num = upperBounds.get(state);
if ("N".equals(num)) {
stateCountMap.put(state, liveNodesNb);
} else if ("R".equals(num)) {
@@ -216,9 +232,7 @@ public class NewConstraintBasedAssignment {
// get state count for R
for (State state : statesPriorityList) {
- String num =
- cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
- stateModelDef.getStateModelDefId(), state);
+ String num = upperBounds.get(state);
if ("R".equals(num)) {
stateCountMap.put(state, replicas);
// should have at most one state using R
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index 11d7969..8b56bec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
@@ -94,10 +95,12 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
Set<ParticipantId> disabledParticipantsForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
partitionId);
+ Map<State, String> upperBounds =
+ NewConstraintBasedAssignment.stateConstraints(stateModelDef, resourceId,
+ cluster.getConfig());
partitionMapping.addReplicaMap(partitionId, NewConstraintBasedAssignment
- .computeAutoBestStateForPartition(cluster.getConfig(), resourceId,
- cluster.getLiveParticipantMap(), stateModelDef, null,
- currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+ .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
+ stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
disabledParticipantsForPartition));
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 85741ed..d3f89ef 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -34,8 +34,16 @@ import java.util.TreeSet;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.ResourceAssignment;
import org.apache.log4j.Logger;
+import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
+
public class AutoRebalanceStrategy {
private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
@@ -79,7 +87,7 @@ public class AutoRebalanceStrategy {
}
/**
- * Initialize the strategy with a default placement scheme and no
+ * Initialize the strategy with a default placement scheme
* @see #AutoRebalanceStrategy(String, List, LinkedHashMap, int, ReplicaPlacementScheme)
*/
public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
@@ -88,6 +96,52 @@ public class AutoRebalanceStrategy {
}
/**
+ * Constructor to support logically-typed Helix components
+ * @param resourceId the resource for which to compute an assignment
+ * @param partitions the partitions of the resource
+ * @param states the states and counts for each state
+ * @param maximumPerNode the maximum number of replicas per node
+ * @param placementScheme the scheme to use for preferred replica locations. If null, this is
+ * {@link DefaultPlacementScheme}
+ */
+ public AutoRebalanceStrategy(ResourceId resourceId, final List<PartitionId> partitions,
+ final LinkedHashMap<State, Integer> states, int maximumPerNode,
+ ReplicaPlacementScheme placementScheme) {
+ LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
+ for (State state : states.keySet()) {
+ rawStateCountMap.put(state.toString(), states.get(state));
+ }
+ List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
+ _resourceName = resourceId.stringify();
+ _partitions = partitionNames;
+ _states = rawStateCountMap;
+ _maximumPerNode = maximumPerNode;
+ if (placementScheme != null) {
+ _placementScheme = placementScheme;
+ } else {
+ _placementScheme = new DefaultPlacementScheme();
+ }
+ }
+
+ /**
+ * Wrap {@link #computePartitionAssignment(List, Map, List)} with a function that takes concrete
+ * types
+ * @param liveNodes list of live participant ids
+ * @param currentMapping map of partition id to map of participant id to state
+ * @param allNodes list of all participant ids
+ * @return the preference list and replica mapping
+ */
+ public ZNRecord typedComputePartitionAssignment(final List<ParticipantId> liveNodes,
+ final Map<PartitionId, Map<ParticipantId, State>> currentMapping,
+ final List<ParticipantId> allNodes) {
+ final List<String> rawLiveNodes = Lists.transform(liveNodes, Functions.toStringFunction());
+ final List<String> rawAllNodes = Lists.transform(allNodes, Functions.toStringFunction());
+ final Map<String, Map<String, String>> rawCurrentMapping =
+ ResourceAssignment.stringMapsFromReplicaMaps(currentMapping);
+ return computePartitionAssignment(rawLiveNodes, rawCurrentMapping, rawAllNodes);
+ }
+
+ /**
* Determine a preference list and mapping of partitions to nodes for all replicas
* @param liveNodes the current list of live participants
* @param currentMapping the current assignment of replicas to nodes
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
index 1f44e69..4c7fc66 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.Resource;
import org.apache.helix.api.RunningInstance;
@@ -52,6 +53,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.IdealState;
@@ -60,6 +62,7 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -288,6 +291,18 @@ public class NewClusterSetup {
}
+ void rebalance(String[] optValues, String[] groupTagValues) {
+ String clusterName = optValues[0];
+ String resourceName = optValues[1];
+ int replicaCount = Integer.parseInt(optValues[2]);
+ String groupTag = null;
+ if (groupTagValues != null && groupTagValues.length > 0) {
+ groupTag = groupTagValues[0];
+ }
+ ResourceAccessor accessor = resourceAccessor(clusterName);
+ accessor.generateDefaultAssignment(ResourceId.from(resourceName), replicaCount, groupTag);
+ }
+
void addInstance(String[] optValues) {
String clusterName = optValues[0];
String[] instanceIds = optValues[1].split(";");
@@ -651,6 +666,22 @@ public class NewClusterSetup {
accessor.updateCluster(delta);
}
+ void getConstraints(String[] optValues) {
+ String clusterName = optValues[0];
+ ConstraintType constraintType = ConstraintType.valueOf(optValues[1]);
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ ClusterConstraints constraints = accessor.readConstraints(constraintType);
+ System.out.println(constraints.toString());
+ }
+
+ void removeConstraint(String[] optValues) {
+ String clusterName = optValues[0];
+ ConstraintType constraintType = ConstraintType.valueOf(optValues[1]);
+ ConstraintId constraintId = ConstraintId.from(optValues[2]);
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ accessor.removeConstraint(constraintType, constraintId);
+ }
+
void listClusterInfo(String[] optValues) {
String clusterName = optValues[0];
ClusterAccessor accessor = clusterAccessor(clusterName);
@@ -823,6 +854,78 @@ public class NewClusterSetup {
userConfig.setMapField(partitionId.stringify(), fields);
}
+ void swapParticipants(String[] optValues) {
+ String clusterName = optValues[0];
+ String oldParticipantName = optValues[1];
+ String newParticipantName = optValues[2];
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ accessor.swapParticipants(ParticipantId.from(oldParticipantName),
+ ParticipantId.from(newParticipantName));
+ }
+
+ void resetPartition(String[] optValues) {
+ String clusterName = optValues[0];
+ String participantName = optValues[1];
+ String resourceName = optValues[2];
+ String partitionName = optValues[3];
+
+ Set<PartitionId> partitionIds = ImmutableSet.of(PartitionId.from(partitionName));
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ accessor.resetPartitionsForParticipant(ParticipantId.from(participantName),
+ ResourceId.from(resourceName), partitionIds);
+ }
+
+ void resetResource(String[] optValues) {
+ String clusterName = optValues[0];
+ String resourceName = optValues[1];
+ Set<ResourceId> resourceIds = ImmutableSet.of(ResourceId.from(resourceName));
+ ResourceAccessor accessor = resourceAccessor(clusterName);
+ accessor.resetResources(resourceIds);
+ }
+
+ void resetParticipant(String[] optValues) {
+ String clusterName = optValues[0];
+ String participantName = optValues[1];
+ Set<ParticipantId> participantIds = ImmutableSet.of(ParticipantId.from(participantName));
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ accessor.resetParticipants(participantIds);
+ }
+
+ void expandResource(String[] optValues) {
+ String clusterName = optValues[0];
+ String resourceName = optValues[1];
+ expandResource(ClusterId.from(clusterName), ResourceId.from(resourceName));
+ }
+
+ void expandCluster(String[] optValues) {
+ String clusterName = optValues[0];
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ Cluster cluster = accessor.readCluster();
+ for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+ expandResource(ClusterId.from(clusterName), resourceId);
+ }
+ }
+
+ private void expandResource(ClusterId clusterId, ResourceId resourceId) {
+ ResourceAccessor accessor = resourceAccessor(clusterId.stringify());
+ Resource resource = accessor.readResource(resourceId);
+ SemiAutoRebalancerContext context =
+ resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
+ if (context == null) {
+ LOG.info("Only SEMI_AUTO mode supported for resource expansion");
+ return;
+ }
+ if (context.anyLiveParticipant()) {
+ LOG.info("Resource uses ANY_LIVE_PARTICIPANT, skipping default assignment");
+ return;
+ }
+ if (context.getPreferenceLists().size() == 0) {
+ LOG.info("No preference lists have been set yet, skipping default assignment");
+ return;
+ }
+ accessor.generateDefaultAssignment(resourceId, -1, null);
+ }
+
static int processCommandLineArgs(String[] cliArgs) {
CommandLineParser cliParser = new GnuParser();
Options cliOptions = constructCommandLineOptions();
@@ -909,19 +1012,24 @@ public class NewClusterSetup {
setup.addIdealState(optValues);
break;
case swapInstance:
- // TODO impl ClusterAccessor#swapParticipantsInCluster()
+ setup.swapParticipants(optValues);
break;
case dropInstance:
setup.dropInstance(optValues);
break;
case rebalance:
- // TODO impl this using ResourceAccessor
+ String[] groupTagValues = null;
+ if (cmd.hasOption(HelixOption.instanceGroupTag.name())) {
+ groupTagValues = cmd.getOptionValues(HelixOption.instanceGroupTag.name());
+ checkArgNum(HelixOption.instanceGroupTag, groupTagValues);
+ }
+ setup.rebalance(optValues, groupTagValues);
break;
case expandCluster:
- // TODO impl this
+ setup.expandCluster(optValues);
break;
case expandResource:
- // TODO impl this
+ setup.expandResource(optValues);
break;
case mode:
case rebalancerMode:
@@ -930,9 +1038,11 @@ public class NewClusterSetup {
// always used with addResource command
continue;
case instanceGroupTag:
- case resourceKeyPrefix:
// always used with rebalance command
continue;
+ case resourceKeyPrefix:
+ throw new UnsupportedOperationException(HelixOption.resourceKeyPrefix
+ + " is not supported, please set partition names directly");
case addResourceProperty:
throw new UnsupportedOperationException(HelixOption.addResourceProperty
+ " is not supported, please use setConfig");
@@ -973,13 +1083,13 @@ public class NewClusterSetup {
setup.enableCluster(optValues);
break;
case resetPartition:
- // TODO impl ResoourceAccessor#resetPartitions()
+ setup.resetPartition(optValues);
break;
case resetInstance:
- // TODO impl ParticipantAccessor#resetInstance()
+ setup.resetParticipant(optValues);
break;
case resetResource:
- // TODO impl ResourceAccessor#resetResource()
+ setup.resetResource(optValues);
break;
case addStat:
// TODO impl ClusterAccessor.addStat()
@@ -1003,11 +1113,13 @@ public class NewClusterSetup {
setup.removeConfig(optValues);
break;
case getConstraints:
+ setup.getConstraints(optValues);
break;
case setConstraint:
setup.setConstraint(optValues);
break;
case removeConstraint:
+ setup.removeConstraint(optValues);
break;
default:
System.err.println("Non-recognized option: " + opt);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 652ac73..8de39ab 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -35,31 +35,20 @@ import java.util.TreeSet;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.ZNRecord;
-import org.apache.helix.api.HelixVersion;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.RunningInstance;
-import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ProcId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
import org.testng.annotations.Test;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
public class TestAutoRebalanceStrategy {
private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
@@ -229,30 +218,22 @@ public class TestAutoRebalanceStrategy {
ClusterConfig cluster =
new ClusterConfig.Builder(ClusterId.from("cluster")).addStateModelDefinition(
_stateModelDef).build();
- Map<ParticipantId, Participant> liveParticipantMap = Maps.newHashMap();
+ Set<ParticipantId> liveParticipantSet = Sets.newHashSet();
for (String node : _liveNodes) {
- Set<String> tags = Collections.emptySet();
- Map<MessageId, Message> messageMap = Collections.emptyMap();
- Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
- Map<ResourceId, CurrentState> currentStateMap = Maps.newHashMap();
- RunningInstance runningInstance =
- new RunningInstance(SessionId.from("testSession"), HelixVersion.from("1.2.3.4"),
- ProcId.from("1234"));
- Participant participant =
- new Participant(ParticipantId.from(node), node, 0, true, disabledPartitionIdSet,
- tags, runningInstance, currentStateMap, messageMap, new UserConfig(
- Scope.participant(ParticipantId.from(node))));
- liveParticipantMap.put(participant.getId(), participant);
+ liveParticipantSet.add(ParticipantId.from(node));
}
List<ParticipantId> preferenceList =
IdealState.preferenceListFromStringList(listResult.get(partition));
Set<ParticipantId> disabledParticipantsForPartition = Collections.emptySet();
Map<ParticipantId, State> currentStateMap =
IdealState.participantStateMapFromStringMap(rawCurStateMap);
+ Map<State, String> upperBounds =
+ NewConstraintBasedAssignment.stateConstraints(_stateModelDef,
+ ResourceId.from(RESOURCE_NAME), cluster);
Map<ParticipantId, State> assignment =
- NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster,
- ResourceId.from(RESOURCE_NAME), liveParticipantMap, _stateModelDef, preferenceList,
- currentStateMap, disabledParticipantsForPartition);
+ NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+ liveParticipantSet, _stateModelDef, preferenceList, currentStateMap,
+ disabledParticipantsForPartition);
mapResult.put(partition, IdealState.stringMapFromParticipantStateMap(assignment));
}
return mapResult;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aa1bd852/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index e07e7d7..e4943f8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -259,10 +259,13 @@ public class TestNewAutoRebalanceStrategy {
// compute the mapping
Map<ParticipantId, State> replicaMap =
ResourceAssignment.replicaMapFromStringMap(_currentMapping.get(partition));
+ Map<State, String> upperBounds =
+ NewConstraintBasedAssignment.stateConstraints(_stateModelDef,
+ ResourceId.from(RESOURCE_NAME), clusterConfig);
Map<ParticipantId, State> assignment =
- NewConstraintBasedAssignment.computeAutoBestStateForPartition(clusterConfig,
- ResourceId.from(RESOURCE_NAME), liveParticipantMap, _stateModelDef,
- participantPreferenceList, replicaMap, disabledParticipantsForPartition);
+ NewConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
+ liveParticipantMap.keySet(), _stateModelDef, participantPreferenceList, replicaMap,
+ disabledParticipantsForPartition);
mapResult.put(partitionId, assignment);
}