You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/07 07:40:40 UTC
[3/3] git commit: [HELIX-109] adding config classes
[HELIX-109] adding config classes
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5972a44e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5972a44e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5972a44e
Branch: refs/heads/helix-logical-model
Commit: 5972a44e726dcb0d9ea3671eac5caf45fc72c0fc
Parents: c07569d
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Sep 6 22:39:57 2013 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Sep 6 22:39:57 2013 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/api/Cluster.java | 54 ++++--
.../org/apache/helix/api/ClusterAccessor.java | 63 +++++-
.../org/apache/helix/api/ClusterConfig.java | 180 +++++++++++++++++
.../java/org/apache/helix/api/Participant.java | 157 ++-------------
.../org/apache/helix/api/ParticipantConfig.java | 194 +++++++++++++++++++
.../org/apache/helix/api/RebalancerConfig.java | 64 +++++-
.../org/apache/helix/api/RebalancerRef.java | 5 +-
.../java/org/apache/helix/api/Resource.java | 172 +++++-----------
.../org/apache/helix/api/ResourceConfig.java | 173 +++++++++++++++++
.../controller/GenericHelixController.java | 14 +-
.../rebalancer/NewAutoRebalancer.java | 42 ++--
.../rebalancer/NewCustomRebalancer.java | 13 +-
.../rebalancer/NewSemiAutoRebalancer.java | 13 +-
.../util/NewConstraintBasedAssignment.java | 5 +-
.../stages/NewBestPossibleStateCalcStage.java | 7 +-
.../stages/NewBestPossibleStateOutput.java | 9 +
.../stages/NewCurrentStateComputationStage.java | 12 +-
.../stages/NewExternalViewComputeStage.java | 12 +-
.../stages/NewMessageGenerationStage.java | 22 ++-
.../stages/NewMessageSelectionStage.java | 14 +-
.../stages/NewMessageThrottleStage.java | 12 +-
.../stages/NewResourceComputationStage.java | 16 +-
.../stages/NewTaskAssignmentStage.java | 18 +-
.../apache/helix/model/ClusterConstraints.java | 8 +
.../java/org/apache/helix/model/IdealState.java | 18 +-
.../apache/helix/model/ResourceAssignment.java | 8 +
.../helix/tools/ClusterStateVerifier.java | 84 +++++---
.../org/apache/helix/api/TestNewStages.java | 12 +-
.../helix/controller/stages/BaseStageTest.java | 67 ++++---
.../TestBestPossibleCalcStageCompatibility.java | 73 ++++---
.../stages/TestBestPossibleStateCalcStage.java | 35 ++--
.../stages/TestCompatibilityCheckStage.java | 13 +-
.../TestCurrentStateComputationStage.java | 52 ++---
.../stages/TestMessageThrottleStage.java | 35 ++--
.../stages/TestMsgSelectionStage.java | 96 ++++++---
.../stages/TestRebalancePipeline.java | 76 ++++----
.../stages/TestResourceComputationStage.java | 79 ++++----
.../helix/integration/TestAutoRebalance.java | 6 +-
.../TestCustomizedIdealStateRebalancer.java | 59 +++---
39 files changed, 1303 insertions(+), 689 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index e890fb4..ce2318a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -25,13 +25,14 @@ import java.util.Map;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
/**
* Represent a logical helix cluster
*/
public class Cluster {
- private final ClusterId _id;
/**
* map of resource-id to resource
@@ -60,9 +61,7 @@ public class Cluster {
private final ControllerId _leaderId;
- private final ClusterConfig _config = null;
-
- private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private final ClusterConfig _config;
/**
* construct a cluster
@@ -74,9 +73,26 @@ public class Cluster {
*/
public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
- ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap) {
-
- _id = id;
+ ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+
+ // build the config
+ // Guava's transform and "copy" functions really return views so the maps all reflect each other
+ Map<ResourceId, ResourceConfig> resourceConfigMap =
+ Maps.transformValues(resourceMap, new Function<Resource, ResourceConfig>() {
+ @Override
+ public ResourceConfig apply(Resource resource) {
+ return resource.getConfig();
+ }
+ });
+ Map<ParticipantId, ParticipantConfig> participantConfigMap =
+ Maps.transformValues(participantMap, new Function<Participant, ParticipantConfig>() {
+ @Override
+ public ParticipantConfig apply(Participant participant) {
+ return participant.getConfig();
+ }
+ });
+ _config =
+ new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap, isPaused);
_resourceMap = ImmutableMap.copyOf(resourceMap);
@@ -94,8 +110,6 @@ public class Cluster {
_leaderId = leaderId;
- _constraintMap = ImmutableMap.copyOf(constraintMap);
-
// TODO impl this when we persist controllers and spectators on zookeeper
_controllerMap = ImmutableMap.copyOf(controllerMap);
_spectatorMap = Collections.emptyMap();
@@ -106,7 +120,7 @@ public class Cluster {
* @return cluster id
*/
public ClusterId getId() {
- return _id;
+ return _config.getId();
}
/**
@@ -167,17 +181,27 @@ public class Cluster {
}
/**
- * @return
+ * Get all the constraints on the cluster
+ * @return map of constraint type to constraints
*/
public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
- return _constraintMap;
+ return _config.getConstraintMap();
}
/**
- * @param type
- * @return
+ * Get a cluster constraint
+ * @param type the type of constrant to query
+ * @return cluster constraints, or null if none
*/
public ClusterConstraints getConstraint(ConstraintType type) {
- return _constraintMap.get(type);
+ return _config.getConstraintMap().get(type);
+ }
+
+ /**
+ * Check the pasued status of the cluster
+ * @return true if paused, false otherwise
+ */
+ public boolean isPaused() {
+ return _config.isPaused();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 04d5831..a9fcf79 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -24,12 +24,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -76,7 +78,7 @@ public class ClusterAccessor {
*/
public void dropCluster() {
LOG.info("Dropping cluster: " + _clusterId);
- List<String> liveInstanceNames =_accessor.getChildNames(_keyBuilder.liveInstances());
+ List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
if (liveInstanceNames.size() > 0) {
throw new HelixException("Can't drop cluster: " + _clusterId
+ " because there are running participant: " + liveInstanceNames
@@ -97,6 +99,7 @@ public class ClusterAccessor {
* @return cluster
*/
public Cluster readCluster() {
+ // TODO many of these should live in resource, participant, etc accessors
/**
* map of instance-id to instance-config
*/
@@ -147,13 +150,20 @@ public class ClusterAccessor {
Map<String, ClusterConstraints> constraintMap =
_accessor.getChildValuesMap(_keyBuilder.constraints());
+ /**
+ * Map of resource id to external view
+ */
+ Map<String, ExternalView> externalViewMap =
+ _accessor.getChildValuesMap(_keyBuilder.externalViews());
+
Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
for (String resourceName : idealStateMap.keySet()) {
IdealState idealState = idealStateMap.get(resourceName);
-
// TODO pass resource assignment
ResourceId resourceId = Id.resource(resourceName);
- resourceMap.put(resourceId, new Resource(resourceId, idealState, null));
+ resourceMap.put(resourceId,
+ new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
+ liveInstanceMap.size()));
}
Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
@@ -182,8 +192,11 @@ public class ClusterAccessor {
constraintMap.get(constraintType));
}
+ PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+ boolean isPaused = pauseSignal != null;
+
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap);
+ clusterConstraintMap, isPaused);
}
/**
@@ -204,7 +217,7 @@ public class ClusterAccessor {
* add a resource to cluster
* @param resource
*/
- public void addResourceToCluster(Resource resource) {
+ public void addResourceToCluster(ResourceConfig resource) {
StateModelDefId stateModelDefId = resource.getRebalancerConfig().getStateModelDefId();
if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
throw new HelixException("State model: " + stateModelDefId + " not found in cluster: "
@@ -217,8 +230,42 @@ public class ClusterAccessor {
+ ", because resource ideal state already exists in cluster: " + _clusterId);
}
- // TODO convert rebalancerConfig to idealState
- _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), null);
+ // Create an IdealState from a RebalancerConfig
+ RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+ IdealState idealState = new IdealState(resourceId);
+ idealState.setRebalanceMode(rebalancerConfig.getRebalancerMode());
+ idealState.setMaxPartitionsPerInstance(rebalancerConfig.getMaxPartitionsPerParticipant());
+ if (rebalancerConfig.canAssignAnyLiveParticipant()) {
+ idealState.setReplicas(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString());
+ } else {
+ idealState.setReplicas(Integer.toString(rebalancerConfig.getReplicaCount()));
+ }
+ idealState.setStateModelDefId(rebalancerConfig.getStateModelDefId());
+ for (PartitionId partitionId : resource.getPartitionSet()) {
+ List<ParticipantId> preferenceList = rebalancerConfig.getPreferenceList(partitionId);
+ Map<ParticipantId, State> preferenceMap = rebalancerConfig.getPreferenceMap(partitionId);
+ if (preferenceList != null) {
+ idealState.setPreferenceList(partitionId, preferenceList);
+ }
+ if (preferenceMap != null) {
+ idealState.setParticipantStateMap(partitionId, preferenceMap);
+ }
+ }
+ idealState.setBucketSize(rebalancerConfig.getBucketSize());
+ idealState.setBatchMessageMode(rebalancerConfig.getBatchMessageMode());
+ String groupTag = rebalancerConfig.getParticipantGroupTag();
+ if (groupTag != null) {
+ idealState.setInstanceGroupTag(groupTag);
+ }
+ RebalancerRef rebalancerRef = rebalancerConfig.getRebalancerRef();
+ if (rebalancerRef != null) {
+ idealState.setRebalancerRef(rebalancerRef);
+ }
+ StateModelFactoryId stateModelFactoryId = rebalancerConfig.getStateModelFactoryId();
+ if (stateModelFactoryId != null) {
+ idealState.setStateModelFactoryId(stateModelFactoryId);
+ }
+ _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
}
/**
@@ -244,7 +291,7 @@ public class ClusterAccessor {
* add a participant to cluster
* @param participant
*/
- public void addParticipantToCluster(Participant participant) {
+ public void addParticipantToCluster(ParticipantConfig participant) {
if (!isClusterStructureValid()) {
throw new HelixException("Cluster: " + _clusterId + " structure is not valid");
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index 8a2f629..1585aae 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -1,5 +1,14 @@
package org.apache.helix.api;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+
+import com.google.common.collect.ImmutableMap;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +28,177 @@ package org.apache.helix.api;
* under the License.
*/
+/**
+ * Configuration properties of a cluster
+ */
public class ClusterConfig {
+ private final ClusterId _id;
+ private final Map<ResourceId, ResourceConfig> _resourceMap;
+ private final Map<ParticipantId, ParticipantConfig> _participantMap;
+ private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private final boolean _isPaused;
+
+ /**
+ * Initialize a cluster configuration. Also see ClusterConfig.Builder
+ * @param id
+ * @param resourceMap
+ * @param participantMap
+ * @param constraintMap
+ */
+ public ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
+ Map<ParticipantId, ParticipantConfig> participantMap,
+ Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+ _id = id;
+ _resourceMap = ImmutableMap.copyOf(resourceMap);
+ _participantMap = ImmutableMap.copyOf(participantMap);
+ _constraintMap = ImmutableMap.copyOf(constraintMap);
+ _isPaused = isPaused;
+ }
+
+ /**
+ * Get cluster id
+ * @return cluster id
+ */
+ public ClusterId getId() {
+ return _id;
+ }
+
+ /**
+ * Get resources in the cluster
+ * @return a map of resource id to resource, or empty map if none
+ */
+ public Map<ResourceId, ResourceConfig> getResourceMap() {
+ return _resourceMap;
+ }
+
+ /**
+ * Get all the constraints on the cluster
+ * @return map of constraint type to constraints
+ */
+ public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
+ return _constraintMap;
+ }
+
+ /**
+ * Get participants of the cluster
+ * @return a map of participant id to participant, or empty map if none
+ */
+ public Map<ParticipantId, ParticipantConfig> getParticipantMap() {
+ return _participantMap;
+ }
+
+ /**
+ * Check the pasued status of the cluster
+ * @return true if paused, false otherwise
+ */
+ public boolean isPaused() {
+ return _isPaused;
+ }
+
+ /**
+ * Assembles a cluster configuration
+ */
+ public static class Builder {
+ private final ClusterId _id;
+ private final Map<ResourceId, ResourceConfig> _resourceMap;
+ private final Map<ParticipantId, ParticipantConfig> _participantMap;
+ private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private boolean _isPaused;
+
+ /**
+ * Initialize builder for a cluster
+ * @param id cluster id
+ */
+ public Builder(ClusterId id) {
+ _id = id;
+ _resourceMap = new HashMap<ResourceId, ResourceConfig>();
+ _participantMap = new HashMap<ParticipantId, ParticipantConfig>();
+ _constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
+ _isPaused = false;
+ }
+
+ /**
+ * Add a resource to the cluster
+ * @param resource resource configuration
+ * @return Builder
+ */
+ public Builder addResource(ResourceConfig resource) {
+ _resourceMap.put(resource.getId(), resource);
+ return this;
+ }
+
+ /**
+ * Add multiple resources to the cluster
+ * @param resources resource configurations
+ * @return Builder
+ */
+ public Builder addResources(Collection<ResourceConfig> resources) {
+ for (ResourceConfig resource : resources) {
+ addResource(resource);
+ }
+ return this;
+ }
+
+ /**
+ * Add a participant to the cluster
+ * @param participant participant configuration
+ * @return Builder
+ */
+ public Builder addParticipant(ParticipantConfig participant) {
+ _participantMap.put(participant.getId(), participant);
+ return this;
+ }
+
+ /**
+ * Add multiple participants to the cluster
+ * @param participants participant configurations
+ * @return Builder
+ */
+ public Builder addParticipants(Collection<ParticipantConfig> participants) {
+ for (ParticipantConfig participant : participants) {
+ addParticipant(participant);
+ }
+ return this;
+ }
+
+ /**
+ * Add a constraint to the cluster
+ * @param constraint cluster constraint of a specific type
+ * @return Builder
+ */
+ public Builder addConstraint(ClusterConstraints constraint) {
+ _constraintMap.put(constraint.getType(), constraint);
+ return this;
+ }
+
+ /**
+ * Add multiple constraints to the cluster
+ * @param constraints cluster constraints of multiple distinct types
+ * @return Builder
+ */
+ public Builder addConstraints(Collection<ClusterConstraints> constraints) {
+ for (ClusterConstraints constraint : constraints) {
+ addConstraint(constraint);
+ }
+ return this;
+ }
+
+ /**
+ * Set the paused status of the cluster
+ * @param isPaused true if paused, false otherwise
+ * @return Builder
+ */
+ public Builder setPausedStatus(boolean isPaused) {
+ _isPaused = isPaused;
+ return this;
+ }
+ /**
+ * Create the cluster configuration
+ * @return ClusterConfig
+ */
+ public ClusterConfig build() {
+ return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _isPaused);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 10ceebd..0c0cd12 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -19,8 +19,6 @@ package org.apache.helix.api;
* under the License.
*/
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -28,22 +26,12 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
/**
* A cluster participant
*/
public class Participant {
- private final ParticipantId _id;
- private final String _hostName;
- private final int _port;
- private final boolean _isEnabled;
-
- /**
- * set of disabled partition id's
- */
- private final Set<PartitionId> _disabledPartitionIdSet;
- private final Set<String> _tags;
+ private final ParticipantConfig _config;
private final RunningInstance _runningInstance;
@@ -64,12 +52,7 @@ public class Participant {
public Participant(ParticipantId id, String hostName, int port, boolean isEnabled,
Set<PartitionId> disabledPartitionIdSet, Set<String> tags, RunningInstance runningInstance,
Map<ResourceId, CurrentState> currentStateMap, Map<MessageId, Message> messageMap) {
- _id = id;
- _hostName = hostName;
- _port = port;
- _isEnabled = isEnabled;
- _disabledPartitionIdSet = ImmutableSet.copyOf(disabledPartitionIdSet);
- _tags = ImmutableSet.copyOf(tags);
+ _config = new ParticipantConfig(id, hostName, port, isEnabled, disabledPartitionIdSet, tags);
_runningInstance = runningInstance;
_currentStateMap = ImmutableMap.copyOf(currentStateMap);
_messageMap = ImmutableMap.copyOf(messageMap);
@@ -80,7 +63,7 @@ public class Participant {
* @return host name, or null if not applicable
*/
public String getHostName() {
- return _hostName;
+ return _config.getHostName();
}
/**
@@ -88,7 +71,7 @@ public class Participant {
* @return port number, or -1 if not applicable
*/
public int getPort() {
- return _port;
+ return _config.getPort();
}
/**
@@ -96,7 +79,7 @@ public class Participant {
* @return true if enabled or false otherwise
*/
public boolean isEnabled() {
- return _isEnabled;
+ return _config.isEnabled();
}
/**
@@ -120,7 +103,7 @@ public class Participant {
* @return set of disabled partition id's, or empty set if none
*/
public Set<PartitionId> getDisablePartitionIds() {
- return _disabledPartitionIdSet;
+ return _config.getDisablePartitionIds();
}
/**
@@ -128,7 +111,7 @@ public class Participant {
* @return set of tags
*/
public Set<String> getTags() {
- return _tags;
+ return _config.getTags();
}
/**
@@ -137,7 +120,7 @@ public class Participant {
* @return true if tagged, false otherwise
*/
public boolean hasTag(String tag) {
- return _tags.contains(tag);
+ return _config.hasTag(tag);
}
/**
@@ -156,125 +139,19 @@ public class Participant {
return _currentStateMap;
}
+ /**
+ * Get the participant id
+ * @return ParticipantId
+ */
public ParticipantId getId() {
- return _id;
+ return _config.getId();
}
/**
- * Assemble a participant
+ * Get the participant configuration
+ * @return ParticipantConfig that backs this participant
*/
- public static class Builder {
- private final ParticipantId _id;
- private final Set<PartitionId> _disabledPartitions;
- private final Set<String> _tags;
- private final Map<ResourceId, CurrentState> _currentStateMap;
- private final Map<MessageId, Message> _messageMap;
- private String _hostName;
- private int _port;
- private boolean _isEnabled;
- private RunningInstance _runningInstance;
-
- /**
- * Build a participant with a given id
- * @param id participant id
- */
- public Builder(ParticipantId id) {
- _id = id;
- _disabledPartitions = new HashSet<PartitionId>();
- _tags = new HashSet<String>();
- _currentStateMap = new HashMap<ResourceId, CurrentState>();
- _messageMap = new HashMap<MessageId, Message>();
- _isEnabled = true;
- }
-
- /**
- * Set the participant host name
- * @param hostName reachable host when live
- * @return Builder
- */
- public Builder hostName(String hostName) {
- _hostName = hostName;
- return this;
- }
-
- /**
- * Set the participant port
- * @param port port number
- * @return Builder
- */
- public Builder port(int port) {
- _port = port;
- return this;
- }
-
- /**
- * Set whether or not the participant is enabled
- * @param isEnabled true if enabled, false otherwise
- * @return Builder
- */
- public Builder enabled(boolean isEnabled) {
- _isEnabled = isEnabled;
- return this;
- }
-
- /**
- * Add a partition to disable for this participant
- * @param partitionId the partition to disable
- * @return Builder
- */
- public Builder addDisabledPartition(PartitionId partitionId) {
- _disabledPartitions.add(partitionId);
- return this;
- }
-
- /**
- * Add an arbitrary tag for this participant
- * @param tag the tag to add
- * @return Builder
- */
- public Builder addTag(String tag) {
- _tags.add(tag);
- return this;
- }
-
- /**
- * Add live properties to participants that are running
- * @param runningInstance live participant properties
- * @return Builder
- */
- public Builder runningInstance(RunningInstance runningInstance) {
- _runningInstance = runningInstance;
- return this;
- }
-
- /**
- * Add a resource current state for this participant
- * @param resourceId the resource the current state corresponds to
- * @param currentState the current state
- * @return Builder
- */
- public Builder addCurrentState(ResourceId resourceId, CurrentState currentState) {
- _currentStateMap.put(resourceId, currentState);
- return this;
- }
-
- /**
- * Add a message for the participant
- * @param message message to add
- * @return Builder
- */
- public Builder addMessage(Message message) {
- _messageMap.put(new MessageId(message.getId()), message);
- return this;
- }
-
- /**
- * Assemble the participant
- * @return instantiated Participant
- */
- public Participant build() {
- return new Participant(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags,
- _runningInstance, _currentStateMap, _messageMap);
- }
+ public ParticipantConfig getConfig() {
+ return _config;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
new file mode 100644
index 0000000..3c77a40
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
@@ -0,0 +1,194 @@
+package org.apache.helix.api;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties of a Helix participant
+ */
+public class ParticipantConfig {
+ private final ParticipantId _id;
+ private final String _hostName;
+ private final int _port;
+ private final boolean _isEnabled;
+ private final Set<PartitionId> _disabledPartitions;
+ private final Set<String> _tags;
+
+ /**
+ * Initialize a participant configuration. Also see ParticipantConfig.Builder
+ * @param id participant id
+ * @param hostName host where participant can be reached
+ * @param port port to use to contact participant
+ * @param isEnabled true if enabled, false if disabled
+ * @param disabledPartitions set of partitions, if any to disable on this participant
+ * @param tags tags to set for the participant
+ */
+ public ParticipantConfig(ParticipantId id, String hostName, int port, boolean isEnabled,
+ Set<PartitionId> disabledPartitions, Set<String> tags) {
+ _id = id;
+ _hostName = hostName;
+ _port = port;
+ _isEnabled = isEnabled;
+ _disabledPartitions = ImmutableSet.copyOf(disabledPartitions);
+ _tags = ImmutableSet.copyOf(tags);
+ }
+
+ /**
+ * Get the host name of the participant
+ * @return host name, or null if not applicable
+ */
+ public String getHostName() {
+ return _hostName;
+ }
+
+ /**
+ * Get the port of the participant
+ * @return port number, or -1 if not applicable
+ */
+ public int getPort() {
+ return _port;
+ }
+
+ /**
+ * Get if the participant is enabled
+ * @return true if enabled or false otherwise
+ */
+ public boolean isEnabled() {
+ return _isEnabled;
+ }
+
+ /**
+ * Get disabled partition id's
+ * @return set of disabled partition id's, or empty set if none
+ */
+ public Set<PartitionId> getDisablePartitionIds() {
+ return _disabledPartitions;
+ }
+
+ /**
+ * Get tags
+ * @return set of tags
+ */
+ public Set<String> getTags() {
+ return _tags;
+ }
+
+ /**
+ * Check if participant has a tag
+ * @param tag tag to check
+ * @return true if tagged, false otherwise
+ */
+ public boolean hasTag(String tag) {
+ return _tags.contains(tag);
+ }
+
+ /**
+ * Get the participant id
+ * @return ParticipantId
+ */
+ public ParticipantId getId() {
+ return _id;
+ }
+
+ /**
+ * Assemble a participant
+ */
+ public static class Builder {
+ private final ParticipantId _id;
+ private String _hostName;
+ private int _port;
+ private boolean _isEnabled;
+ private final Set<PartitionId> _disabledPartitions;
+ private final Set<String> _tags;
+
+ /**
+ * Build a participant with a given id
+ * @param id participant id
+ */
+ public Builder(ParticipantId id) {
+ _id = id;
+ _disabledPartitions = new HashSet<PartitionId>();
+ _tags = new HashSet<String>();
+ _isEnabled = true;
+ }
+
+ /**
+ * Set the participant host name
+ * @param hostName reachable host when live
+ * @return Builder
+ */
+ public Builder hostName(String hostName) {
+ _hostName = hostName;
+ return this;
+ }
+
+ /**
+ * Set the participant port
+ * @param port port number
+ * @return Builder
+ */
+ public Builder port(int port) {
+ _port = port;
+ return this;
+ }
+
+ /**
+ * Set whether or not the participant is enabled
+ * @param isEnabled true if enabled, false otherwise
+ * @return Builder
+ */
+ public Builder enabled(boolean isEnabled) {
+ _isEnabled = isEnabled;
+ return this;
+ }
+
+ /**
+ * Add a partition to disable for this participant
+ * @param partitionId the partition to disable
+ * @return Builder
+ */
+ public Builder addDisabledPartition(PartitionId partitionId) {
+ _disabledPartitions.add(partitionId);
+ return this;
+ }
+
+ /**
+ * Add an arbitrary tag for this participant
+ * @param tag the tag to add
+ * @return Builder
+ */
+ public Builder addTag(String tag) {
+ _tags.add(tag);
+ return this;
+ }
+
+ /**
+ * Assemble the participant
+ * @return instantiated Participant
+ */
+ public ParticipantConfig build() {
+ return new ParticipantConfig(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 4ac254d..257354d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,9 +19,11 @@ package org.apache.helix.api;
* under the License.
*/
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.helix.HelixConstants;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
@@ -40,6 +42,7 @@ public class RebalancerConfig {
private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
private final ResourceAssignment _resourceAssignment;
private final int _replicaCount;
+ private final boolean _anyLiveParticipant;
private final String _participantGroupTag;
private final int _maxPartitionsPerParticipant;
private final int _bucketSize;
@@ -51,11 +54,19 @@ public class RebalancerConfig {
* @param idealState the physical ideal state
* @param resourceAssignment last mapping of a resource
*/
- public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment) {
+ public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment,
+ int liveParticipantCount) {
_rebalancerMode = idealState.getRebalanceMode();
_rebalancerRef = idealState.getRebalancerRef();
_stateModelDefId = idealState.getStateModelDefId();
- _replicaCount = Integer.parseInt(idealState.getReplicas());
+ String replicaCount = idealState.getReplicas();
+ if (replicaCount.equals(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())) {
+ _replicaCount = liveParticipantCount;
+ _anyLiveParticipant = true;
+ } else {
+ _replicaCount = Integer.parseInt(idealState.getReplicas());
+ _anyLiveParticipant = false;
+ }
_participantGroupTag = idealState.getInstanceGroupTag();
_maxPartitionsPerParticipant = idealState.getMaxPartitionsPerInstance();
_bucketSize = idealState.getBucketSize();
@@ -68,10 +79,14 @@ public class RebalancerConfig {
ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>> preferenceMaps =
new ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>>();
for (PartitionId partitionId : idealState.getPartitionSet()) {
- preferenceLists.put(partitionId,
- ImmutableList.copyOf(idealState.getPreferenceList(partitionId)));
- preferenceMaps.put(partitionId,
- ImmutableMap.copyOf(idealState.getParticipantStateMap(partitionId)));
+ List<ParticipantId> preferenceList = idealState.getPreferenceList(partitionId);
+ if (preferenceList != null) {
+ preferenceLists.put(partitionId, ImmutableList.copyOf(preferenceList));
+ }
+ Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
+ if (preferenceMap != null) {
+ preferenceMaps.put(partitionId, ImmutableMap.copyOf(preferenceMap));
+ }
}
_preferenceLists = preferenceLists.build();
_preferenceMaps = preferenceMaps.build();
@@ -118,7 +133,10 @@ public class RebalancerConfig {
* @return the ordered preference list (early entries are more preferred)
*/
public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
- return _preferenceLists.get(partitionId);
+ if (_preferenceLists.containsKey(partitionId)) {
+ return _preferenceLists.get(partitionId);
+ }
+ return Collections.emptyList();
}
/**
@@ -127,7 +145,10 @@ public class RebalancerConfig {
* @return a mapping of participant to state for each replica
*/
public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
- return _preferenceMaps.get(partitionId);
+ if (_preferenceMaps.containsKey(partitionId)) {
+ return _preferenceMaps.get(partitionId);
+ }
+ return Collections.emptyMap();
}
/**
@@ -179,10 +200,19 @@ public class RebalancerConfig {
}
/**
+ * Check if replicas can be assigned to any live participant
+ * @return true if they can, false if they cannot
+ */
+ public boolean canAssignAnyLiveParticipant() {
+ return _anyLiveParticipant;
+ }
+
+ /**
* Assembles a RebalancerConfig
*/
public static class Builder {
private final IdealState _idealState;
+ private boolean _anyLiveParticipant;
private ResourceAssignment _resourceAssignment;
/**
@@ -191,6 +221,7 @@ public class RebalancerConfig {
*/
public Builder(ResourceId resourceId) {
_idealState = new IdealState(resourceId);
+ _anyLiveParticipant = false;
}
/**
@@ -283,11 +314,26 @@ public class RebalancerConfig {
}
/**
+ * Set whether any live participant should be used in rebalancing
+ * @param useAnyParticipant true if any live participant can be used, false otherwise
+ * @return
+ */
+ public Builder anyLiveParticipant(boolean useAnyParticipant) {
+ _anyLiveParticipant = true;
+ return this;
+ }
+
+ /**
* Assemble a RebalancerConfig
* @return a fully defined rebalancer configuration
*/
public RebalancerConfig build() {
- return new RebalancerConfig(_idealState, _resourceAssignment);
+ if (_anyLiveParticipant) {
+ return new RebalancerConfig(_idealState, _resourceAssignment, Integer.parseInt(_idealState
+ .getReplicas()));
+ } else {
+ return new RebalancerConfig(_idealState, _resourceAssignment, -1);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
index 5f22898..5c7ce56 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
@@ -62,9 +62,12 @@ public class RebalancerRef {
/**
* Get a rebalancer class reference
* @param rebalancerClassName name of the class
- * @return RebalancerRef
+ * @return RebalancerRef or null if name is null
*/
public static RebalancerRef from(String rebalancerClassName) {
+ if (rebalancerClassName == null) {
+ return null;
+ }
return new RebalancerRef(rebalancerClassName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 0c8b730..8445617 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -20,7 +20,6 @@ package org.apache.helix.api;
*/
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -29,41 +28,33 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceAssignment;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
/**
* Represent a resource entity in helix cluster
*/
public class Resource {
- private final ResourceId _id;
- private final RebalancerConfig _rebalancerConfig;
- private final SchedulerTaskConfig _schedulerTaskConfig;
-
- private final Map<PartitionId, Partition> _partitionMap;
-
+ private final ResourceConfig _config;
private final ExternalView _externalView;
/**
* Construct a resource
- * @param idealState
+ * @param id resource id
+ * @param idealState ideal state of the resource
* @param currentStateMap map of participant-id to current state
+ * @param liveParticipantCount number of live participants in the system
*/
- public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment) {
- _id = id;
- _rebalancerConfig = new RebalancerConfig(idealState, resourceAssignment);
-
+ public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment,
+ ExternalView externalView, int liveParticipantCount) {
Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
- Map<PartitionId, Map<String, String>> schedulerTaskConfig =
+ Map<PartitionId, Map<String, String>> schedulerTaskConfigMap =
new HashMap<PartitionId, Map<String, String>>();
Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
for (PartitionId partitionId : idealState.getPartitionSet()) {
partitionMap.put(partitionId, new Partition(partitionId));
// TODO refactor it
- Map<String, String> taskConfigMap = idealState.getRecord().getMapField(partitionId.stringify());
+ Map<String, String> taskConfigMap = idealState.getInstanceStateMap(partitionId.stringify());
if (taskConfigMap != null) {
- schedulerTaskConfig.put(partitionId, taskConfigMap);
+ schedulerTaskConfigMap.put(partitionId, taskConfigMap);
}
// TODO refactor it
@@ -78,53 +69,38 @@ public class Resource {
}
}
}
- _partitionMap = ImmutableMap.copyOf(partitionMap);
- _schedulerTaskConfig = new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfig);
+ SchedulerTaskConfig schedulerTaskConfig =
+ new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfigMap);
+ RebalancerConfig rebalancerConfig =
+ new RebalancerConfig(idealState, resourceAssignment, liveParticipantCount);
- _externalView = null;
- }
-
- /**
- * Construct a Resource
- * @param id resource identifier
- * @param partitionSet disjoint partitions of the resource
- * @param externalView external view of the resource
- * @param pendingExternalView pending external view based on unprocessed messages
- * @param rebalancerConfig configuration properties for rebalancing this resource
- */
- public Resource(ResourceId id, Map<PartitionId, Partition> partitionMap,
- ExternalView externalView,
- RebalancerConfig rebalancerConfig, SchedulerTaskConfig schedulerTaskConfig) {
- _id = id;
- _partitionMap = ImmutableMap.copyOf(partitionMap);
+ _config = new ResourceConfig(id, partitionMap, schedulerTaskConfig, rebalancerConfig);
_externalView = externalView;
- _rebalancerConfig = rebalancerConfig;
- _schedulerTaskConfig = schedulerTaskConfig;
}
/**
- * Get the set of partitions of the resource
- * @return set of partitions or empty set if none
+ * Get the partitions of the resource
+ * @return map of partition id to partition or empty map if none
*/
public Map<PartitionId, Partition> getPartitionMap() {
- return _partitionMap;
+ return _config.getPartitionMap();
}
/**
- * @param partitionId
- * @return
+ * Get a partition that the resource contains
+ * @param partitionId the partition id to look up
+ * @return Partition or null if none is present with the given id
*/
public Partition getPartition(PartitionId partitionId) {
- return _partitionMap.get(partitionId);
+ return _config.getPartition(partitionId);
}
/**
- * @return
+ * Get the set of partition ids that the resource contains
+ * @return partition id set, or empty if none
*/
- public Set<Partition> getPartitionSet() {
- Set<Partition> partitionSet = new HashSet<Partition>();
- partitionSet.addAll(_partitionMap.values());
- return ImmutableSet.copyOf(partitionSet);
+ public Set<PartitionId> getPartitionSet() {
+ return _config.getPartitionSet();
}
/**
@@ -135,95 +111,35 @@ public class Resource {
return _externalView;
}
+ /**
+ * Get the resource properties configuring rebalancing
+ * @return RebalancerConfig properties
+ */
public RebalancerConfig getRebalancerConfig() {
- return _rebalancerConfig;
+ return _config.getRebalancerConfig();
}
+ /**
+ * Get the resource id
+ * @return ResourceId
+ */
public ResourceId getId() {
- return _id;
+ return _config.getId();
}
+ /**
+ * Get the properties configuring scheduler tasks
+ * @return SchedulerTaskConfig properties
+ */
public SchedulerTaskConfig getSchedulerTaskConfig() {
- return _schedulerTaskConfig;
+ return _config.getSchedulerTaskConfig();
}
/**
- * Assembles a Resource
+ * Get the configuration of this resource
+ * @return ResourceConfig that backs this Resource
*/
- public static class Builder {
- private final ResourceId _id;
- private final Map<PartitionId, Partition> _partitionMap;
- private ExternalView _externalView;
- private RebalancerConfig _rebalancerConfig;
- private SchedulerTaskConfig _schedulerTaskConfig;
-
- /**
- * Build a Resource with an id
- * @param id resource id
- */
- public Builder(ResourceId id) {
- _id = id;
- _partitionMap = new HashMap<PartitionId, Partition>();
- }
-
- /**
- * Add a partition that the resource serves
- * @param partition fully-qualified partition
- * @return Builder
- */
- public Builder addPartition(Partition partition) {
- _partitionMap.put(partition.getId(), partition);
- return this;
- }
-
- /**
- * Add a set of partitions
- * @param partitions
- * @return Builder
- */
- public Builder addPartitions(Set<Partition> partitions) {
- for (Partition partition : partitions) {
- addPartition(partition);
- }
- return this;
- }
-
- /**
- * Set the external view of this resource
- * @param extView currently served replica placement and state
- * @return Builder
- */
- public Builder externalView(ExternalView extView) {
- _externalView = extView;
- return this;
- }
-
- /**
- * Set the rebalancer configuration
- * @param rebalancerConfig properties of interest for rebalancing
- * @return Builder
- */
- public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
- _rebalancerConfig = rebalancerConfig;
- return this;
- }
-
- /**
- * @param schedulerTaskConfig
- * @return
- */
- public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
- _schedulerTaskConfig = schedulerTaskConfig;
- return this;
- }
-
- /**
- * Create a Resource object
- * @return instantiated Resource
- */
- public Resource build() {
- return new Resource(_id, _partitionMap, _externalView, _rebalancerConfig,
- _schedulerTaskConfig);
- }
+ public ResourceConfig getConfig() {
+ return _config;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
new file mode 100644
index 0000000..5170c40
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
@@ -0,0 +1,173 @@
+package org.apache.helix.api;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Full configuration of a Helix resource. Typically used to add or modify resources on a cluster
+ */
+public class ResourceConfig {
+ private final ResourceId _id;
+ private final Map<PartitionId, Partition> _partitionMap;
+ private final RebalancerConfig _rebalancerConfig;
+ private final SchedulerTaskConfig _schedulerTaskConfig;
+
+ /**
+ * Instantiate a configuration. Consider using ResourceConfig.Builder
+ * @param id resource id
+ * @param partitionMap map of partition identifiers to partition objects
+ * @param schedulerTaskConfig configuration for scheduler tasks associated with the resource
+ * @param rebalancerConfig configuration for rebalancing the resource
+ */
+ public ResourceConfig(ResourceId id, Map<PartitionId, Partition> partitionMap,
+ SchedulerTaskConfig schedulerTaskConfig, RebalancerConfig rebalancerConfig) {
+ _id = id;
+ _partitionMap = ImmutableMap.copyOf(partitionMap);
+ _schedulerTaskConfig = schedulerTaskConfig;
+ _rebalancerConfig = rebalancerConfig;
+ }
+
+ /**
+ * Get the partitions of the resource
+ * @return map of partition id to partition or empty map if none
+ */
+ public Map<PartitionId, Partition> getPartitionMap() {
+ return _partitionMap;
+ }
+
+ /**
+ * Get a partition that the resource contains
+ * @param partitionId the partition id to look up
+ * @return Partition or null if none is present with the given id
+ */
+ public Partition getPartition(PartitionId partitionId) {
+ return _partitionMap.get(partitionId);
+ }
+
+ /**
+ * Get the set of partition ids that the resource contains
+ * @return partition id set, or empty if none
+ */
+ public Set<PartitionId> getPartitionSet() {
+ Set<PartitionId> partitionSet = new HashSet<PartitionId>();
+ partitionSet.addAll(_partitionMap.keySet());
+ return ImmutableSet.copyOf(partitionSet);
+ }
+
+ /**
+ * Get the resource properties configuring rebalancing
+ * @return RebalancerConfig properties
+ */
+ public RebalancerConfig getRebalancerConfig() {
+ return _rebalancerConfig;
+ }
+
+ /**
+ * Get the resource id
+ * @return ResourceId
+ */
+ public ResourceId getId() {
+ return _id;
+ }
+
+ /**
+ * Get the properties configuring scheduler tasks
+ * @return SchedulerTaskConfig properties
+ */
+ public SchedulerTaskConfig getSchedulerTaskConfig() {
+ return _schedulerTaskConfig;
+ }
+
+ /**
+ * Assembles a ResourceConfig
+ */
+ public static class Builder {
+ private final ResourceId _id;
+ private final Map<PartitionId, Partition> _partitionMap;
+ private RebalancerConfig _rebalancerConfig;
+ private SchedulerTaskConfig _schedulerTaskConfig;
+
+ /**
+ * Build a Resource with an id
+ * @param id resource id
+ */
+ public Builder(ResourceId id) {
+ _id = id;
+ _partitionMap = new HashMap<PartitionId, Partition>();
+ }
+
+ /**
+ * Add a partition that the resource serves
+ * @param partition fully-qualified partition
+ * @return Builder
+ */
+ public Builder addPartition(Partition partition) {
+ _partitionMap.put(partition.getId(), partition);
+ return this;
+ }
+
+ /**
+ * Add a collection of partitions
+ * @param partitions
+ * @return Builder
+ */
+ public Builder addPartitions(Collection<Partition> partitions) {
+ for (Partition partition : partitions) {
+ addPartition(partition);
+ }
+ return this;
+ }
+
+ /**
+ * Set the rebalancer configuration
+ * @param rebalancerConfig properties of interest for rebalancing
+ * @return Builder
+ */
+ public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+ _rebalancerConfig = rebalancerConfig;
+ return this;
+ }
+
+ /**
+ * @param schedulerTaskConfig
+ * @return
+ */
+ public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
+ _schedulerTaskConfig = schedulerTaskConfig;
+ return this;
+ }
+
+ /**
+ * Create a Resource object
+ * @return instantiated Resource
+ */
+ public ResourceConfig build() {
+ return new ResourceConfig(_id, _partitionMap, _schedulerTaskConfig, _rebalancerConfig);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6570dc4..4df2201 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -44,14 +44,7 @@ import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CompatibilityCheckStage;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ExternalViewComputeStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-import org.apache.helix.controller.stages.MessageSelectionStage;
-import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
import org.apache.helix.controller.stages.NewCompatibilityCheckStage;
import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
@@ -62,10 +55,6 @@ import org.apache.helix.controller.stages.NewMessageThrottleStage;
import org.apache.helix.controller.stages.NewReadClusterDataStage;
import org.apache.helix.controller.stages.NewResourceComputationStage;
import org.apache.helix.controller.stages.NewTaskAssignmentStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.RebalanceIdealStateStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
@@ -205,8 +194,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
registry.register("idealStateChange", dataRefresh, rebalancePipeline);
registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
registry.register("configChange", dataRefresh, rebalancePipeline);
- registry.register("liveInstanceChange", dataRefresh, rebalancePipeline,
- externalViewPipeline);
+ registry.register("liveInstanceChange", dataRefresh, rebalancePipeline, externalViewPipeline);
registry.register("messageChange", dataRefresh, rebalancePipeline);
registry.register("externalView", dataRefresh);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 8821082..a28166c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -29,9 +29,9 @@ import java.util.Set;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
@@ -46,6 +46,7 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.Lists;
@@ -69,12 +70,17 @@ public class NewAutoRebalancer implements NewRebalancer {
public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
// Compute a preference list based on the current ideal state
- List<Partition> partitions = new ArrayList<Partition>(resource.getPartitionSet());
+ List<PartitionId> partitions = new ArrayList<PartitionId>(resource.getPartitionSet());
List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
RebalancerConfig config = resource.getRebalancerConfig();
Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
- int replicas = config.getReplicaCount();
+ int replicas = -1;
+ if (config.canAssignAnyLiveParticipant()) {
+ replicas = liveParticipants.size();
+ } else {
+ replicas = config.getReplicaCount();
+ }
LinkedHashMap<String, Integer> stateCountMap =
ConstraintBasedAssignment.stateCount(stateModelDef, liveParticipants.size(), replicas);
@@ -129,17 +135,25 @@ public class NewAutoRebalancer implements NewRebalancer {
LOG.debug("Processing resource:" + resource.getId());
}
ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
- for (Partition partition : partitions) {
+ for (PartitionId partition : partitions) {
Set<ParticipantId> disabledParticipantsForPartition =
- NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition.getId());
+ NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
List<ParticipantId> preferenceList =
- NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+ Lists.transform(newMapping.getListField(partition.stringify()),
+ new Function<String, ParticipantId>() {
+ @Override
+ public ParticipantId apply(String participantName) {
+ return Id.participant(participantName);
+ }
+ });
+ preferenceList =
+ NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
Map<ParticipantId, State> bestStateForPartition =
NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
stateModelDef, preferenceList,
- currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId()),
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition),
disabledParticipantsForPartition);
- partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
@@ -149,23 +163,23 @@ public class NewAutoRebalancer implements NewRebalancer {
Map<PartitionId, Map<ParticipantId, State>> map =
new HashMap<PartitionId, Map<ParticipantId, State>>();
- for (Partition partition : resource.getPartitionSet()) {
+ for (PartitionId partition : resource.getPartitionSet()) {
Map<ParticipantId, State> curStateMap =
- currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
- map.put(partition.getId(), new HashMap<ParticipantId, State>());
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition);
+ map.put(partition, new HashMap<ParticipantId, State>());
for (ParticipantId node : curStateMap.keySet()) {
State state = curStateMap.get(node);
if (stateCountMap.containsKey(state)) {
- map.get(partition.getId()).put(node, state);
+ map.get(partition).put(node, state);
}
}
Map<ParticipantId, State> pendingStateMap =
- currentStateOutput.getPendingStateMap(resource.getId(), partition.getId());
+ currentStateOutput.getPendingStateMap(resource.getId(), partition);
for (ParticipantId node : pendingStateMap.keySet()) {
State state = pendingStateMap.get(node);
if (stateCountMap.containsKey(state)) {
- map.get(partition.getId()).put(node, state);
+ map.get(partition).put(node, state);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
index 8d000f5..0dd346b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
@@ -27,7 +27,7 @@ import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
@@ -58,17 +58,16 @@ public class NewCustomRebalancer implements NewRebalancer {
}
ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
RebalancerConfig config = resource.getRebalancerConfig();
- for (Partition partition : resource.getPartitionSet()) {
+ for (PartitionId partition : resource.getPartitionSet()) {
Map<ParticipantId, State> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition);
Set<ParticipantId> disabledInstancesForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partition.getId());
+ partition);
Map<ParticipantId, State> bestStateForPartition =
computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
- config.getPreferenceMap(partition.getId()), currentStateMap,
- disabledInstancesForPartition);
- partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+ config.getPreferenceMap(partition), currentStateMap, disabledInstancesForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index 27bb513..0b1611c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -25,7 +25,7 @@ import java.util.Set;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
@@ -56,19 +56,20 @@ public class NewSemiAutoRebalancer implements NewRebalancer {
}
ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
RebalancerConfig config = resource.getRebalancerConfig();
- for (Partition partition : resource.getPartitionSet()) {
+ for (PartitionId partition : resource.getPartitionSet()) {
Map<ParticipantId, State> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition);
Set<ParticipantId> disabledInstancesForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partition.getId());
+ partition);
List<ParticipantId> preferenceList =
- NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+ NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
+ config.getPreferenceList(partition));
Map<ParticipantId, State> bestStateForPartition =
NewConstraintBasedAssignment.computeAutoBestStateForPartition(
cluster.getLiveParticipantMap(), stateModelDef, preferenceList, currentStateMap,
disabledInstancesForPartition);
- partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 224853b..07856ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -34,7 +34,6 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.State;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -76,9 +75,7 @@ public class NewConstraintBasedAssignment {
* @return list with most preferred participants first
*/
public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
- RebalancerConfig config) {
- List<ParticipantId> prefList = config.getPreferenceList(partitionId);
-
+ List<ParticipantId> prefList) {
if (prefList != null && prefList.size() == 1
&& StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index bc14297..52a5af2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -23,11 +23,11 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Id;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -58,7 +58,8 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
NewCurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
Cluster cluster = event.getAttribute("ClusterDataCache");
if (currentStateOutput == null || resourceMap == null || cluster == null) {
@@ -106,7 +107,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
// TODO check this
private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
- Map<ResourceId, Resource> resourceMap, NewCurrentStateOutput currentStateOutput) {
+ Map<ResourceId, ResourceConfig> resourceMap, NewCurrentStateOutput currentStateOutput) {
NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
Map<StateModelDefId, StateModelDefinition> stateModelDefs =
event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
index d5ee850..8c5005d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -2,6 +2,7 @@ package org.apache.helix.controller.stages;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.api.ResourceId;
import org.apache.helix.model.ResourceAssignment;
@@ -31,4 +32,12 @@ public class NewBestPossibleStateOutput {
public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
return _resourceAssignmentMap.get(resourceId);
}
+
+ /**
+ * Get all of the resources currently assigned
+ * @return set of assigned resource ids
+ */
+ public Set<ResourceId> getAssignedResources() {
+ return _resourceAssignmentMap.keySet();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
index 0f8c33e..e0cd22f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -28,7 +28,7 @@ import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.SessionId;
import org.apache.helix.api.State;
@@ -48,7 +48,8 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
if (cluster == null || resourceMap == null) {
throw new StageException("Missing attributes in event:" + event
@@ -72,7 +73,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
}
ResourceId resourceId = message.getResourceId();
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
if (resource == null) {
continue;
}
@@ -92,8 +93,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
for (PartitionId partitionId : partitionNames) {
Partition partition = resource.getPartition(partitionId);
if (partition != null) {
- currentStateOutput.setPendingState(resourceId, partitionId,
- participantId,
+ currentStateOutput.setPendingState(resourceId, partitionId, participantId,
message.getToState());
} else {
// log
@@ -113,7 +113,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
ResourceId resourceId = curState.getResourceId();
StateModelDefId stateModelDefId = curState.getStateModelDefId();
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
if (resource == null) {
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 1a99346..68169ae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
-import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
@@ -40,7 +39,7 @@ import org.apache.helix.api.Id;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -50,7 +49,6 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
@@ -64,7 +62,8 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
log.info("START ExternalViewComputeStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
Cluster cluster = event.getAttribute("ClusterDataCache");
if (manager == null || resourceMap == null || cluster == null) {
@@ -89,7 +88,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
// view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
// if resource ideal state has bucket size, set it
// otherwise resource has been dropped, use bucket size from current state instead
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
if (rebalancerConfig.getBucketSize() > 0) {
view.setBucketSize(rebalancerConfig.getBucketSize());
@@ -139,8 +138,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
// message, and then remove the partitions from the ideal state
if (rebalancerConfig != null
&& rebalancerConfig.getStateModelDefId().stringify()
- .equalsIgnoreCase(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
// TODO fix it
// updateScheduledTaskStatus(view, manager, idealState);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index 0a72dc0..0ae72cf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -30,10 +30,10 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.Id;
import org.apache.helix.api.MessageId;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.SessionId;
import org.apache.helix.api.State;
@@ -42,12 +42,10 @@ import org.apache.helix.api.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
-import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -63,7 +61,8 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
Cluster cluster = event.getAttribute("ClusterDataCache");
Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
NewCurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
NewBestPossibleStateOutput bestPossibleStateOutput =
@@ -77,7 +76,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
NewMessageOutput output = new NewMessageOutput();
for (ResourceId resourceId : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
int bucketSize = resource.getRebalancerConfig().getBucketSize();
StateModelDefinition stateModelDef =
@@ -145,10 +144,13 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
if (rebalancerConfig != null
&& rebalancerConfig.getStateModelDefId().stringify()
.equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- if (resource.getPartitionSet().size() > 0) {
- // TODO refactor it
- message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
- resource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+ if (resource.getPartitionMap().size() > 0) {
+ // TODO refactor it -- we need a way to read in scheduler tasks a priori
+ Resource activeResource = cluster.getResource(resourceId);
+ if (activeResource != null) {
+ message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+ activeResource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 9745c64..ed06c5f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -28,10 +28,12 @@ import java.util.TreeMap;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantConfig;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
@@ -91,11 +93,11 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
Cluster cluster = event.getAttribute("ClusterDataCache");
Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
NewCurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
- NewMessageOutput messageGenOutput =
- event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+ NewMessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
if (cluster == null || resourceMap == null || currentStateOutput == null
|| messageGenOutput == null) {
throw new StageException("Missing attributes in event:" + event
@@ -105,7 +107,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
NewMessageOutput output = new NewMessageOutput();
for (ResourceId resourceId : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
StateModelDefinition stateModelDef =
stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
@@ -119,8 +121,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
List<Message> selectedMessages =
selectMessages(cluster.getLiveParticipantMap(),
- currentStateOutput.getCurrentStateMap(resourceId, partitionId),
- currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
+ currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+ currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
output.setMessages(resourceId, partitionId, selectedMessages);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
index 5bea5b4..cfbd45c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -30,17 +30,16 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
-import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message;
import org.apache.log4j.Logger;
public class NewMessageThrottleStage extends AbstractBaseStage {
@@ -120,7 +119,8 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
Cluster cluster = event.getAttribute("ClusterDataCache");
NewMessageOutput msgSelectionOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
throw new StageException("Missing attributes in event: " + event
@@ -145,7 +145,7 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
// go through all new messages, throttle if necessary
// assume messages should be sorted by state transition priority in messageSelection stage
for (ResourceId resourceId : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
// TODO fix it
for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);