You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/07 07:40:38 UTC
[1/3] [HELIX-109] adding config classes
Updated Branches:
refs/heads/helix-logical-model c07569d47 -> 5972a44e7
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index fbc8328..9fc8447 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -25,13 +25,16 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Id;
-import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.NewRebalancer;
import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.NewCurrentStateOutput;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
@@ -39,8 +42,6 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.IdealStateProperty;
import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -51,38 +52,27 @@ import org.testng.annotations.Test;
public class TestCustomizedIdealStateRebalancer extends
ZkStandAloneCMTestBaseWithPropertyServerCheck {
String db2 = TEST_DB + "2";
- static boolean testRebalancerCreated = false;
static boolean testRebalancerInvoked = false;
- public static class TestRebalancer implements Rebalancer {
-
- @Override
- public void init(HelixManager manager) {
- testRebalancerCreated = true;
- }
+ public static class TestRebalancer implements NewRebalancer {
/**
* Very basic mapping that evenly assigns one replica of each partition to live nodes, each of
* which is in the highest-priority state.
*/
@Override
- public ResourceAssignment computeResourceMapping(Resource resource,
- IdealState currentIdealState, CurrentStateOutput currentStateOutput,
- ClusterDataCache clusterData) {
- List<String> liveInstances = new ArrayList<String>(clusterData.getLiveInstances().keySet());
- String stateModelName = currentIdealState.getStateModelDefRef();
- StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
- ResourceAssignment resourceMapping =
- new ResourceAssignment(Id.resource(resource.getResourceName()));
+ public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
+ StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
+ List<ParticipantId> liveParticipants =
+ new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
+ ResourceAssignment resourceMapping = new ResourceAssignment(resource.getId());
int i = 0;
- for (Partition partition : resource.getPartitions()) {
- String partitionName = partition.getPartitionName();
- int nodeIndex = i % liveInstances.size();
- currentIdealState.getInstanceStateMap(partitionName).clear();
- currentIdealState.getInstanceStateMap(partitionName).put(liveInstances.get(nodeIndex),
- stateModelDef.getStatesPriorityStringList().get(0));
- resourceMapping.addReplicaMap(Id.partition(partitionName), ResourceAssignment
- .replicaMapFromStringMap(currentIdealState.getInstanceStateMap(partitionName)));
+ for (PartitionId partitionId : resource.getPartitionSet()) {
+ int nodeIndex = i % liveParticipants.size();
+ Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
+ replicaMap.put(liveParticipants.get(nodeIndex), stateModelDef.getStatesPriorityList()
+ .get(0));
+ resourceMapping.addReplicaMap(partitionId, replicaMap);
i++;
}
testRebalancerInvoked = true;
@@ -107,7 +97,7 @@ public class TestCustomizedIdealStateRebalancer extends
Assert.assertTrue(result);
Thread.sleep(1000);
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(_zkClient));
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
Assert.assertEquals(ev.getPartitionStringSet().size(), 60);
@@ -119,7 +109,6 @@ public class TestCustomizedIdealStateRebalancer extends
Assert.assertEquals(is.getPreferenceList(partition).size(), 0);
Assert.assertEquals(is.getInstanceStateMap(partition).size(), 0);
}
- Assert.assertTrue(testRebalancerCreated);
Assert.assertTrue(testRebalancerInvoked);
}
@@ -138,7 +127,7 @@ public class TestCustomizedIdealStateRebalancer extends
public boolean verify() {
try {
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+ new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
Builder keyBuilder = accessor.keyBuilder();
int numberOfPartitions =
accessor.getProperty(keyBuilder.idealState(_resourceName)).getRecord().getListFields()
@@ -159,9 +148,9 @@ public class TestCustomizedIdealStateRebalancer extends
if (instances == 0) {
instances = cache.getLiveInstances().size();
}
- return verifyBalanceExternalView(
- accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(),
- numberOfPartitions, masterValue, replicas, instances);
+ ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
+ return verifyBalanceExternalView(externalView.getRecord(), numberOfPartitions, masterValue,
+ replicas, instances);
} catch (Exception e) {
return false;
}
[3/3] git commit: [HELIX-109] adding config classes
Posted by ki...@apache.org.
[HELIX-109] adding config classes
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5972a44e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5972a44e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5972a44e
Branch: refs/heads/helix-logical-model
Commit: 5972a44e726dcb0d9ea3671eac5caf45fc72c0fc
Parents: c07569d
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Sep 6 22:39:57 2013 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Sep 6 22:39:57 2013 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/api/Cluster.java | 54 ++++--
.../org/apache/helix/api/ClusterAccessor.java | 63 +++++-
.../org/apache/helix/api/ClusterConfig.java | 180 +++++++++++++++++
.../java/org/apache/helix/api/Participant.java | 157 ++-------------
.../org/apache/helix/api/ParticipantConfig.java | 194 +++++++++++++++++++
.../org/apache/helix/api/RebalancerConfig.java | 64 +++++-
.../org/apache/helix/api/RebalancerRef.java | 5 +-
.../java/org/apache/helix/api/Resource.java | 172 +++++-----------
.../org/apache/helix/api/ResourceConfig.java | 173 +++++++++++++++++
.../controller/GenericHelixController.java | 14 +-
.../rebalancer/NewAutoRebalancer.java | 42 ++--
.../rebalancer/NewCustomRebalancer.java | 13 +-
.../rebalancer/NewSemiAutoRebalancer.java | 13 +-
.../util/NewConstraintBasedAssignment.java | 5 +-
.../stages/NewBestPossibleStateCalcStage.java | 7 +-
.../stages/NewBestPossibleStateOutput.java | 9 +
.../stages/NewCurrentStateComputationStage.java | 12 +-
.../stages/NewExternalViewComputeStage.java | 12 +-
.../stages/NewMessageGenerationStage.java | 22 ++-
.../stages/NewMessageSelectionStage.java | 14 +-
.../stages/NewMessageThrottleStage.java | 12 +-
.../stages/NewResourceComputationStage.java | 16 +-
.../stages/NewTaskAssignmentStage.java | 18 +-
.../apache/helix/model/ClusterConstraints.java | 8 +
.../java/org/apache/helix/model/IdealState.java | 18 +-
.../apache/helix/model/ResourceAssignment.java | 8 +
.../helix/tools/ClusterStateVerifier.java | 84 +++++---
.../org/apache/helix/api/TestNewStages.java | 12 +-
.../helix/controller/stages/BaseStageTest.java | 67 ++++---
.../TestBestPossibleCalcStageCompatibility.java | 73 ++++---
.../stages/TestBestPossibleStateCalcStage.java | 35 ++--
.../stages/TestCompatibilityCheckStage.java | 13 +-
.../TestCurrentStateComputationStage.java | 52 ++---
.../stages/TestMessageThrottleStage.java | 35 ++--
.../stages/TestMsgSelectionStage.java | 96 ++++++---
.../stages/TestRebalancePipeline.java | 76 ++++----
.../stages/TestResourceComputationStage.java | 79 ++++----
.../helix/integration/TestAutoRebalance.java | 6 +-
.../TestCustomizedIdealStateRebalancer.java | 59 +++---
39 files changed, 1303 insertions(+), 689 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index e890fb4..ce2318a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -25,13 +25,14 @@ import java.util.Map;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
/**
* Represent a logical helix cluster
*/
public class Cluster {
- private final ClusterId _id;
/**
* map of resource-id to resource
@@ -60,9 +61,7 @@ public class Cluster {
private final ControllerId _leaderId;
- private final ClusterConfig _config = null;
-
- private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private final ClusterConfig _config;
/**
* construct a cluster
@@ -74,9 +73,26 @@ public class Cluster {
*/
public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
- ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap) {
-
- _id = id;
+ ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+
+ // build the config
+ // Guava's transform and "copy" functions really return views so the maps all reflect each other
+ Map<ResourceId, ResourceConfig> resourceConfigMap =
+ Maps.transformValues(resourceMap, new Function<Resource, ResourceConfig>() {
+ @Override
+ public ResourceConfig apply(Resource resource) {
+ return resource.getConfig();
+ }
+ });
+ Map<ParticipantId, ParticipantConfig> participantConfigMap =
+ Maps.transformValues(participantMap, new Function<Participant, ParticipantConfig>() {
+ @Override
+ public ParticipantConfig apply(Participant participant) {
+ return participant.getConfig();
+ }
+ });
+ _config =
+ new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap, isPaused);
_resourceMap = ImmutableMap.copyOf(resourceMap);
@@ -94,8 +110,6 @@ public class Cluster {
_leaderId = leaderId;
- _constraintMap = ImmutableMap.copyOf(constraintMap);
-
// TODO impl this when we persist controllers and spectators on zookeeper
_controllerMap = ImmutableMap.copyOf(controllerMap);
_spectatorMap = Collections.emptyMap();
@@ -106,7 +120,7 @@ public class Cluster {
* @return cluster id
*/
public ClusterId getId() {
- return _id;
+ return _config.getId();
}
/**
@@ -167,17 +181,27 @@ public class Cluster {
}
/**
- * @return
+ * Get all the constraints on the cluster
+ * @return map of constraint type to constraints
*/
public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
- return _constraintMap;
+ return _config.getConstraintMap();
}
/**
- * @param type
- * @return
+ * Get a cluster constraint
+ * @param type the type of constraint to query
+ * @return cluster constraints, or null if none
*/
public ClusterConstraints getConstraint(ConstraintType type) {
- return _constraintMap.get(type);
+ return _config.getConstraintMap().get(type);
+ }
+
+ /**
+ * Check the paused status of the cluster
+ * @return true if paused, false otherwise
+ */
+ public boolean isPaused() {
+ return _config.isPaused();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 04d5831..a9fcf79 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -24,12 +24,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -76,7 +78,7 @@ public class ClusterAccessor {
*/
public void dropCluster() {
LOG.info("Dropping cluster: " + _clusterId);
- List<String> liveInstanceNames =_accessor.getChildNames(_keyBuilder.liveInstances());
+ List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
if (liveInstanceNames.size() > 0) {
throw new HelixException("Can't drop cluster: " + _clusterId
+ " because there are running participant: " + liveInstanceNames
@@ -97,6 +99,7 @@ public class ClusterAccessor {
* @return cluster
*/
public Cluster readCluster() {
+ // TODO many of these should live in resource, participant, etc accessors
/**
* map of instance-id to instance-config
*/
@@ -147,13 +150,20 @@ public class ClusterAccessor {
Map<String, ClusterConstraints> constraintMap =
_accessor.getChildValuesMap(_keyBuilder.constraints());
+ /**
+ * Map of resource id to external view
+ */
+ Map<String, ExternalView> externalViewMap =
+ _accessor.getChildValuesMap(_keyBuilder.externalViews());
+
Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
for (String resourceName : idealStateMap.keySet()) {
IdealState idealState = idealStateMap.get(resourceName);
-
// TODO pass resource assignment
ResourceId resourceId = Id.resource(resourceName);
- resourceMap.put(resourceId, new Resource(resourceId, idealState, null));
+ resourceMap.put(resourceId,
+ new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
+ liveInstanceMap.size()));
}
Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
@@ -182,8 +192,11 @@ public class ClusterAccessor {
constraintMap.get(constraintType));
}
+ PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+ boolean isPaused = pauseSignal != null;
+
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap);
+ clusterConstraintMap, isPaused);
}
/**
@@ -204,7 +217,7 @@ public class ClusterAccessor {
* add a resource to cluster
* @param resource
*/
- public void addResourceToCluster(Resource resource) {
+ public void addResourceToCluster(ResourceConfig resource) {
StateModelDefId stateModelDefId = resource.getRebalancerConfig().getStateModelDefId();
if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
throw new HelixException("State model: " + stateModelDefId + " not found in cluster: "
@@ -217,8 +230,42 @@ public class ClusterAccessor {
+ ", because resource ideal state already exists in cluster: " + _clusterId);
}
- // TODO convert rebalancerConfig to idealState
- _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), null);
+ // Create an IdealState from a RebalancerConfig
+ RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+ IdealState idealState = new IdealState(resourceId);
+ idealState.setRebalanceMode(rebalancerConfig.getRebalancerMode());
+ idealState.setMaxPartitionsPerInstance(rebalancerConfig.getMaxPartitionsPerParticipant());
+ if (rebalancerConfig.canAssignAnyLiveParticipant()) {
+ idealState.setReplicas(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString());
+ } else {
+ idealState.setReplicas(Integer.toString(rebalancerConfig.getReplicaCount()));
+ }
+ idealState.setStateModelDefId(rebalancerConfig.getStateModelDefId());
+ for (PartitionId partitionId : resource.getPartitionSet()) {
+ List<ParticipantId> preferenceList = rebalancerConfig.getPreferenceList(partitionId);
+ Map<ParticipantId, State> preferenceMap = rebalancerConfig.getPreferenceMap(partitionId);
+ if (preferenceList != null) {
+ idealState.setPreferenceList(partitionId, preferenceList);
+ }
+ if (preferenceMap != null) {
+ idealState.setParticipantStateMap(partitionId, preferenceMap);
+ }
+ }
+ idealState.setBucketSize(rebalancerConfig.getBucketSize());
+ idealState.setBatchMessageMode(rebalancerConfig.getBatchMessageMode());
+ String groupTag = rebalancerConfig.getParticipantGroupTag();
+ if (groupTag != null) {
+ idealState.setInstanceGroupTag(groupTag);
+ }
+ RebalancerRef rebalancerRef = rebalancerConfig.getRebalancerRef();
+ if (rebalancerRef != null) {
+ idealState.setRebalancerRef(rebalancerRef);
+ }
+ StateModelFactoryId stateModelFactoryId = rebalancerConfig.getStateModelFactoryId();
+ if (stateModelFactoryId != null) {
+ idealState.setStateModelFactoryId(stateModelFactoryId);
+ }
+ _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
}
/**
@@ -244,7 +291,7 @@ public class ClusterAccessor {
* add a participant to cluster
* @param participant
*/
- public void addParticipantToCluster(Participant participant) {
+ public void addParticipantToCluster(ParticipantConfig participant) {
if (!isClusterStructureValid()) {
throw new HelixException("Cluster: " + _clusterId + " structure is not valid");
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index 8a2f629..1585aae 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -1,5 +1,14 @@
package org.apache.helix.api;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+
+import com.google.common.collect.ImmutableMap;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +28,177 @@ package org.apache.helix.api;
* under the License.
*/
+/**
+ * Configuration properties of a cluster
+ */
public class ClusterConfig {
+ private final ClusterId _id;
+ private final Map<ResourceId, ResourceConfig> _resourceMap;
+ private final Map<ParticipantId, ParticipantConfig> _participantMap;
+ private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private final boolean _isPaused;
+
+ /**
+ * Initialize a cluster configuration. Also see ClusterConfig.Builder
+ * @param id
+ * @param resourceMap
+ * @param participantMap
+ * @param constraintMap
+ */
+ public ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
+ Map<ParticipantId, ParticipantConfig> participantMap,
+ Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+ _id = id;
+ _resourceMap = ImmutableMap.copyOf(resourceMap);
+ _participantMap = ImmutableMap.copyOf(participantMap);
+ _constraintMap = ImmutableMap.copyOf(constraintMap);
+ _isPaused = isPaused;
+ }
+
+ /**
+ * Get cluster id
+ * @return cluster id
+ */
+ public ClusterId getId() {
+ return _id;
+ }
+
+ /**
+ * Get resources in the cluster
+ * @return a map of resource id to resource, or empty map if none
+ */
+ public Map<ResourceId, ResourceConfig> getResourceMap() {
+ return _resourceMap;
+ }
+
+ /**
+ * Get all the constraints on the cluster
+ * @return map of constraint type to constraints
+ */
+ public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
+ return _constraintMap;
+ }
+
+ /**
+ * Get participants of the cluster
+ * @return a map of participant id to participant, or empty map if none
+ */
+ public Map<ParticipantId, ParticipantConfig> getParticipantMap() {
+ return _participantMap;
+ }
+
+ /**
+ * Check the paused status of the cluster
+ * @return true if paused, false otherwise
+ */
+ public boolean isPaused() {
+ return _isPaused;
+ }
+
+ /**
+ * Assembles a cluster configuration
+ */
+ public static class Builder {
+ private final ClusterId _id;
+ private final Map<ResourceId, ResourceConfig> _resourceMap;
+ private final Map<ParticipantId, ParticipantConfig> _participantMap;
+ private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private boolean _isPaused;
+
+ /**
+ * Initialize builder for a cluster
+ * @param id cluster id
+ */
+ public Builder(ClusterId id) {
+ _id = id;
+ _resourceMap = new HashMap<ResourceId, ResourceConfig>();
+ _participantMap = new HashMap<ParticipantId, ParticipantConfig>();
+ _constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
+ _isPaused = false;
+ }
+
+ /**
+ * Add a resource to the cluster
+ * @param resource resource configuration
+ * @return Builder
+ */
+ public Builder addResource(ResourceConfig resource) {
+ _resourceMap.put(resource.getId(), resource);
+ return this;
+ }
+
+ /**
+ * Add multiple resources to the cluster
+ * @param resources resource configurations
+ * @return Builder
+ */
+ public Builder addResources(Collection<ResourceConfig> resources) {
+ for (ResourceConfig resource : resources) {
+ addResource(resource);
+ }
+ return this;
+ }
+
+ /**
+ * Add a participant to the cluster
+ * @param participant participant configuration
+ * @return Builder
+ */
+ public Builder addParticipant(ParticipantConfig participant) {
+ _participantMap.put(participant.getId(), participant);
+ return this;
+ }
+
+ /**
+ * Add multiple participants to the cluster
+ * @param participants participant configurations
+ * @return Builder
+ */
+ public Builder addParticipants(Collection<ParticipantConfig> participants) {
+ for (ParticipantConfig participant : participants) {
+ addParticipant(participant);
+ }
+ return this;
+ }
+
+ /**
+ * Add a constraint to the cluster
+ * @param constraint cluster constraint of a specific type
+ * @return Builder
+ */
+ public Builder addConstraint(ClusterConstraints constraint) {
+ _constraintMap.put(constraint.getType(), constraint);
+ return this;
+ }
+
+ /**
+ * Add multiple constraints to the cluster
+ * @param constraints cluster constraints of multiple distinct types
+ * @return Builder
+ */
+ public Builder addConstraints(Collection<ClusterConstraints> constraints) {
+ for (ClusterConstraints constraint : constraints) {
+ addConstraint(constraint);
+ }
+ return this;
+ }
+
+ /**
+ * Set the paused status of the cluster
+ * @param isPaused true if paused, false otherwise
+ * @return Builder
+ */
+ public Builder setPausedStatus(boolean isPaused) {
+ _isPaused = isPaused;
+ return this;
+ }
+ /**
+ * Create the cluster configuration
+ * @return ClusterConfig
+ */
+ public ClusterConfig build() {
+ return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _isPaused);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 10ceebd..0c0cd12 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -19,8 +19,6 @@ package org.apache.helix.api;
* under the License.
*/
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -28,22 +26,12 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
/**
* A cluster participant
*/
public class Participant {
- private final ParticipantId _id;
- private final String _hostName;
- private final int _port;
- private final boolean _isEnabled;
-
- /**
- * set of disabled partition id's
- */
- private final Set<PartitionId> _disabledPartitionIdSet;
- private final Set<String> _tags;
+ private final ParticipantConfig _config;
private final RunningInstance _runningInstance;
@@ -64,12 +52,7 @@ public class Participant {
public Participant(ParticipantId id, String hostName, int port, boolean isEnabled,
Set<PartitionId> disabledPartitionIdSet, Set<String> tags, RunningInstance runningInstance,
Map<ResourceId, CurrentState> currentStateMap, Map<MessageId, Message> messageMap) {
- _id = id;
- _hostName = hostName;
- _port = port;
- _isEnabled = isEnabled;
- _disabledPartitionIdSet = ImmutableSet.copyOf(disabledPartitionIdSet);
- _tags = ImmutableSet.copyOf(tags);
+ _config = new ParticipantConfig(id, hostName, port, isEnabled, disabledPartitionIdSet, tags);
_runningInstance = runningInstance;
_currentStateMap = ImmutableMap.copyOf(currentStateMap);
_messageMap = ImmutableMap.copyOf(messageMap);
@@ -80,7 +63,7 @@ public class Participant {
* @return host name, or null if not applicable
*/
public String getHostName() {
- return _hostName;
+ return _config.getHostName();
}
/**
@@ -88,7 +71,7 @@ public class Participant {
* @return port number, or -1 if not applicable
*/
public int getPort() {
- return _port;
+ return _config.getPort();
}
/**
@@ -96,7 +79,7 @@ public class Participant {
* @return true if enabled or false otherwise
*/
public boolean isEnabled() {
- return _isEnabled;
+ return _config.isEnabled();
}
/**
@@ -120,7 +103,7 @@ public class Participant {
* @return set of disabled partition id's, or empty set if none
*/
public Set<PartitionId> getDisablePartitionIds() {
- return _disabledPartitionIdSet;
+ return _config.getDisablePartitionIds();
}
/**
@@ -128,7 +111,7 @@ public class Participant {
* @return set of tags
*/
public Set<String> getTags() {
- return _tags;
+ return _config.getTags();
}
/**
@@ -137,7 +120,7 @@ public class Participant {
* @return true if tagged, false otherwise
*/
public boolean hasTag(String tag) {
- return _tags.contains(tag);
+ return _config.hasTag(tag);
}
/**
@@ -156,125 +139,19 @@ public class Participant {
return _currentStateMap;
}
+ /**
+ * Get the participant id
+ * @return ParticipantId
+ */
public ParticipantId getId() {
- return _id;
+ return _config.getId();
}
/**
- * Assemble a participant
+ * Get the participant configuration
+ * @return ParticipantConfig that backs this participant
*/
- public static class Builder {
- private final ParticipantId _id;
- private final Set<PartitionId> _disabledPartitions;
- private final Set<String> _tags;
- private final Map<ResourceId, CurrentState> _currentStateMap;
- private final Map<MessageId, Message> _messageMap;
- private String _hostName;
- private int _port;
- private boolean _isEnabled;
- private RunningInstance _runningInstance;
-
- /**
- * Build a participant with a given id
- * @param id participant id
- */
- public Builder(ParticipantId id) {
- _id = id;
- _disabledPartitions = new HashSet<PartitionId>();
- _tags = new HashSet<String>();
- _currentStateMap = new HashMap<ResourceId, CurrentState>();
- _messageMap = new HashMap<MessageId, Message>();
- _isEnabled = true;
- }
-
- /**
- * Set the participant host name
- * @param hostName reachable host when live
- * @return Builder
- */
- public Builder hostName(String hostName) {
- _hostName = hostName;
- return this;
- }
-
- /**
- * Set the participant port
- * @param port port number
- * @return Builder
- */
- public Builder port(int port) {
- _port = port;
- return this;
- }
-
- /**
- * Set whether or not the participant is enabled
- * @param isEnabled true if enabled, false otherwise
- * @return Builder
- */
- public Builder enabled(boolean isEnabled) {
- _isEnabled = isEnabled;
- return this;
- }
-
- /**
- * Add a partition to disable for this participant
- * @param partitionId the partition to disable
- * @return Builder
- */
- public Builder addDisabledPartition(PartitionId partitionId) {
- _disabledPartitions.add(partitionId);
- return this;
- }
-
- /**
- * Add an arbitrary tag for this participant
- * @param tag the tag to add
- * @return Builder
- */
- public Builder addTag(String tag) {
- _tags.add(tag);
- return this;
- }
-
- /**
- * Add live properties to participants that are running
- * @param runningInstance live participant properties
- * @return Builder
- */
- public Builder runningInstance(RunningInstance runningInstance) {
- _runningInstance = runningInstance;
- return this;
- }
-
- /**
- * Add a resource current state for this participant
- * @param resourceId the resource the current state corresponds to
- * @param currentState the current state
- * @return Builder
- */
- public Builder addCurrentState(ResourceId resourceId, CurrentState currentState) {
- _currentStateMap.put(resourceId, currentState);
- return this;
- }
-
- /**
- * Add a message for the participant
- * @param message message to add
- * @return Builder
- */
- public Builder addMessage(Message message) {
- _messageMap.put(new MessageId(message.getId()), message);
- return this;
- }
-
- /**
- * Assemble the participant
- * @return instantiated Participant
- */
- public Participant build() {
- return new Participant(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags,
- _runningInstance, _currentStateMap, _messageMap);
- }
+ public ParticipantConfig getConfig() {
+ return _config;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
new file mode 100644
index 0000000..3c77a40
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
@@ -0,0 +1,194 @@
+package org.apache.helix.api;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties of a Helix participant
+ */
+public class ParticipantConfig {
+ private final ParticipantId _id;
+ private final String _hostName;
+ private final int _port;
+ private final boolean _isEnabled;
+ private final Set<PartitionId> _disabledPartitions;
+ private final Set<String> _tags;
+
+ /**
+ * Initialize a participant configuration. Also see ParticipantConfig.Builder
+ * @param id participant id
+ * @param hostName host where participant can be reached
+ * @param port port to use to contact participant
+ * @param isEnabled true if enabled, false if disabled
+ * @param disabledPartitions set of partitions, if any, to disable on this participant (copied)
+ * @param tags tags to set for the participant (copied)
+ */
+ public ParticipantConfig(ParticipantId id, String hostName, int port, boolean isEnabled,
+ Set<PartitionId> disabledPartitions, Set<String> tags) {
+ _id = id;
+ _hostName = hostName;
+ _port = port;
+ _isEnabled = isEnabled;
+ _disabledPartitions = ImmutableSet.copyOf(disabledPartitions);
+ _tags = ImmutableSet.copyOf(tags);
+ }
+
+ /**
+ * Get the host name of the participant
+ * @return host name, or null if not applicable
+ */
+ public String getHostName() {
+ return _hostName;
+ }
+
+ /**
+ * Get the port of the participant
+ * @return port number, or -1 if not applicable
+ */
+ public int getPort() {
+ return _port;
+ }
+
+ /**
+ * Get if the participant is enabled
+ * @return true if enabled or false otherwise
+ */
+ public boolean isEnabled() {
+ return _isEnabled;
+ }
+
+ /**
+ * Get the ids of the partitions disabled on this participant
+ * @return immutable set of disabled partition ids, or empty set if none
+ */
+ public Set<PartitionId> getDisablePartitionIds() {
+ return _disabledPartitions;
+ }
+
+ /**
+ * Get tags
+ * @return immutable set of tags, or empty set if none
+ */
+ public Set<String> getTags() {
+ return _tags;
+ }
+
+ /**
+ * Check if participant has a tag
+ * @param tag tag to check
+ * @return true if tagged, false otherwise
+ */
+ public boolean hasTag(String tag) {
+ return _tags.contains(tag);
+ }
+
+ /**
+ * Get the participant id
+ * @return ParticipantId
+ */
+ public ParticipantId getId() {
+ return _id;
+ }
+
+ /**
+ * Assemble a participant configuration
+ */
+ public static class Builder {
+ private final ParticipantId _id;
+ private String _hostName;
+ private int _port;
+ private boolean _isEnabled;
+ private final Set<PartitionId> _disabledPartitions;
+ private final Set<String> _tags;
+
+ /**
+ * Build a participant config for an id; starts enabled with no disabled partitions or tags
+ * @param id participant id
+ */
+ public Builder(ParticipantId id) {
+ _id = id;
+ _disabledPartitions = new HashSet<PartitionId>();
+ _tags = new HashSet<String>();
+ _isEnabled = true;
+ }
+
+ /**
+ * Set the participant host name
+ * @param hostName reachable host when live
+ * @return Builder
+ */
+ public Builder hostName(String hostName) {
+ _hostName = hostName;
+ return this;
+ }
+
+ /**
+ * Set the participant port
+ * @param port port number; NOTE(review): stays 0 if never set, though getPort() documents -1
+ * @return Builder
+ */
+ public Builder port(int port) {
+ _port = port;
+ return this;
+ }
+
+ /**
+ * Set whether or not the participant is enabled
+ * @param isEnabled true if enabled, false otherwise
+ * @return Builder
+ */
+ public Builder enabled(boolean isEnabled) {
+ _isEnabled = isEnabled;
+ return this;
+ }
+
+ /**
+ * Add a partition to disable for this participant
+ * @param partitionId the partition to disable
+ * @return Builder
+ */
+ public Builder addDisabledPartition(PartitionId partitionId) {
+ _disabledPartitions.add(partitionId);
+ return this;
+ }
+
+ /**
+ * Add an arbitrary tag for this participant
+ * @param tag the tag to add
+ * @return Builder
+ */
+ public Builder addTag(String tag) {
+ _tags.add(tag);
+ return this;
+ }
+
+ /**
+ * Assemble the participant configuration
+ * @return instantiated ParticipantConfig
+ */
+ public ParticipantConfig build() {
+ return new ParticipantConfig(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 4ac254d..257354d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,9 +19,11 @@ package org.apache.helix.api;
* under the License.
*/
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.helix.HelixConstants;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
@@ -40,6 +42,7 @@ public class RebalancerConfig {
private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
private final ResourceAssignment _resourceAssignment;
private final int _replicaCount;
+ private final boolean _anyLiveParticipant;
private final String _participantGroupTag;
private final int _maxPartitionsPerParticipant;
private final int _bucketSize;
@@ -51,11 +54,19 @@ public class RebalancerConfig {
* @param idealState the physical ideal state
* @param resourceAssignment last mapping of a resource
*/
- public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment) {
+ public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment,
+ int liveParticipantCount) {
_rebalancerMode = idealState.getRebalanceMode();
_rebalancerRef = idealState.getRebalancerRef();
_stateModelDefId = idealState.getStateModelDefId();
- _replicaCount = Integer.parseInt(idealState.getReplicas());
+ String replicaCount = idealState.getReplicas();
+ if (replicaCount.equals(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())) {
+ _replicaCount = liveParticipantCount;
+ _anyLiveParticipant = true;
+ } else {
+ _replicaCount = Integer.parseInt(idealState.getReplicas());
+ _anyLiveParticipant = false;
+ }
_participantGroupTag = idealState.getInstanceGroupTag();
_maxPartitionsPerParticipant = idealState.getMaxPartitionsPerInstance();
_bucketSize = idealState.getBucketSize();
@@ -68,10 +79,14 @@ public class RebalancerConfig {
ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>> preferenceMaps =
new ImmutableMap.Builder<PartitionId, Map<ParticipantId, State>>();
for (PartitionId partitionId : idealState.getPartitionSet()) {
- preferenceLists.put(partitionId,
- ImmutableList.copyOf(idealState.getPreferenceList(partitionId)));
- preferenceMaps.put(partitionId,
- ImmutableMap.copyOf(idealState.getParticipantStateMap(partitionId)));
+ List<ParticipantId> preferenceList = idealState.getPreferenceList(partitionId);
+ if (preferenceList != null) {
+ preferenceLists.put(partitionId, ImmutableList.copyOf(preferenceList));
+ }
+ Map<ParticipantId, State> preferenceMap = idealState.getParticipantStateMap(partitionId);
+ if (preferenceMap != null) {
+ preferenceMaps.put(partitionId, ImmutableMap.copyOf(preferenceMap));
+ }
}
_preferenceLists = preferenceLists.build();
_preferenceMaps = preferenceMaps.build();
@@ -118,7 +133,10 @@ public class RebalancerConfig {
* @return the ordered preference list (early entries are more preferred)
*/
public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
- return _preferenceLists.get(partitionId);
+ if (_preferenceLists.containsKey(partitionId)) {
+ return _preferenceLists.get(partitionId);
+ }
+ return Collections.emptyList();
}
/**
@@ -127,7 +145,10 @@ public class RebalancerConfig {
* @return a mapping of participant to state for each replica
*/
public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
- return _preferenceMaps.get(partitionId);
+ if (_preferenceMaps.containsKey(partitionId)) {
+ return _preferenceMaps.get(partitionId);
+ }
+ return Collections.emptyMap();
}
/**
@@ -179,10 +200,19 @@ public class RebalancerConfig {
}
/**
+ * Check if replicas can be assigned to any live participant
+ * @return true if they can, false if they cannot
+ */
+ public boolean canAssignAnyLiveParticipant() {
+ return _anyLiveParticipant;
+ }
+
+ /**
* Assembles a RebalancerConfig
*/
public static class Builder {
private final IdealState _idealState;
+ private boolean _anyLiveParticipant;
private ResourceAssignment _resourceAssignment;
/**
@@ -191,6 +221,7 @@ public class RebalancerConfig {
*/
public Builder(ResourceId resourceId) {
_idealState = new IdealState(resourceId);
+ _anyLiveParticipant = false;
}
/**
@@ -283,11 +314,26 @@ public class RebalancerConfig {
}
/**
+ * Set whether any live participant should be used in rebalancing
+ * @param useAnyParticipant true if any live participant can be used, false otherwise
+ * @return Builder
+ */
+ public Builder anyLiveParticipant(boolean useAnyParticipant) {
+ _anyLiveParticipant = useAnyParticipant;
+ return this;
+ }
+
+ /**
* Assemble a RebalancerConfig
* @return a fully defined rebalancer configuration
*/
public RebalancerConfig build() {
- return new RebalancerConfig(_idealState, _resourceAssignment);
+ if (_anyLiveParticipant) {
+ return new RebalancerConfig(_idealState, _resourceAssignment, Integer.parseInt(_idealState
+ .getReplicas()));
+ } else {
+ return new RebalancerConfig(_idealState, _resourceAssignment, -1);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
index 5f22898..5c7ce56 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerRef.java
@@ -62,9 +62,12 @@ public class RebalancerRef {
/**
* Get a rebalancer class reference
* @param rebalancerClassName name of the class
- * @return RebalancerRef
+ * @return RebalancerRef or null if name is null
*/
public static RebalancerRef from(String rebalancerClassName) {
+ if (rebalancerClassName == null) {
+ return null;
+ }
return new RebalancerRef(rebalancerClassName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 0c8b730..8445617 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -20,7 +20,6 @@ package org.apache.helix.api;
*/
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -29,41 +28,33 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceAssignment;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
/**
* Represent a resource entity in helix cluster
*/
public class Resource {
- private final ResourceId _id;
- private final RebalancerConfig _rebalancerConfig;
- private final SchedulerTaskConfig _schedulerTaskConfig;
-
- private final Map<PartitionId, Partition> _partitionMap;
-
+ private final ResourceConfig _config;
private final ExternalView _externalView;
/**
* Construct a resource
- * @param idealState
+ * @param id resource id
+ * @param idealState ideal state of the resource
* @param currentStateMap map of participant-id to current state
+ * @param liveParticipantCount number of live participants in the system
*/
- public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment) {
- _id = id;
- _rebalancerConfig = new RebalancerConfig(idealState, resourceAssignment);
-
+ public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment,
+ ExternalView externalView, int liveParticipantCount) {
Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
- Map<PartitionId, Map<String, String>> schedulerTaskConfig =
+ Map<PartitionId, Map<String, String>> schedulerTaskConfigMap =
new HashMap<PartitionId, Map<String, String>>();
Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
for (PartitionId partitionId : idealState.getPartitionSet()) {
partitionMap.put(partitionId, new Partition(partitionId));
// TODO refactor it
- Map<String, String> taskConfigMap = idealState.getRecord().getMapField(partitionId.stringify());
+ Map<String, String> taskConfigMap = idealState.getInstanceStateMap(partitionId.stringify());
if (taskConfigMap != null) {
- schedulerTaskConfig.put(partitionId, taskConfigMap);
+ schedulerTaskConfigMap.put(partitionId, taskConfigMap);
}
// TODO refactor it
@@ -78,53 +69,38 @@ public class Resource {
}
}
}
- _partitionMap = ImmutableMap.copyOf(partitionMap);
- _schedulerTaskConfig = new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfig);
+ SchedulerTaskConfig schedulerTaskConfig =
+ new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfigMap);
+ RebalancerConfig rebalancerConfig =
+ new RebalancerConfig(idealState, resourceAssignment, liveParticipantCount);
- _externalView = null;
- }
-
- /**
- * Construct a Resource
- * @param id resource identifier
- * @param partitionSet disjoint partitions of the resource
- * @param externalView external view of the resource
- * @param pendingExternalView pending external view based on unprocessed messages
- * @param rebalancerConfig configuration properties for rebalancing this resource
- */
- public Resource(ResourceId id, Map<PartitionId, Partition> partitionMap,
- ExternalView externalView,
- RebalancerConfig rebalancerConfig, SchedulerTaskConfig schedulerTaskConfig) {
- _id = id;
- _partitionMap = ImmutableMap.copyOf(partitionMap);
+ _config = new ResourceConfig(id, partitionMap, schedulerTaskConfig, rebalancerConfig);
_externalView = externalView;
- _rebalancerConfig = rebalancerConfig;
- _schedulerTaskConfig = schedulerTaskConfig;
}
/**
- * Get the set of partitions of the resource
- * @return set of partitions or empty set if none
+ * Get the partitions of the resource
+ * @return map of partition id to partition or empty map if none
*/
public Map<PartitionId, Partition> getPartitionMap() {
- return _partitionMap;
+ return _config.getPartitionMap();
}
/**
- * @param partitionId
- * @return
+ * Get a partition that the resource contains
+ * @param partitionId the partition id to look up
+ * @return Partition or null if none is present with the given id
*/
public Partition getPartition(PartitionId partitionId) {
- return _partitionMap.get(partitionId);
+ return _config.getPartition(partitionId);
}
/**
- * @return
+ * Get the set of partition ids that the resource contains
+ * @return partition id set, or empty if none
*/
- public Set<Partition> getPartitionSet() {
- Set<Partition> partitionSet = new HashSet<Partition>();
- partitionSet.addAll(_partitionMap.values());
- return ImmutableSet.copyOf(partitionSet);
+ public Set<PartitionId> getPartitionSet() {
+ return _config.getPartitionSet();
}
/**
@@ -135,95 +111,35 @@ public class Resource {
return _externalView;
}
+ /**
+ * Get the resource properties configuring rebalancing
+ * @return RebalancerConfig properties
+ */
public RebalancerConfig getRebalancerConfig() {
- return _rebalancerConfig;
+ return _config.getRebalancerConfig();
}
+ /**
+ * Get the resource id
+ * @return ResourceId
+ */
public ResourceId getId() {
- return _id;
+ return _config.getId();
}
+ /**
+ * Get the properties configuring scheduler tasks
+ * @return SchedulerTaskConfig properties
+ */
public SchedulerTaskConfig getSchedulerTaskConfig() {
- return _schedulerTaskConfig;
+ return _config.getSchedulerTaskConfig();
}
/**
- * Assembles a Resource
+ * Get the configuration of this resource
+ * @return ResourceConfig that backs this Resource
*/
- public static class Builder {
- private final ResourceId _id;
- private final Map<PartitionId, Partition> _partitionMap;
- private ExternalView _externalView;
- private RebalancerConfig _rebalancerConfig;
- private SchedulerTaskConfig _schedulerTaskConfig;
-
- /**
- * Build a Resource with an id
- * @param id resource id
- */
- public Builder(ResourceId id) {
- _id = id;
- _partitionMap = new HashMap<PartitionId, Partition>();
- }
-
- /**
- * Add a partition that the resource serves
- * @param partition fully-qualified partition
- * @return Builder
- */
- public Builder addPartition(Partition partition) {
- _partitionMap.put(partition.getId(), partition);
- return this;
- }
-
- /**
- * Add a set of partitions
- * @param partitions
- * @return Builder
- */
- public Builder addPartitions(Set<Partition> partitions) {
- for (Partition partition : partitions) {
- addPartition(partition);
- }
- return this;
- }
-
- /**
- * Set the external view of this resource
- * @param extView currently served replica placement and state
- * @return Builder
- */
- public Builder externalView(ExternalView extView) {
- _externalView = extView;
- return this;
- }
-
- /**
- * Set the rebalancer configuration
- * @param rebalancerConfig properties of interest for rebalancing
- * @return Builder
- */
- public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
- _rebalancerConfig = rebalancerConfig;
- return this;
- }
-
- /**
- * @param schedulerTaskConfig
- * @return
- */
- public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
- _schedulerTaskConfig = schedulerTaskConfig;
- return this;
- }
-
- /**
- * Create a Resource object
- * @return instantiated Resource
- */
- public Resource build() {
- return new Resource(_id, _partitionMap, _externalView, _rebalancerConfig,
- _schedulerTaskConfig);
- }
+ public ResourceConfig getConfig() {
+ return _config;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
new file mode 100644
index 0000000..5170c40
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
@@ -0,0 +1,173 @@
+package org.apache.helix.api;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Full configuration of a Helix resource. Typically used to add or modify resources on a cluster
+ */
+public class ResourceConfig {
+ private final ResourceId _id;
+ private final Map<PartitionId, Partition> _partitionMap;
+ private final RebalancerConfig _rebalancerConfig;
+ private final SchedulerTaskConfig _schedulerTaskConfig;
+
+ /**
+ * Instantiate a configuration. Consider using ResourceConfig.Builder
+ * @param id resource id
+ * @param partitionMap map of partition identifiers to partition objects (copied)
+ * @param schedulerTaskConfig configuration for scheduler tasks associated with the resource
+ * @param rebalancerConfig configuration for rebalancing the resource
+ */
+ public ResourceConfig(ResourceId id, Map<PartitionId, Partition> partitionMap,
+ SchedulerTaskConfig schedulerTaskConfig, RebalancerConfig rebalancerConfig) {
+ _id = id;
+ _partitionMap = ImmutableMap.copyOf(partitionMap);
+ _schedulerTaskConfig = schedulerTaskConfig;
+ _rebalancerConfig = rebalancerConfig;
+ }
+
+ /**
+ * Get the partitions of the resource
+ * @return immutable map of partition id to partition, or empty map if none
+ */
+ public Map<PartitionId, Partition> getPartitionMap() {
+ return _partitionMap;
+ }
+
+ /**
+ * Get a partition that the resource contains
+ * @param partitionId the partition id to look up
+ * @return Partition or null if none is present with the given id
+ */
+ public Partition getPartition(PartitionId partitionId) {
+ return _partitionMap.get(partitionId);
+ }
+
+ /**
+ * Get the set of partition ids that the resource contains
+ * @return immutable copy of the partition id set, or empty if none
+ */
+ public Set<PartitionId> getPartitionSet() {
+ Set<PartitionId> partitionSet = new HashSet<PartitionId>();
+ partitionSet.addAll(_partitionMap.keySet());
+ return ImmutableSet.copyOf(partitionSet);
+ }
+
+ /**
+ * Get the resource properties configuring rebalancing
+ * @return RebalancerConfig properties
+ */
+ public RebalancerConfig getRebalancerConfig() {
+ return _rebalancerConfig;
+ }
+
+ /**
+ * Get the resource id
+ * @return ResourceId
+ */
+ public ResourceId getId() {
+ return _id;
+ }
+
+ /**
+ * Get the properties configuring scheduler tasks
+ * @return SchedulerTaskConfig properties
+ */
+ public SchedulerTaskConfig getSchedulerTaskConfig() {
+ return _schedulerTaskConfig;
+ }
+
+ /**
+ * Assembles a ResourceConfig
+ */
+ public static class Builder {
+ private final ResourceId _id;
+ private final Map<PartitionId, Partition> _partitionMap;
+ private RebalancerConfig _rebalancerConfig;
+ private SchedulerTaskConfig _schedulerTaskConfig;
+
+ /**
+ * Build a ResourceConfig with an id; starts with no partitions
+ * @param id resource id
+ */
+ public Builder(ResourceId id) {
+ _id = id;
+ _partitionMap = new HashMap<PartitionId, Partition>();
+ }
+
+ /**
+ * Add a partition that the resource serves
+ * @param partition fully-qualified partition
+ * @return Builder
+ */
+ public Builder addPartition(Partition partition) {
+ _partitionMap.put(partition.getId(), partition);
+ return this;
+ }
+
+ /**
+ * Add a collection of partitions
+ * @param partitions partitions to add, each stored under its own id
+ * @return Builder
+ */
+ public Builder addPartitions(Collection<Partition> partitions) {
+ for (Partition partition : partitions) {
+ addPartition(partition);
+ }
+ return this;
+ }
+
+ /**
+ * Set the rebalancer configuration
+ * @param rebalancerConfig properties of interest for rebalancing
+ * @return Builder
+ */
+ public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+ _rebalancerConfig = rebalancerConfig;
+ return this;
+ }
+
+ /**
+ * @param schedulerTaskConfig properties configuring scheduler tasks for the resource
+ * @return Builder
+ */
+ public Builder schedulerTaskConfig(SchedulerTaskConfig schedulerTaskConfig) {
+ _schedulerTaskConfig = schedulerTaskConfig;
+ return this;
+ }
+
+ /**
+ * Create a ResourceConfig object
+ * @return instantiated ResourceConfig
+ */
+ public ResourceConfig build() {
+ return new ResourceConfig(_id, _partitionMap, _schedulerTaskConfig, _rebalancerConfig);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6570dc4..4df2201 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -44,14 +44,7 @@ import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CompatibilityCheckStage;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ExternalViewComputeStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-import org.apache.helix.controller.stages.MessageSelectionStage;
-import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
import org.apache.helix.controller.stages.NewCompatibilityCheckStage;
import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
@@ -62,10 +55,6 @@ import org.apache.helix.controller.stages.NewMessageThrottleStage;
import org.apache.helix.controller.stages.NewReadClusterDataStage;
import org.apache.helix.controller.stages.NewResourceComputationStage;
import org.apache.helix.controller.stages.NewTaskAssignmentStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.RebalanceIdealStateStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
@@ -205,8 +194,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
registry.register("idealStateChange", dataRefresh, rebalancePipeline);
registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
registry.register("configChange", dataRefresh, rebalancePipeline);
- registry.register("liveInstanceChange", dataRefresh, rebalancePipeline,
- externalViewPipeline);
+ registry.register("liveInstanceChange", dataRefresh, rebalancePipeline, externalViewPipeline);
registry.register("messageChange", dataRefresh, rebalancePipeline);
registry.register("externalView", dataRefresh);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 8821082..a28166c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -29,9 +29,9 @@ import java.util.Set;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
@@ -46,6 +46,7 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.Lists;
@@ -69,12 +70,17 @@ public class NewAutoRebalancer implements NewRebalancer {
public ResourceAssignment computeResourceMapping(Resource resource, Cluster cluster,
StateModelDefinition stateModelDef, NewCurrentStateOutput currentStateOutput) {
// Compute a preference list based on the current ideal state
- List<Partition> partitions = new ArrayList<Partition>(resource.getPartitionSet());
+ List<PartitionId> partitions = new ArrayList<PartitionId>(resource.getPartitionSet());
List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
RebalancerConfig config = resource.getRebalancerConfig();
Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
Map<ParticipantId, Participant> allParticipants = cluster.getParticipantMap();
- int replicas = config.getReplicaCount();
+ int replicas = -1;
+ if (config.canAssignAnyLiveParticipant()) {
+ replicas = liveParticipants.size();
+ } else {
+ replicas = config.getReplicaCount();
+ }
LinkedHashMap<String, Integer> stateCountMap =
ConstraintBasedAssignment.stateCount(stateModelDef, liveParticipants.size(), replicas);
@@ -129,17 +135,25 @@ public class NewAutoRebalancer implements NewRebalancer {
LOG.debug("Processing resource:" + resource.getId());
}
ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
- for (Partition partition : partitions) {
+ for (PartitionId partition : partitions) {
Set<ParticipantId> disabledParticipantsForPartition =
- NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition.getId());
+ NewConstraintBasedAssignment.getDisabledParticipants(allParticipants, partition);
List<ParticipantId> preferenceList =
- NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+ Lists.transform(newMapping.getListField(partition.stringify()),
+ new Function<String, ParticipantId>() {
+ @Override
+ public ParticipantId apply(String participantName) {
+ return Id.participant(participantName);
+ }
+ });
+ preferenceList =
+ NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
Map<ParticipantId, State> bestStateForPartition =
NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
stateModelDef, preferenceList,
- currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId()),
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition),
disabledParticipantsForPartition);
- partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
@@ -149,23 +163,23 @@ public class NewAutoRebalancer implements NewRebalancer {
Map<PartitionId, Map<ParticipantId, State>> map =
new HashMap<PartitionId, Map<ParticipantId, State>>();
- for (Partition partition : resource.getPartitionSet()) {
+ for (PartitionId partition : resource.getPartitionSet()) {
Map<ParticipantId, State> curStateMap =
- currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
- map.put(partition.getId(), new HashMap<ParticipantId, State>());
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition);
+ map.put(partition, new HashMap<ParticipantId, State>());
for (ParticipantId node : curStateMap.keySet()) {
State state = curStateMap.get(node);
if (stateCountMap.containsKey(state)) {
- map.get(partition.getId()).put(node, state);
+ map.get(partition).put(node, state);
}
}
Map<ParticipantId, State> pendingStateMap =
- currentStateOutput.getPendingStateMap(resource.getId(), partition.getId());
+ currentStateOutput.getPendingStateMap(resource.getId(), partition);
for (ParticipantId node : pendingStateMap.keySet()) {
State state = pendingStateMap.get(node);
if (stateCountMap.containsKey(state)) {
- map.get(partition.getId()).put(node, state);
+ map.get(partition).put(node, state);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
index 8d000f5..0dd346b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewCustomRebalancer.java
@@ -27,7 +27,7 @@ import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
@@ -58,17 +58,16 @@ public class NewCustomRebalancer implements NewRebalancer {
}
ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
RebalancerConfig config = resource.getRebalancerConfig();
- for (Partition partition : resource.getPartitionSet()) {
+ for (PartitionId partition : resource.getPartitionSet()) {
Map<ParticipantId, State> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition);
Set<ParticipantId> disabledInstancesForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partition.getId());
+ partition);
Map<ParticipantId, State> bestStateForPartition =
computeCustomizedBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef,
- config.getPreferenceMap(partition.getId()), currentStateMap,
- disabledInstancesForPartition);
- partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+ config.getPreferenceMap(partition), currentStateMap, disabledInstancesForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index 27bb513..0b1611c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -25,7 +25,7 @@ import java.util.Set;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
@@ -56,19 +56,20 @@ public class NewSemiAutoRebalancer implements NewRebalancer {
}
ResourceAssignment partitionMapping = new ResourceAssignment(resource.getId());
RebalancerConfig config = resource.getRebalancerConfig();
- for (Partition partition : resource.getPartitionSet()) {
+ for (PartitionId partition : resource.getPartitionSet()) {
Map<ParticipantId, State> currentStateMap =
- currentStateOutput.getCurrentStateMap(resource.getId(), partition.getId());
+ currentStateOutput.getCurrentStateMap(resource.getId(), partition);
Set<ParticipantId> disabledInstancesForPartition =
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partition.getId());
+ partition);
List<ParticipantId> preferenceList =
- NewConstraintBasedAssignment.getPreferenceList(cluster, partition.getId(), config);
+ NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
+ config.getPreferenceList(partition));
Map<ParticipantId, State> bestStateForPartition =
NewConstraintBasedAssignment.computeAutoBestStateForPartition(
cluster.getLiveParticipantMap(), stateModelDef, preferenceList, currentStateMap,
disabledInstancesForPartition);
- partitionMapping.addReplicaMap(partition.getId(), bestStateForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 224853b..07856ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -34,7 +34,6 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.State;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -76,9 +75,7 @@ public class NewConstraintBasedAssignment {
* @return list with most preferred participants first
*/
public static List<ParticipantId> getPreferenceList(Cluster cluster, PartitionId partitionId,
- RebalancerConfig config) {
- List<ParticipantId> prefList = config.getPreferenceList(partitionId);
-
+ List<ParticipantId> prefList) {
if (prefList != null && prefList.size() == 1
&& StateModelToken.ANY_LIVEINSTANCE.toString().equals(prefList.get(0).stringify())) {
prefList = new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index bc14297..52a5af2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -23,11 +23,11 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Id;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -58,7 +58,8 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
NewCurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
Cluster cluster = event.getAttribute("ClusterDataCache");
if (currentStateOutput == null || resourceMap == null || cluster == null) {
@@ -106,7 +107,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
// TODO check this
private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
- Map<ResourceId, Resource> resourceMap, NewCurrentStateOutput currentStateOutput) {
+ Map<ResourceId, ResourceConfig> resourceMap, NewCurrentStateOutput currentStateOutput) {
NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
Map<StateModelDefId, StateModelDefinition> stateModelDefs =
event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
index d5ee850..8c5005d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -2,6 +2,7 @@ package org.apache.helix.controller.stages;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.api.ResourceId;
import org.apache.helix.model.ResourceAssignment;
@@ -31,4 +32,12 @@ public class NewBestPossibleStateOutput {
public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
return _resourceAssignmentMap.get(resourceId);
}
+
+ /**
+ * Get all of the resources currently assigned
+ * <p>
+ * The result is the key set of the internal resource-to-assignment map, i.e. every resource id
+ * that currently has an entry in this output.
+ * NOTE(review): keySet() presumably returns a live view backed by the map rather than a copy;
+ * confirm that callers do not modify it.
+ * @return set of assigned resource ids
+ */
+ public Set<ResourceId> getAssignedResources() {
+ return _resourceAssignmentMap.keySet();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
index 0f8c33e..e0cd22f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -28,7 +28,7 @@ import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.SessionId;
import org.apache.helix.api.State;
@@ -48,7 +48,8 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
Cluster cluster = event.getAttribute("ClusterDataCache");
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
if (cluster == null || resourceMap == null) {
throw new StageException("Missing attributes in event:" + event
@@ -72,7 +73,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
}
ResourceId resourceId = message.getResourceId();
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
if (resource == null) {
continue;
}
@@ -92,8 +93,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
for (PartitionId partitionId : partitionNames) {
Partition partition = resource.getPartition(partitionId);
if (partition != null) {
- currentStateOutput.setPendingState(resourceId, partitionId,
- participantId,
+ currentStateOutput.setPendingState(resourceId, partitionId, participantId,
message.getToState());
} else {
// log
@@ -113,7 +113,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
ResourceId resourceId = curState.getResourceId();
StateModelDefId stateModelDefId = curState.getStateModelDefId();
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
if (resource == null) {
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 1a99346..68169ae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
-import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
@@ -40,7 +39,7 @@ import org.apache.helix.api.Id;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -50,7 +49,6 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
@@ -64,7 +62,8 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
log.info("START ExternalViewComputeStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
Cluster cluster = event.getAttribute("ClusterDataCache");
if (manager == null || resourceMap == null || cluster == null) {
@@ -89,7 +88,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
// view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
// if resource ideal state has bucket size, set it
// otherwise resource has been dropped, use bucket size from current state instead
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
if (rebalancerConfig.getBucketSize() > 0) {
view.setBucketSize(rebalancerConfig.getBucketSize());
@@ -139,8 +138,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
// message, and then remove the partitions from the ideal state
if (rebalancerConfig != null
&& rebalancerConfig.getStateModelDefId().stringify()
- .equalsIgnoreCase(
- DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
// TODO fix it
// updateScheduledTaskStatus(view, manager, idealState);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index 0a72dc0..0ae72cf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -30,10 +30,10 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.Id;
import org.apache.helix.api.MessageId;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.SessionId;
import org.apache.helix.api.State;
@@ -42,12 +42,10 @@ import org.apache.helix.api.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
-import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -63,7 +61,8 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
Cluster cluster = event.getAttribute("ClusterDataCache");
Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
NewCurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
NewBestPossibleStateOutput bestPossibleStateOutput =
@@ -77,7 +76,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
NewMessageOutput output = new NewMessageOutput();
for (ResourceId resourceId : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
int bucketSize = resource.getRebalancerConfig().getBucketSize();
StateModelDefinition stateModelDef =
@@ -145,10 +144,13 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
if (rebalancerConfig != null
&& rebalancerConfig.getStateModelDefId().stringify()
.equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- if (resource.getPartitionSet().size() > 0) {
- // TODO refactor it
- message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
- resource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+ if (resource.getPartitionMap().size() > 0) {
+ // TODO refactor it -- we need a way to read in scheduler tasks a priori
+ Resource activeResource = cluster.getResource(resourceId);
+ if (activeResource != null) {
+ message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+ activeResource.getSchedulerTaskConfig().getTaskConfig(partitionId));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 9745c64..ed06c5f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -28,10 +28,12 @@ import java.util.TreeMap;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantConfig;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
@@ -91,11 +93,11 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
Cluster cluster = event.getAttribute("ClusterDataCache");
Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
event.getAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString());
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
NewCurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
- NewMessageOutput messageGenOutput =
- event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+ NewMessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
if (cluster == null || resourceMap == null || currentStateOutput == null
|| messageGenOutput == null) {
throw new StageException("Missing attributes in event:" + event
@@ -105,7 +107,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
NewMessageOutput output = new NewMessageOutput();
for (ResourceId resourceId : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
StateModelDefinition stateModelDef =
stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
@@ -119,8 +121,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
List<Message> selectedMessages =
selectMessages(cluster.getLiveParticipantMap(),
- currentStateOutput.getCurrentStateMap(resourceId, partitionId),
- currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
+ currentStateOutput.getCurrentStateMap(resourceId, partitionId),
+ currentStateOutput.getPendingStateMap(resourceId, partitionId), messages,
stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState());
output.setMessages(resourceId, partitionId, selectedMessages);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
index 5bea5b4..cfbd45c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -30,17 +30,16 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.ClusterConstraints;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
-import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message;
import org.apache.log4j.Logger;
public class NewMessageThrottleStage extends AbstractBaseStage {
@@ -120,7 +119,8 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
Cluster cluster = event.getAttribute("ClusterDataCache");
NewMessageOutput msgSelectionOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
throw new StageException("Missing attributes in event: " + event
@@ -145,7 +145,7 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
// go through all new messages, throttle if necessary
// assume messages should be sorted by state transition priority in messageSelection stage
for (ResourceId resourceId : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
// TODO fix it
for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
[2/3] [HELIX-109] adding config classes
Posted by ki...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index af23eb2..a6d9db4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -19,8 +19,10 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
@@ -28,6 +30,7 @@ import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -51,16 +54,17 @@ public class NewResourceComputationStage extends AbstractBaseStage {
throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
}
- Map<ResourceId, Resource.Builder> resourceBuilderMap =
- new LinkedHashMap<ResourceId, Resource.Builder>();
+ Map<ResourceId, ResourceConfig.Builder> resourceBuilderMap =
+ new LinkedHashMap<ResourceId, ResourceConfig.Builder>();
// include all resources in ideal-state
for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
Resource resource = cluster.getResource(resourceId);
RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
- Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+ ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
resourceBuilder.rebalancerConfig(rebalancerConfig);
- resourceBuilder.addPartitions(resource.getPartitionSet());
+ Set<Partition> partitionSet = new HashSet<Partition>(resource.getPartitionMap().values());
+ resourceBuilder.addPartitions(partitionSet);
resourceBuilderMap.put(resourceId, resourceBuilder);
}
@@ -87,7 +91,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
- Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+ ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
resourceBuilderMap.put(resourceId, resourceBuilder);
}
@@ -99,7 +103,7 @@ public class NewResourceComputationStage extends AbstractBaseStage {
}
// convert builder-map to resource-map
- Map<ResourceId, Resource> resourceMap = new LinkedHashMap<ResourceId, Resource>();
+ Map<ResourceId, ResourceConfig> resourceMap = new LinkedHashMap<ResourceId, ResourceConfig>();
for (ResourceId resourceId : resourceBuilderMap.keySet()) {
resourceMap.put(resourceId, resourceBuilderMap.get(resourceId).build());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 2b8a0c8..f5bb47f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -35,13 +35,11 @@ import org.apache.helix.api.Id;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
import org.apache.log4j.Logger;
public class NewTaskAssignmentStage extends AbstractBaseStage {
@@ -53,9 +51,9 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
logger.info("START TaskAssignmentStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
- Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- NewMessageOutput messageOutput =
- event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
+ NewMessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
Cluster cluster = event.getAttribute("ClusterDataCache");
Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
@@ -68,7 +66,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
List<Message> messagesToSend = new ArrayList<Message>();
for (ResourceId resourceId : resourceMap.keySet()) {
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
messagesToSend.addAll(messages);
@@ -86,8 +84,8 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
}
List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
- Map<ResourceId, Resource> resourceMap, Map<ParticipantId, Participant> liveParticipantMap,
- HelixManagerProperties properties) {
+ Map<ResourceId, ResourceConfig> resourceMap,
+ Map<ParticipantId, Participant> liveParticipantMap, HelixManagerProperties properties) {
// group messages by its CurrentState path + "/" + fromState + "/" + toState
Map<String, Message> batchMessages = new HashMap<String, Message>();
List<Message> outputMessages = new ArrayList<Message>();
@@ -96,7 +94,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
while (iter.hasNext()) {
Message message = iter.next();
ResourceId resourceId = message.getResourceId();
- Resource resource = resourceMap.get(resourceId);
+ ResourceConfig resource = resourceMap.get(resourceId);
ParticipantId participantId = Id.participant(message.getTgtName());
Participant liveParticipant = liveParticipantMap.get(participantId);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index 3b46c13..ef47a12 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -76,6 +76,14 @@ public class ClusterConstraints extends HelixProperty {
}
/**
+ * Get the type of constraint this object represents, derived from this property's id
+ * @return the constraint type that ConstraintType.valueOf yields for getId()
+ */
+ public ConstraintType getType() {
+ return ConstraintType.valueOf(getId());
+ }
+
+ /**
* Instantiate constraints from a pre-populated ZNRecord
* @param record ZNRecord containing all constraints
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 24ec7c9..16b3fa1 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -339,10 +339,13 @@ public class IdealState extends HelixProperty {
Map<String, String> instanceStateMap = getInstanceStateMap(partitionId.stringify());
ImmutableMap.Builder<ParticipantId, State> builder =
new ImmutableMap.Builder<ParticipantId, State>();
- for (String participantId : instanceStateMap.keySet()) {
- builder.put(Id.participant(participantId), State.from(instanceStateMap.get(participantId)));
+ if (instanceStateMap != null) {
+ for (String participantId : instanceStateMap.keySet()) {
+ builder.put(Id.participant(participantId), State.from(instanceStateMap.get(participantId)));
+ }
+ return builder.build();
}
- return builder.build();
+ return null;
}
/**
@@ -433,10 +436,13 @@ public class IdealState extends HelixProperty {
public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
ImmutableList.Builder<ParticipantId> builder = new ImmutableList.Builder<ParticipantId>();
List<String> preferenceStringList = getPreferenceList(partitionId.stringify());
- for (String participantName : preferenceStringList) {
- builder.add(Id.participant(participantName));
+ if (preferenceStringList != null) {
+ for (String participantName : preferenceStringList) {
+ builder.add(Id.participant(participantName));
+ }
+ return builder.build();
}
- return builder.build();
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 8577578..2b06c2b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -84,6 +84,14 @@ public class ResourceAssignment extends HelixProperty {
}
/**
+ * Get the full partition-to-replica mapping of this resource, built from the record's map fields
+ * @return map of partition id to (participant id, state) pairs, as converted by replicaMapsFromStringMaps
+ */
+ public Map<PartitionId, Map<ParticipantId, State>> getResourceMap() {
+ return replicaMapsFromStringMaps(_record.getMapFields());
+ }
+
+ /**
* Get the participant, state pairs for a partition
* @param partition the Partition to look up
* @return map of (participant id, state)
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 7ceee85..b371c6a 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -45,6 +45,12 @@ import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ClusterAccessor;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.stages.AttributeName;
@@ -53,6 +59,10 @@ import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.NewBestPossibleStateOutput;
+import org.apache.helix.controller.stages.NewCurrentStateComputationStage;
+import org.apache.helix.controller.stages.NewResourceComputationStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -60,6 +70,8 @@ import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.PropertyJsonComparator;
@@ -156,7 +168,7 @@ public class ClusterStateVerifier {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
- return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
+ return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName);
} catch (Exception e) {
LOG.error("exception in verification", e);
}
@@ -222,10 +234,11 @@ public class ClusterStateVerifier {
}
static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
- Map<String, Map<String, String>> errStates) {
+ Map<String, Map<String, String>> errStates, String clusterName) {
try {
Builder keyBuilder = accessor.keyBuilder();
// read cluster once and do verification
+ // TODO: stop using ClusterDataCache
ClusterDataCache cache = new ClusterDataCache();
cache.refresh(accessor);
@@ -250,10 +263,31 @@ public class ClusterStateVerifier {
}
}
+ Map<String, StateModelDefinition> stateModelDefs =
+ accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+ Map<StateModelDefId, StateModelDefinition> convertedDefs =
+ new HashMap<StateModelDefId, StateModelDefinition>();
+ for (String defName : stateModelDefs.keySet()) {
+ convertedDefs.put(Id.stateModelDef(defName), stateModelDefs.get(defName));
+ }
+ ClusterAccessor clusterAccessor = new ClusterAccessor(Id.cluster(clusterName), accessor);
+ Cluster cluster = clusterAccessor.readCluster();
// calculate best possible state
- BestPossibleStateOutput bestPossOutput = ClusterStateVerifier.calcBestPossState(cache);
- Map<String, Map<Partition, Map<String, String>>> bestPossStateMap =
- bestPossOutput.getStateMap();
+ NewBestPossibleStateOutput bestPossOutput =
+ ClusterStateVerifier.calcBestPossState(cluster, convertedDefs);
+ Map<String, Map<String, Map<String, String>>> bestPossStateMap =
+ new HashMap<String, Map<String, Map<String, String>>>();
+ for (ResourceId resourceId : bestPossOutput.getAssignedResources()) {
+ ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId);
+ Map<String, Map<String, String>> resourceMap = new HashMap<String, Map<String, String>>();
+ for (PartitionId partitionId : resourceAssignment.getMappedPartitions()) {
+ Map<String, String> replicaMap =
+ ResourceAssignment.stringMapFromReplicaMap(resourceAssignment
+ .getReplicaMap(partitionId));
+ resourceMap.put(partitionId.stringify(), replicaMap);
+ }
+ bestPossStateMap.put(resourceId.stringify(), resourceMap);
+ }
// set error states
if (errStates != null) {
@@ -263,13 +297,12 @@ public class ClusterStateVerifier {
String instanceName = partErrStates.get(partitionName);
if (!bestPossStateMap.containsKey(resourceName)) {
- bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+ bestPossStateMap.put(resourceName, new HashMap<String, Map<String, String>>());
}
- Partition partition = new Partition(partitionName);
- if (!bestPossStateMap.get(resourceName).containsKey(partition)) {
- bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
+ if (!bestPossStateMap.get(resourceName).containsKey(partitionName)) {
+ bestPossStateMap.get(resourceName).put(partitionName, new HashMap<String, String>());
}
- bestPossStateMap.get(resourceName).get(partition)
+ bestPossStateMap.get(resourceName).get(partitionName)
.put(instanceName, HelixDefinedState.ERROR.toString());
}
}
@@ -285,11 +318,12 @@ public class ClusterStateVerifier {
}
// step 0: remove empty map and DROPPED state from best possible state
- Map<Partition, Map<String, String>> bpStateMap =
- bestPossOutput.getResourceMap(resourceName);
- Iterator<Entry<Partition, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
+ Map<String, Map<String, String>> bpStateMap =
+ ResourceAssignment.stringMapsFromReplicaMaps(bestPossOutput.getResourceAssignment(
+ Id.resource(resourceName)).getResourceMap());
+ Iterator<Entry<String, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
while (iter.hasNext()) {
- Map.Entry<Partition, Map<String, String>> entry = iter.next();
+ Map.Entry<String, Map<String, String>> entry = iter.next();
Map<String, String> instanceStateMap = entry.getValue();
if (instanceStateMap.isEmpty()) {
iter.remove();
@@ -310,7 +344,9 @@ public class ClusterStateVerifier {
// step 1: externalView and bestPossibleState has equal size
int extViewSize = extView.getRecord().getMapFields().size();
- int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
+ int bestPossStateSize =
+ bestPossOutput.getResourceAssignment(Id.resource(resourceName)).getMappedPartitions()
+ .size();
if (extViewSize != bestPossStateSize) {
LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
+ bestPossStateSize + ") for resource: " + resourceName);
@@ -328,7 +364,8 @@ public class ClusterStateVerifier {
for (String partition : extView.getRecord().getMapFields().keySet()) {
Map<String, String> evInstanceStateMap = extView.getRecord().getMapField(partition);
Map<String, String> bpInstanceStateMap =
- bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
+ ResourceAssignment.stringMapFromReplicaMap(bestPossOutput.getResourceAssignment(
+ Id.resource(resourceName)).getReplicaMap(Id.partition(partition)));
boolean result =
ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
@@ -404,24 +441,27 @@ public class ClusterStateVerifier {
/**
* calculate the best possible state note that DROPPED states are not checked since when
* kick off the BestPossibleStateCalcStage we are providing an empty current state map
+ * @param convertedDefs
* @param cache
* @return
* @throws Exception
*/
- static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception {
+ static NewBestPossibleStateOutput calcBestPossState(Cluster cluster,
+ Map<StateModelDefId, StateModelDefinition> convertedDefs) throws Exception {
ClusterEvent event = new ClusterEvent("sampleEvent");
- event.addAttribute("ClusterDataCache", cache);
+ event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), convertedDefs);
- ResourceComputationStage rcState = new ResourceComputationStage();
- CurrentStateComputationStage csStage = new CurrentStateComputationStage();
- BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
+ NewResourceComputationStage rcState = new NewResourceComputationStage();
+ NewCurrentStateComputationStage csStage = new NewCurrentStateComputationStage();
+ NewBestPossibleStateCalcStage bpStage = new NewBestPossibleStateCalcStage();
runStage(event, rcState);
runStage(event, csStage);
runStage(event, bpStage);
- BestPossibleStateOutput output =
+ NewBestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
// System.out.println("output:" + output);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index cc26596..ce2781f 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -52,6 +52,9 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
public class TestNewStages extends ZkUnitTestBase {
final int n = 2;
final int p = 8;
@@ -115,7 +118,14 @@ public class TestNewStages extends ZkUnitTestBase {
Cluster cluster = clusterAccessor.readCluster();
ClusterEvent event = new ClusterEvent(testName);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), new NewCurrentStateOutput());
- event.addAttribute(AttributeName.RESOURCES.toString(), cluster.getResourceMap());
+ Map<ResourceId, ResourceConfig> resourceConfigMap =
+ Maps.transformValues(cluster.getResourceMap(), new Function<Resource, ResourceConfig>() {
+ @Override
+ public ResourceConfig apply(Resource resource) {
+ return resource.getConfig();
+ }
+ });
+ event.addAttribute(AttributeName.RESOURCES.toString(), resourceConfigMap);
event.addAttribute("ClusterDataCache", cluster);
Map<StateModelDefId, StateModelDefinition> stateModelMap =
new HashMap<StateModelDefId, StateModelDefinition>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 6dcf725..382f036 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -29,15 +29,19 @@ import java.util.UUID;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.Mocks;
-import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.annotations.AfterClass;
@@ -107,11 +111,16 @@ public class BaseStageTest {
protected void setupLiveInstances(int numLiveInstances) {
// setup liveInstances
for (int i = 0; i < numLiveInstances; i++) {
- LiveInstance liveInstance = new LiveInstance("localhost_" + i);
+ String instanceName = "localhost_" + i;
+ InstanceConfig instanceConfig = new InstanceConfig(Id.participant(instanceName));
+ instanceConfig.setHostName("localhost");
+ instanceConfig.setPort(Integer.toString(i));
+ LiveInstance liveInstance = new LiveInstance(instanceName);
liveInstance.setSessionId("session_" + i);
Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.liveInstance("localhost_" + i), liveInstance);
+ accessor.setProperty(keyBuilder.instanceConfig(instanceName), instanceConfig);
+ accessor.setProperty(keyBuilder.liveInstance(instanceName), liveInstance);
}
}
@@ -128,32 +137,38 @@ public class BaseStageTest {
stage.postProcess();
}
- protected void setupStateModel() {
- ZNRecord masterSlave = new StateModelConfigGenerator().generateConfigForMasterSlave();
-
+ protected Map<StateModelDefId, StateModelDefinition> setupStateModel() {
Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), new StateModelDefinition(
- masterSlave));
+ Map<StateModelDefId, StateModelDefinition> defs =
+ new HashMap<StateModelDefId, StateModelDefinition>();
+
+ ZNRecord masterSlave = StateModelConfigGenerator.generateConfigForMasterSlave();
+ StateModelDefinition masterSlaveDef = new StateModelDefinition(masterSlave);
+ defs.put(Id.stateModelDef(masterSlaveDef.getId()), masterSlaveDef);
+ accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlaveDef);
+
+ ZNRecord leaderStandby = StateModelConfigGenerator.generateConfigForLeaderStandby();
+ StateModelDefinition leaderStandbyDef = new StateModelDefinition(leaderStandby);
+ defs.put(Id.stateModelDef(leaderStandbyDef.getId()), leaderStandbyDef);
+ accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandbyDef);
- ZNRecord leaderStandby = new StateModelConfigGenerator().generateConfigForLeaderStandby();
- accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), new StateModelDefinition(
- leaderStandby));
+ ZNRecord onlineOffline = StateModelConfigGenerator.generateConfigForOnlineOffline();
+ StateModelDefinition onlineOfflineDef = new StateModelDefinition(onlineOffline);
+ defs.put(Id.stateModelDef(onlineOfflineDef.getId()), onlineOfflineDef);
+ accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOfflineDef);
- ZNRecord onlineOffline = new StateModelConfigGenerator().generateConfigForOnlineOffline();
- accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), new StateModelDefinition(
- onlineOffline));
+ return defs;
}
- protected Map<String, Resource> getResourceMap() {
- Map<String, Resource> resourceMap = new HashMap<String, Resource>();
- Resource testResource = new Resource("testResourceName");
- testResource.setStateModelDefRef("MasterSlave");
- testResource.addPartition("testResourceName_0");
- testResource.addPartition("testResourceName_1");
- testResource.addPartition("testResourceName_2");
- testResource.addPartition("testResourceName_3");
- testResource.addPartition("testResourceName_4");
- resourceMap.put("testResourceName", testResource);
+ protected Map<ResourceId, ResourceConfig> getResourceMap() {
+ Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
+ ResourceConfig.Builder builder = new ResourceConfig.Builder(Id.resource("testResourceName"));
+ builder.addPartition(new Partition(Id.partition("testResourceName_0")));
+ builder.addPartition(new Partition(Id.partition("testResourceName_1")));
+ builder.addPartition(new Partition(Id.partition("testResourceName_2")));
+ builder.addPartition(new Partition(Id.partition("testResourceName_3")));
+ builder.addPartition(new Partition(Id.partition("testResourceName_4")));
+ resourceMap.put(Id.resource("testResourceName"), builder.build());
return resourceMap;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 2453bd8..82b70b1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -24,12 +24,17 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
-import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.IdealStateModeProperty;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -41,68 +46,78 @@ import org.testng.annotations.Test;
public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
@Test
public void testSemiAutoModeCompatibility() {
- System.out.println("START TestBestPossibleStateCalcStage at "
- + new Date(System.currentTimeMillis()));
+ System.out
+ .println("START TestBestPossibleStateCalcStageCompatibility_testSemiAutoModeCompatibility at "
+ + new Date(System.currentTimeMillis()));
String[] resources = new String[] {
"testResourceName"
};
setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
setupLiveInstances(5);
- setupStateModel();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
- Map<String, Resource> resourceMap = getResourceMap();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
- ReadClusterDataStage stage1 = new ReadClusterDataStage();
+ NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
runStage(event, stage1);
- BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+ NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
runStage(event, stage2);
- BestPossibleStateOutput output =
+ NewBestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
- Partition resource = new Partition("testResourceName_" + p);
- AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
- .get("localhost_" + (p + 1) % 5));
+ Map<ParticipantId, State> replicaMap =
+ output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+ Id.partition("testResourceName_" + p));
+ AssertJUnit.assertEquals(State.from("MASTER"),
+ replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
}
- System.out.println("END TestBestPossibleStateCalcStage at "
- + new Date(System.currentTimeMillis()));
+ System.out
+ .println("END TestBestPossibleStateCalcStageCompatibility_testSemiAutoModeCompatibility at "
+ + new Date(System.currentTimeMillis()));
}
@Test
public void testCustomModeCompatibility() {
- System.out.println("START TestBestPossibleStateCalcStage at "
- + new Date(System.currentTimeMillis()));
+ System.out
+ .println("START TestBestPossibleStateCalcStageCompatibility_testCustomModeCompatibility at "
+ + new Date(System.currentTimeMillis()));
String[] resources = new String[] {
"testResourceName"
};
setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.CUSTOMIZED);
setupLiveInstances(5);
- setupStateModel();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
- Map<String, Resource> resourceMap = getResourceMap();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
- ReadClusterDataStage stage1 = new ReadClusterDataStage();
+ NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
runStage(event, stage1);
- BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+ NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
runStage(event, stage2);
- BestPossibleStateOutput output =
+ NewBestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
- Partition resource = new Partition("testResourceName_" + p);
- AssertJUnit.assertNull(output.getInstanceStateMap("testResourceName", resource).get(
- "localhost_" + (p + 1) % 5));
+ Map<ParticipantId, State> replicaMap =
+ output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+ Id.partition("testResourceName_" + p));
+ AssertJUnit.assertEquals(State.from("MASTER"),
+ replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
}
- System.out.println("END TestBestPossibleStateCalcStage at "
- + new Date(System.currentTimeMillis()));
+ System.out
+ .println("END TestBestPossibleStateCalcStageCompatibility_testCustomModeCompatibility at "
+ + new Date(System.currentTimeMillis()));
}
protected List<IdealState> setupIdealStateDeprecated(int nodes, String[] resources,
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 82c7b37..1a76615 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -22,14 +22,14 @@ package org.apache.helix.controller.stages;
import java.util.Date;
import java.util.Map;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -45,24 +45,27 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
};
setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
setupLiveInstances(5);
- setupStateModel();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
- Map<String, Resource> resourceMap = getResourceMap();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ event.addAttribute(AttributeName.STATE_MODEL_DEFINITIONS.toString(), stateModelDefs);
- ReadClusterDataStage stage1 = new ReadClusterDataStage();
+ NewReadClusterDataStage stage1 = new NewReadClusterDataStage();
runStage(event, stage1);
- BestPossibleStateCalcStage stage2 = new BestPossibleStateCalcStage();
+ NewBestPossibleStateCalcStage stage2 = new NewBestPossibleStateCalcStage();
runStage(event, stage2);
- BestPossibleStateOutput output =
+ NewBestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
for (int p = 0; p < 5; p++) {
- Partition resource = new Partition("testResourceName_" + p);
- AssertJUnit.assertEquals("MASTER", output.getInstanceStateMap("testResourceName", resource)
- .get("localhost_" + (p + 1) % 5));
+ Map<ParticipantId, State> replicaMap =
+ output.getResourceAssignment(Id.resource("testResourceName")).getReplicaMap(
+ Id.partition("testResourceName_" + p));
+ AssertJUnit.assertEquals(State.from("MASTER"),
+ replicaMap.get(Id.participant("localhost_" + (p + 1) % 5)));
}
System.out.println("END TestBestPossibleStateCalcStage at "
+ new Date(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
index bce7c2d..47875fc 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCompatibilityCheckStage.java
@@ -28,6 +28,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
import org.testng.Assert;
@@ -64,6 +65,8 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
LiveInstance liveInstance = new LiveInstance(record);
liveInstance.setSessionId("session_0");
accessor.setProperty(keyBuilder.liveInstance("localhost_0"), liveInstance);
+ InstanceConfig config = new InstanceConfig(liveInstance.getInstanceName());
+ accessor.setProperty(keyBuilder.instanceConfig(config.getInstanceName()), config);
if (controllerVersion != null) {
((Mocks.MockManager) manager).setVersion(controllerVersion);
@@ -74,13 +77,13 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
.put("minimum_supported_version.participant", minSupportedParticipantVersion);
}
event.addAttribute("helixmanager", manager);
- runStage(event, new ReadClusterDataStage());
+ runStage(event, new NewReadClusterDataStage());
}
@Test
public void testCompatible() {
prepare("0.4.0", "0.4.0");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
+ NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
@@ -95,7 +98,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
@Test
public void testNullParticipantVersion() {
prepare("0.4.0", null);
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
+ NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
@@ -111,7 +114,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
@Test
public void testNullControllerVersion() {
prepare(null, "0.4.0");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
+ NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
@@ -127,7 +130,7 @@ public class TestCompatibilityCheckStage extends BaseStageTest {
@Test
public void testIncompatible() {
prepare("0.6.1-incubating-SNAPSHOT", "0.3.4", "0.4");
- CompatibilityCheckStage stage = new CompatibilityCheckStage();
+ NewCompatibilityCheckStage stage = new NewCompatibilityCheckStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index ecad444..3f567ae 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -24,11 +24,11 @@ import java.util.Map;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Id;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -36,32 +36,32 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
@Test
public void testEmptyCS() {
- Map<String, Resource> resourceMap = getResourceMap();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- CurrentStateComputationStage stage = new CurrentStateComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ NewCurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.toString());
AssertJUnit.assertEquals(
- output.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
- 0);
+ output.getCurrentStateMap(Id.resource("testResourceName"),
+ Id.partition("testResourceName_0")).size(), 0);
}
@Test
public void testSimpleCS() {
// setup resource
- Map<String, Resource> resourceMap = getResourceMap();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
setupLiveInstances(5);
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
- CurrentStateComputationStage stage = new CurrentStateComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ NewCurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
AssertJUnit.assertEquals(
- output1.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
- 0);
+ output1.getCurrentStateMap(Id.resource("testResourceName"),
+ Id.partition("testResourceName_0")).size(), 0);
// Add a state transition messages
Message message = new Message(Message.MessageType.STATE_TRANSITION, Id.message("msg1"));
@@ -75,13 +75,13 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
- runStage(event, new ReadClusterDataStage());
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
- String pendingState =
- output2.getPendingState("testResourceName", new Partition("testResourceName_1"),
- "localhost_3");
- AssertJUnit.assertEquals(pendingState, "SLAVE");
+ NewCurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ State pendingState =
+ output2.getPendingState(Id.resource("testResourceName"),
+ Id.partition("testResourceName_1"), Id.participant("localhost_3"));
+ AssertJUnit.assertEquals(pendingState, State.from("SLAVE"));
ZNRecord record1 = new ZNRecord("testResourceName");
// Add a current state that matches sessionId and one that does not match
@@ -100,13 +100,13 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
accessor.setProperty(
keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
stateWithDeadSession);
- runStage(event, new ReadClusterDataStage());
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
- String currentState =
- output3.getCurrentState("testResourceName", new Partition("testResourceName_1"),
- "localhost_3");
- AssertJUnit.assertEquals(currentState, "OFFLINE");
+ NewCurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ State currentState =
+ output3.getCurrentState(Id.resource("testResourceName"),
+ Id.partition("testResourceName_1"), Id.participant("localhost_3"));
+ AssertJUnit.assertEquals(currentState, State.from("OFFLINE"));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 26bbc20..bcd8f4a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -41,7 +41,6 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -56,7 +55,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
HelixManager manager = new DummyClusterManager(clusterName, accessor);
// ideal state: node0 is MASTER, node1 is SLAVE
@@ -74,7 +73,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
ClusterEvent event = new ClusterEvent("testEvent");
event.addAttribute("helixmanager", manager);
- MessageThrottleStage throttleStage = new MessageThrottleStage();
+ NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
try {
runStage(event, throttleStage);
Assert.fail("Should throw exception since DATA_CACHE is null");
@@ -83,7 +82,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
}
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
runPipeline(event, dataRefresh);
try {
@@ -92,7 +91,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
} catch (Exception e) {
// OK
}
- runStage(event, new ResourceComputationStage());
+ runStage(event, new NewResourceComputationStage());
try {
runStage(event, throttleStage);
@@ -100,22 +99,22 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
} catch (Exception e) {
// OK
}
- MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+ NewMessageOutput msgSelectOutput = new NewMessageOutput();
List<Message> selectMessages = new ArrayList<Message>();
Message msg =
createMessage(MessageType.STATE_TRANSITION, Id.message("msgId-001"), "OFFLINE", "SLAVE",
"TestDB", "localhost_0");
selectMessages.add(msg);
- msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+ msgSelectOutput.setMessages(Id.resource("TestDB"), Id.partition("TestDB_0"), selectMessages);
event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
runStage(event, throttleStage);
- MessageThrottleStageOutput msgThrottleOutput =
+ NewMessageOutput msgThrottleOutput =
event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
- Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(),
- 1);
+ Assert.assertEquals(
+ msgThrottleOutput.getMessages(Id.resource("TestDB"), Id.partition("TestDB_0")).size(), 1);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -127,7 +126,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
HelixManager manager = new DummyClusterManager(clusterName, accessor);
// ideal state: node0 is MASTER, node1 is SLAVE
@@ -212,7 +211,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
ClusterConstraints constraint =
accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
- MessageThrottleStage throttleStage = new MessageThrottleStage();
+ NewMessageThrottleStage throttleStage = new NewMessageThrottleStage();
// test constraintSelection
// message1: hit contraintSelection rule1 and rule2
@@ -262,10 +261,10 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
event.addAttribute("helixmanager", manager);
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
runPipeline(event, dataRefresh);
- runStage(event, new ResourceComputationStage());
- MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
+ runStage(event, new NewResourceComputationStage());
+ NewMessageOutput msgSelectOutput = new NewMessageOutput();
Message msg3 =
createMessage(MessageType.STATE_TRANSITION, Id.message("msgId-003"), "OFFLINE", "SLAVE",
@@ -291,15 +290,15 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
selectMessages.add(msg5); // should be throttled
selectMessages.add(msg6); // should be throttled
- msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
+ msgSelectOutput.setMessages(Id.resource("TestDB"), Id.partition("TestDB_0"), selectMessages);
event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
runStage(event, throttleStage);
- MessageThrottleStageOutput msgThrottleOutput =
+ NewMessageOutput msgThrottleOutput =
event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
List<Message> throttleMessages =
- msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
+ msgThrottleOutput.getMessages(Id.resource("TestDB"), Id.partition("TestDB_0"));
Assert.assertEquals(throttleMessages.size(), 4);
Assert.assertTrue(throttleMessages.contains(msg1));
Assert.assertTrue(throttleMessages.contains(msg2));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 97d5ec1..825aa05 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -20,15 +20,25 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.HelixVersion;
import org.apache.helix.api.Id;
-import org.apache.helix.controller.stages.MessageSelectionStage.Bounds;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.stages.NewMessageSelectionStage.Bounds;
+import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -38,15 +48,27 @@ public class TestMsgSelectionStage {
public void testMasterXfer() {
System.out.println("START testMasterXfer at " + new Date(System.currentTimeMillis()));
- Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
- liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
- liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
- Map<String, String> currentStates = new HashMap<String, String>();
- currentStates.put("localhost_0", "SLAVE");
- currentStates.put("localhost_1", "MASTER");
-
- Map<String, String> pendingStates = new HashMap<String, String>();
+ Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
+ Set<PartitionId> disabledPartitions = Collections.emptySet();
+ Set<String> tags = Collections.emptySet();
+ Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
+ Map<MessageId, Message> messageMap = Collections.emptyMap();
+ RunningInstance runningInstance0 =
+ new RunningInstance(Id.session("session_0"), HelixVersion.from("1.2.3.4"), Id.process("0"));
+ RunningInstance runningInstance1 =
+ new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
+ liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
+ "localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
+ messageMap));
+ liveInstances.put(Id.participant("localhost_1"), new Participant(Id.participant("localhost_1"),
+ "localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
+ messageMap)); // keyed by its own id so it does not overwrite the localhost_0 entry
+
+ Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
+ currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));
+ currentStates.put(Id.participant("localhost_1"), State.from("MASTER"));
+
+ Map<ParticipantId, State> pendingStates = new HashMap<ParticipantId, State>();
List<Message> messages = new ArrayList<Message>();
messages.add(TestHelper.createMessage(Id.message("msgId_0"), "SLAVE", "MASTER", "localhost_0",
@@ -54,17 +76,17 @@ public class TestMsgSelectionStage {
messages.add(TestHelper.createMessage(Id.message("msgId_1"), "MASTER", "SLAVE", "localhost_1",
"TestDB", "TestDB_0"));
- Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
- stateConstraints.put("MASTER", new Bounds(0, 1));
- stateConstraints.put("SLAVE", new Bounds(0, 2));
+ Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+ stateConstraints.put(State.from("MASTER"), new Bounds(0, 1));
+ stateConstraints.put(State.from("SLAVE"), new Bounds(0, 2));
Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
stateTransitionPriorities.put("MASTER-SLAVE", 0);
stateTransitionPriorities.put("SLAVE-MASTER", 1);
List<Message> selectedMsg =
- new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
- messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+ new NewMessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+ messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
Assert.assertEquals(selectedMsg.size(), 1);
Assert.assertEquals(selectedMsg.get(0).getMsgId(), Id.message("msgId_1"));
@@ -76,32 +98,44 @@ public class TestMsgSelectionStage {
System.out.println("START testMasterXferAfterMasterResume at "
+ new Date(System.currentTimeMillis()));
- Map<String, LiveInstance> liveInstances = new HashMap<String, LiveInstance>();
- liveInstances.put("localhost_0", new LiveInstance("localhost_0"));
- liveInstances.put("localhost_1", new LiveInstance("localhost_1"));
-
- Map<String, String> currentStates = new HashMap<String, String>();
- currentStates.put("localhost_0", "SLAVE");
- currentStates.put("localhost_1", "SLAVE");
-
- Map<String, String> pendingStates = new HashMap<String, String>();
- pendingStates.put("localhost_1", "MASTER");
+ Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
+ Set<PartitionId> disabledPartitions = Collections.emptySet();
+ Set<String> tags = Collections.emptySet();
+ Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
+ Map<MessageId, Message> messageMap = Collections.emptyMap();
+ RunningInstance runningInstance0 =
+ new RunningInstance(Id.session("session_0"), HelixVersion.from("1.2.3.4"), Id.process("0"));
+ RunningInstance runningInstance1 =
+ new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
+ liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
+ "localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
+ messageMap));
+ liveInstances.put(Id.participant("localhost_1"), new Participant(Id.participant("localhost_1"),
+ "localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
+ messageMap)); // keyed by its own id so it does not overwrite the localhost_0 entry
+
+ Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
+ currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));
+ currentStates.put(Id.participant("localhost_1"), State.from("SLAVE"));
+
+ Map<ParticipantId, State> pendingStates = new HashMap<ParticipantId, State>();
+ pendingStates.put(Id.participant("localhost_1"), State.from("MASTER"));
List<Message> messages = new ArrayList<Message>();
messages.add(TestHelper.createMessage(Id.message("msgId_0"), "SLAVE", "MASTER", "localhost_0",
"TestDB", "TestDB_0"));
- Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
- stateConstraints.put("MASTER", new Bounds(0, 1));
- stateConstraints.put("SLAVE", new Bounds(0, 2));
+ Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
+ stateConstraints.put(State.from("MASTER"), new Bounds(0, 1));
+ stateConstraints.put(State.from("SLAVE"), new Bounds(0, 2));
Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
stateTransitionPriorities.put("MASTER-SLAVE", 0);
stateTransitionPriorities.put("SLAVE-MASTER", 1);
List<Message> selectedMsg =
- new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
- messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+ new NewMessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+ messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
Assert.assertEquals(selectedMsg.size(), 0);
System.out.println("END testMasterXferAfterMasterResume at "
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 7cd942e..a3f38ea 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -39,7 +39,6 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
-import org.apache.helix.model.Partition;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -76,17 +75,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
+ rebalancePipeline.addStage(new NewResourceComputationStage());
+ rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+ rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new NewMessageGenerationStage());
+ rebalancePipeline.addStage(new NewMessageSelectionStage());
+ rebalancePipeline.addStage(new NewMessageThrottleStage());
+ rebalancePipeline.addStage(new NewTaskAssignmentStage());
// round1: set node0 currentState to OFFLINE and node1 currentState to OFFLINE
setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
@@ -96,10 +95,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
List<Message> messages =
- msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
Message message = messages.get(0);
Assert.assertEquals(message.getFromState().toString(), "OFFLINE");
@@ -114,7 +112,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ messages =
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1");
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
@@ -220,17 +219,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
+ rebalancePipeline.addStage(new NewResourceComputationStage());
+ rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+ rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new NewMessageGenerationStage());
+ rebalancePipeline.addStage(new NewMessageSelectionStage());
+ rebalancePipeline.addStage(new NewMessageThrottleStage());
+ rebalancePipeline.addStage(new NewTaskAssignmentStage());
// round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
@@ -240,10 +239,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
List<Message> messages =
- msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1, "Should output 1 message: OFFLINE-SLAVE for node0");
Message message = messages.get(0);
Assert.assertEquals(message.getFromState().toString(), "OFFLINE");
@@ -258,7 +256,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ messages =
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1,
"Should output only 1 message: OFFLINE->DROPPED for localhost_1");
@@ -275,7 +274,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ messages =
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1,
"Should output 1 message: OFFLINE->DROPPED for localhost_0");
message = messages.get(0);
@@ -315,17 +315,17 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
- dataRefresh.addStage(new ReadClusterDataStage());
+ dataRefresh.addStage(new NewReadClusterDataStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline();
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
- rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new MessageGenerationPhase());
- rebalancePipeline.addStage(new MessageSelectionStage());
- rebalancePipeline.addStage(new MessageThrottleStage());
- rebalancePipeline.addStage(new TaskAssignmentStage());
+ rebalancePipeline.addStage(new NewResourceComputationStage());
+ rebalancePipeline.addStage(new NewCurrentStateComputationStage());
+ rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new NewMessageGenerationStage());
+ rebalancePipeline.addStage(new NewMessageSelectionStage());
+ rebalancePipeline.addStage(new NewMessageThrottleStage());
+ rebalancePipeline.addStage(new NewTaskAssignmentStage());
// round1: set node1 currentState to SLAVE
setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", "session_1",
@@ -333,10 +333,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
- MessageSelectionStageOutput msgSelOutput =
- event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ NewMessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
List<Message> messages =
- msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 1, "Should output 1 message: SLAVE-MASTER for node1");
Message message = messages.get(0);
Assert.assertEquals(message.getFromState().toString(), "SLAVE");
@@ -354,7 +353,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
- messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
+ messages =
+ msgSelOutput.getMessages(Id.resource(resourceName), Id.partition(resourceName + "_0"));
Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0");
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 86bd060..d4f3de6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -28,6 +28,8 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Id;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
@@ -35,7 +37,6 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Resource;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -63,19 +64,21 @@ public class TestResourceComputationStage extends BaseStageTest {
HelixDataAccessor accessor = manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealState(resourceName), idealState);
- ResourceComputationStage stage = new ResourceComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewResourceComputationStage stage = new NewResourceComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resource =
+ event.getAttribute(AttributeName.RESOURCES.toString());
AssertJUnit.assertEquals(1, resource.size());
- AssertJUnit.assertEquals(resource.keySet().iterator().next(), resourceName);
- AssertJUnit.assertEquals(resource.values().iterator().next().getResourceName(), resourceName);
- AssertJUnit.assertEquals(resource.values().iterator().next().getStateModelDefRef(),
- idealState.getStateModelDefRef());
+ AssertJUnit.assertEquals(resource.keySet().iterator().next(), Id.resource(resourceName));
AssertJUnit
- .assertEquals(resource.values().iterator().next().getPartitions().size(), partitions);
+ .assertEquals(resource.values().iterator().next().getId(), Id.resource(resourceName));
+ AssertJUnit.assertEquals(resource.values().iterator().next().getRebalancerConfig()
+ .getStateModelDefId(), idealState.getStateModelDefId());
+ AssertJUnit.assertEquals(resource.values().iterator().next().getPartitionSet().size(),
+ partitions);
}
@Test
@@ -85,21 +88,23 @@ public class TestResourceComputationStage extends BaseStageTest {
"testResource1", "testResource2"
};
List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
- ResourceComputationStage stage = new ResourceComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewResourceComputationStage stage = new NewResourceComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
AssertJUnit.assertEquals(resources.length, resourceMap.size());
for (int i = 0; i < resources.length; i++) {
String resourceName = resources[i];
+ ResourceId resourceId = Id.resource(resourceName);
IdealState idealState = idealStates.get(i);
AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
- idealState.getStateModelDefRef());
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
+ .getStateModelDefId(), idealState.getStateModelDefRef());
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
idealState.getNumPartitions());
}
}
@@ -151,41 +156,47 @@ public class TestResourceComputationStage extends BaseStageTest {
accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
currentState);
- ResourceComputationStage stage = new ResourceComputationStage();
- runStage(event, new ReadClusterDataStage());
+ NewResourceComputationStage stage = new NewResourceComputationStage();
+ runStage(event, new NewReadClusterDataStage());
runStage(event, stage);
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Map<ResourceId, ResourceConfig> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES.toString());
// +1 because it will have one for current state
AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());
for (int i = 0; i < resources.length; i++) {
String resourceName = resources[i];
+ ResourceId resourceId = Id.resource(resourceName);
IdealState idealState = idealStates.get(i);
- AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
- idealState.getStateModelDefRef());
- AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
+ AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
+ .getStateModelDefId(), idealState.getStateModelDefId());
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
idealState.getNumPartitions());
}
// Test the data derived from CurrentState
- AssertJUnit.assertTrue(resourceMap.containsKey(oldResource));
- AssertJUnit.assertEquals(resourceMap.get(oldResource).getResourceName(), oldResource);
- AssertJUnit.assertEquals(resourceMap.get(oldResource).getStateModelDefRef(),
- currentState.getStateModelDefRef());
- AssertJUnit.assertEquals(resourceMap.get(oldResource).getPartitions().size(), currentState
- .getPartitionStateStringMap().size());
- AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_0"));
- AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1"));
- AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2"));
+ ResourceId oldResourceId = Id.resource(oldResource);
+ AssertJUnit.assertTrue(resourceMap.containsKey(oldResourceId));
+ AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getId(), oldResourceId);
+ AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getRebalancerConfig()
+ .getStateModelDefId(), currentState.getStateModelDefId());
+ AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getPartitionSet().size(), currentState
+ .getPartitionStateMap().size());
+ AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+ Id.partition("testResourceOld_0")));
+ AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+ Id.partition("testResourceOld_1")));
+ AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+ Id.partition("testResourceOld_2")));
}
@Test
public void testNull() {
ClusterEvent event = new ClusterEvent("sampleEvent");
- ResourceComputationStage stage = new ResourceComputationStage();
+ NewResourceComputationStage stage = new NewResourceComputationStage();
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5972a44e/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 747a185..d8afec5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -26,10 +26,10 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper;
import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.ZNRecord;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -176,7 +176,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
_startCMResultMap.put(storageNodeName, resultx);
}
- Thread.sleep(1000);
+ Thread.sleep(5000);
result =
ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
CLUSTER_NAME, TEST_DB));