You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:25 UTC
[17/53] [abbrv] git commit: [HELIX-238] Logical model accessor
implementation
[HELIX-238] Logical model accessor implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/bc657373
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/bc657373
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/bc657373
Branch: refs/heads/master
Commit: bc6573734c5a2db439cc10d6755ff4e629ff0375
Parents: 917af3e
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Sep 24 14:04:36 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:34 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/helix/PropertyKey.java | 21 +++
.../org/apache/helix/PropertyPathConfig.java | 6 +
.../java/org/apache/helix/PropertyType.java | 1 +
.../org/apache/helix/api/ClusterAccessor.java | 92 +++--------
.../apache/helix/api/ControllerAccessor.java | 17 +-
.../java/org/apache/helix/api/Resource.java | 10 ++
.../org/apache/helix/api/ResourceAccessor.java | 155 ++++++++++++++++++-
.../helix/api/StateModelDefinitionAccessor.java | 3 -
.../controller/GenericHelixController.java | 2 +
.../rebalancer/context/RebalancerContext.java | 2 +-
.../stages/NewBestPossibleStateCalcStage.java | 2 +-
.../stages/NewBestPossibleStateOutput.java | 5 +-
.../stages/PersistAssignmentStage.java | 45 ++++++
.../controller/stages/ResourceCurrentState.java | 2 +-
.../java/org/apache/helix/model/IdealState.java | 4 +-
.../apache/helix/model/ResourceAssignment.java | 17 +-
.../helix/tools/ClusterStateVerifier.java | 2 +-
.../src/test/java/org/apache/helix/Mocks.java | 2 +-
.../java/org/apache/helix/TestPerfCounters.java | 1 +
.../stages/TestMessageThrottleStage.java | 2 -
.../stages/TestParseInfoFromAlert.java | 16 +-
.../stages/TestRebalancePipeline.java | 2 -
.../org/apache/helix/model/TestConstraint.java | 4 +-
.../apache/helix/examples/NewModelExample.java | 19 ++-
24 files changed, 316 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 1733d39..1a6d11d 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -38,6 +38,7 @@ import static org.apache.helix.PropertyType.MESSAGES;
import static org.apache.helix.PropertyType.MESSAGES_CONTROLLER;
import static org.apache.helix.PropertyType.PAUSE;
import static org.apache.helix.PropertyType.PERSISTENTSTATS;
+import static org.apache.helix.PropertyType.RESOURCEASSIGNMENTS;
import static org.apache.helix.PropertyType.STATEMODELDEFS;
import static org.apache.helix.PropertyType.STATUSUPDATES;
import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
@@ -62,6 +63,7 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.PartitionConfiguration;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.PersistentStats;
+import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfiguration;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.StatusUpdate;
@@ -173,6 +175,25 @@ public class PropertyKey {
}
/**
+ * Get a property key associated with all {@link ResourceAssignment}s on the cluster
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey resourceAssignments() {
+ return new PropertyKey(RESOURCEASSIGNMENTS, ResourceAssignment.class, _clusterName);
+ }
+
+ /**
+ * Get a property key associated with {@link ResourceAssignment} representing the most recent
+ * assignment
+ * @param resourceName name of the resource
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey resourceAssignment(String resourceName) {
+ return new PropertyKey(RESOURCEASSIGNMENTS, ResourceAssignment.class, _clusterName,
+ resourceName);
+ }
+
+ /**
* Get a property key associated with {@link StateModelDefinition}
* @return {@link PropertyKey}
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
index 7fa4404..60a92f4 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
@@ -30,6 +30,7 @@ import static org.apache.helix.PropertyType.IDEALSTATES;
import static org.apache.helix.PropertyType.LIVEINSTANCES;
import static org.apache.helix.PropertyType.MESSAGES;
import static org.apache.helix.PropertyType.PAUSE;
+import static org.apache.helix.PropertyType.RESOURCEASSIGNMENTS;
import static org.apache.helix.PropertyType.STATEMODELDEFS;
import static org.apache.helix.PropertyType.STATUSUPDATES;
@@ -50,6 +51,7 @@ import org.apache.helix.model.LeaderHistory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.StatusUpdate;
import org.apache.log4j.Logger;
@@ -78,6 +80,7 @@ public class PropertyPathConfig {
typeToClassMapping.put(ALERTS, Alerts.class);
typeToClassMapping.put(ALERT_STATUS, AlertStatus.class);
typeToClassMapping.put(PAUSE, PauseSignal.class);
+ typeToClassMapping.put(RESOURCEASSIGNMENTS, ResourceAssignment.class);
// @formatter:off
addEntry(PropertyType.CLUSTER, 1, "/{clusterName}");
@@ -92,6 +95,9 @@ public class PropertyPathConfig {
addEntry(PropertyType.INSTANCES, 2, "/{clusterName}/INSTANCES/{instanceName}");
addEntry(PropertyType.IDEALSTATES, 1, "/{clusterName}/IDEALSTATES");
addEntry(PropertyType.IDEALSTATES, 2, "/{clusterName}/IDEALSTATES/{resourceName}");
+ addEntry(PropertyType.RESOURCEASSIGNMENTS, 1, "/{clusterName}/RESOURCEASSIGNMENTS");
+ addEntry(PropertyType.RESOURCEASSIGNMENTS, 2,
+ "/{clusterName}/RESOURCEASSIGNMENTS/{resourceName}");
addEntry(PropertyType.EXTERNALVIEW, 1, "/{clusterName}/EXTERNALVIEW");
addEntry(PropertyType.EXTERNALVIEW, 2, "/{clusterName}/EXTERNALVIEW/{resourceName}");
addEntry(PropertyType.STATEMODELDEFS, 1, "/{clusterName}/STATEMODELDEFS");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 446f1a2..680dc06 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -41,6 +41,7 @@ public enum PropertyType {
LIVEINSTANCES(Type.CLUSTER, false, false, false, true, true),
INSTANCES(Type.CLUSTER, true, false),
IDEALSTATES(Type.CLUSTER, true, false, false, false, true),
+ RESOURCEASSIGNMENTS(Type.CLUSTER, true, false),
EXTERNALVIEW(Type.CLUSTER, true, false),
STATEMODELDEFS(Type.CLUSTER, true, false, false, false, true),
CONTROLLER(Type.CLUSTER, true, false),
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index fd608e3..a87ce74 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -25,26 +25,22 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfiguration;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -127,7 +123,6 @@ public class ClusterAccessor {
* @return cluster
*/
public Cluster readCluster() {
- // TODO many of these should live in resource, participant, etc accessors
/**
* map of instance-id to instance-config
*/
@@ -190,40 +185,22 @@ public class ClusterAccessor {
Map<String, ResourceConfiguration> resourceConfigMap =
_accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+ /**
+ * Map of resource id to resource assignment
+ */
+ Map<String, ResourceAssignment> resourceAssignmentMap =
+ _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+
+ // read all the resources
Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
for (String resourceName : idealStateMap.keySet()) {
- IdealState idealState = idealStateMap.get(resourceName);
- // TODO pass resource assignment
ResourceId resourceId = ResourceId.from(resourceName);
- UserConfig userConfig;
- if (resourceConfigMap != null && resourceConfigMap.containsKey(resourceName)) {
- userConfig = UserConfig.from(resourceConfigMap.get(resourceName));
- } else {
- userConfig = new UserConfig(Scope.resource(resourceId));
- }
- int bucketSize = 0;
- boolean batchMessageMode = false;
- RebalancerContext rebalancerContext;
- if (idealState != null) {
- rebalancerContext = PartitionedRebalancerContext.from(idealState);
- bucketSize = idealState.getBucketSize();
- batchMessageMode = idealState.getBatchMessageMode();
- } else {
- ResourceConfiguration resourceConfiguration = resourceConfigMap.get(resourceName);
- if (resourceConfiguration != null) {
- bucketSize = resourceConfiguration.getBucketSize();
- batchMessageMode = resourceConfiguration.getBatchMessageMode();
- RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
- rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
- } else {
- rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
- }
- }
- resourceMap.put(resourceId,
- new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
- rebalancerContext, userConfig, bucketSize, batchMessageMode));
+ resourceMap.put(resourceId, ResourceAccessor.createResource(resourceId,
+ resourceConfigMap.get(resourceName), idealStateMap.get(resourceName),
+ externalViewMap.get(resourceName), resourceAssignmentMap.get(resourceName)));
}
+ // read all the participants
Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
for (String participantName : instanceConfigMap.keySet()) {
InstanceConfig instanceConfig = instanceConfigMap.get(participantName);
@@ -238,6 +215,7 @@ public class ClusterAccessor {
currentStateMap.get(participantName)));
}
+ // read the controllers
Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
ControllerId leaderId = null;
if (leader != null) {
@@ -245,6 +223,7 @@ public class ClusterAccessor {
controllerMap.put(leaderId, new Controller(leaderId, leader, true));
}
+ // read the constraints
Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
new HashMap<ConstraintType, ClusterConstraints>();
for (String constraintType : constraintMap.keySet()) {
@@ -252,6 +231,7 @@ public class ClusterAccessor {
constraintMap.get(constraintType));
}
+ // read the pause status
PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
boolean isPaused = pauseSignal != null;
@@ -263,10 +243,13 @@ public class ClusterAccessor {
userConfig = new UserConfig(Scope.cluster(_clusterId));
}
+ // read the state model definitions
StateModelDefinitionAccessor stateModelDefAccessor =
new StateModelDefinitionAccessor(_accessor);
Map<StateModelDefId, StateModelDefinition> stateModelMap =
stateModelDefAccessor.readStateModelDefinitions();
+
+ // create the cluster snapshot object
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
clusterConstraintMap, stateModelMap, userConfig, isPaused);
}
@@ -290,7 +273,6 @@ public class ClusterAccessor {
* @param resource
*/
public void addResourceToCluster(ResourceConfig resource) {
- // TODO: this belongs in ResourceAccessor
RebalancerContext context =
resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
StateModelDefId stateModelDefId = context.getStateModelDefId();
@@ -317,40 +299,10 @@ public class ClusterAccessor {
// Create an IdealState from a RebalancerConfig (if the resource is partitioned)
RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
- PartitionedRebalancerContext partitionedContext =
- rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
- if (context != null) {
- IdealState idealState = new IdealState(resourceId);
- idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
- idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
- String replicas = null;
- if (partitionedContext.anyLiveParticipant()) {
- replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
- } else {
- replicas = Integer.toString(partitionedContext.getReplicaCount());
- }
- idealState.setReplicas(replicas);
- idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
- idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
- idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
- idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
- idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
- idealState.setBucketSize(resource.getBucketSize());
- idealState.setBatchMessageMode(resource.getBatchMessageMode());
- if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
- SemiAutoRebalancerContext semiAutoContext =
- rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
- for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
- idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
- }
- } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
- CustomRebalancerContext customContext =
- rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
- for (PartitionId partitionId : customContext.getPartitionSet()) {
- idealState.setParticipantStateMap(partitionId,
- customContext.getPreferenceMap(partitionId));
- }
- }
+ IdealState idealState =
+ ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
+ resource.getBatchMessageMode());
+ if (idealState != null) {
_accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
index 153bab6..b835a4c 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ControllerAccessor.java
@@ -20,19 +20,28 @@ package org.apache.helix.api;
*/
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.LiveInstance;
public class ControllerAccessor {
private final HelixDataAccessor _accessor;
+ private final PropertyKey.Builder _keyBuilder;
public ControllerAccessor(HelixDataAccessor accessor) {
_accessor = accessor;
+ _keyBuilder = accessor.keyBuilder();
}
/**
- * create leader
- * @param controllerId
+ * Read the leader controller if it is live
+ * @return Controller snapshot, or null
*/
- public void connectLeader(ControllerId controllerId) {
- // TODO impl this
+ public Controller readLeader() {
+ LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+ if (leader != null) {
+ ControllerId leaderId = ControllerId.from(leader.getId());
+ return new Controller(leaderId, leader, true);
+ }
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index b8a6daf..7816888 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -38,6 +38,7 @@ import org.apache.helix.model.ResourceAssignment;
public class Resource {
private final ResourceConfig _config;
private final ExternalView _externalView;
+ private final ResourceAssignment _resourceAssignment;
/**
* Construct a resource
@@ -73,6 +74,7 @@ public class Resource {
new ResourceConfig(id, ResourceType.DATA, schedulerTaskConfig, rebalancerConfig,
userConfig, bucketSize, batchMessageMode);
_externalView = externalView;
+ _resourceAssignment = resourceAssignment;
}
/**
@@ -151,6 +153,14 @@ public class Resource {
}
/**
+ * Get the current resource assignment
+ * @return ResourceAssignment, or null if no current assignment
+ */
+ public ResourceAssignment getResourceAssignment() {
+ return _resourceAssignment;
+ }
+
+ /**
* Get the resource properties configuring rebalancing
* @return RebalancerConfig properties
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
index fa36247..bdc44c7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
@@ -19,38 +19,90 @@ package org.apache.helix.api;
* under the License.
*/
+import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
public class ResourceAccessor {
- private final ClusterId _clusterId;
private final HelixDataAccessor _accessor;
private final PropertyKey.Builder _keyBuilder;
- public ResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
- _clusterId = clusterId;
+ public ResourceAccessor(HelixDataAccessor accessor) {
_accessor = accessor;
_keyBuilder = accessor.keyBuilder();
}
/**
+ * Read a single snapshot of a resource
+ * @param resourceId the resource id to read
+ * @return Resource
+ */
+ public Resource readResource(ResourceId resourceId) {
+ ResourceConfiguration config =
+ _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+ IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
+ ExternalView externalView =
+ _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
+ ResourceAssignment resourceAssignment =
+ _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+ return createResource(resourceId, config, idealState, externalView, resourceAssignment);
+ }
+
+ /**
* save resource assignment
* @param resourceId
* @param resourceAssignment
*/
- public void setRresourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+ public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+ _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
+ resourceAssignment);
+ }
+
+ /**
+ * save resource assignment
+ * @param resourceId
+ * @return resource assignment or null
+ */
+ public void getResourceAssignment(ResourceId resourceId) {
+ _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+ }
+
+ /**
+ * Set a resource configuration, which may include user-defined configuration, as well as
+ * rebalancer configuration
+ * @param resourceId
+ * @param configuration
+ */
+ public void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+ _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ // also set an ideal state if the resource supports it
+ RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+ IdealState idealState =
+ rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
+ configuration.getBatchMessageMode());
+ if (idealState != null) {
+ _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+ }
}
/**
- * set ideal-state
+ * Get a resource configuration, which may include user-defined configuration, as well as
+ * rebalancer configuration
* @param resourceId
- * @param idealState
+ * @return configuration
*/
- public void setIdealState(ResourceId resourceId, IdealState idealState) {
- _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+ public void getConfiguration(ResourceId resourceId) {
+ _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
}
/**
@@ -70,4 +122,91 @@ public class ResourceAccessor {
_accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
}
+ /**
+ * Get an ideal state from a rebalancer config if the resource is partitioned
+ * @param config RebalancerConfig instance
+ * @param bucketSize bucket size to use
+ * @param batchMessageMode true if batch messaging allowed, false otherwise
+ * @return IdealState, or null
+ */
+ static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
+ boolean batchMessageMode) {
+ PartitionedRebalancerContext partitionedContext =
+ config.getRebalancerContext(PartitionedRebalancerContext.class);
+ if (partitionedContext != null) {
+ IdealState idealState = new IdealState(partitionedContext.getResourceId());
+ idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
+ idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+ String replicas = null;
+ if (partitionedContext.anyLiveParticipant()) {
+ replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
+ } else {
+ replicas = Integer.toString(partitionedContext.getReplicaCount());
+ }
+ idealState.setReplicas(replicas);
+ idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
+ idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
+ idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
+ idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
+ idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+ idealState.setBucketSize(bucketSize);
+ idealState.setBatchMessageMode(batchMessageMode);
+ if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ SemiAutoRebalancerContext semiAutoContext =
+ config.getRebalancerContext(SemiAutoRebalancerContext.class);
+ for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
+ idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+ }
+ } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ CustomRebalancerContext customContext =
+ config.getRebalancerContext(CustomRebalancerContext.class);
+ for (PartitionId partitionId : customContext.getPartitionSet()) {
+ idealState.setParticipantStateMap(partitionId,
+ customContext.getPreferenceMap(partitionId));
+ }
+ }
+ return idealState;
+ }
+ return null;
+ }
+
+ /**
+ * Create a resource snapshot instance from the physical model
+ * @param resourceId the resource id
+ * @param resourceConfiguration physical resource configuration
+ * @param idealState ideal state of the resource
+ * @param externalView external view of the resource
+ * @param resourceAssignment current resource assignment
+ * @return Resource
+ */
+ static Resource createResource(ResourceId resourceId,
+ ResourceConfiguration resourceConfiguration, IdealState idealState,
+ ExternalView externalView, ResourceAssignment resourceAssignment) {
+ // TODO pass resource assignment
+ UserConfig userConfig;
+ if (resourceConfiguration != null) {
+ userConfig = UserConfig.from(resourceConfiguration);
+ } else {
+ userConfig = new UserConfig(Scope.resource(resourceId));
+ }
+ int bucketSize = 0;
+ boolean batchMessageMode = false;
+ RebalancerContext rebalancerContext;
+ if (idealState != null) {
+ rebalancerContext = PartitionedRebalancerContext.from(idealState);
+ bucketSize = idealState.getBucketSize();
+ batchMessageMode = idealState.getBatchMessageMode();
+ } else {
+ if (resourceConfiguration != null) {
+ bucketSize = resourceConfiguration.getBucketSize();
+ batchMessageMode = resourceConfiguration.getBatchMessageMode();
+ RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
+ rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+ } else {
+ rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
+ }
+ }
+ return new Resource(resourceId, idealState, resourceAssignment, externalView,
+ rebalancerContext, userConfig, bucketSize, batchMessageMode);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
index f2bbf6d..988484b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
@@ -25,13 +25,10 @@ import java.util.Map;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.StateModelDefinition;
-import org.apache.log4j.Logger;
import com.google.common.collect.ImmutableMap;
public class StateModelDefinitionAccessor {
- private static Logger LOG = Logger.getLogger(StateModelDefinitionAccessor.class);
-
private final HelixDataAccessor _accessor;
private final PropertyKey.Builder _keyBuilder;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index ab1f824..3c118ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -55,6 +55,7 @@ import org.apache.helix.controller.stages.NewMessageThrottleStage;
import org.apache.helix.controller.stages.NewReadClusterDataStage;
import org.apache.helix.controller.stages.NewResourceComputationStage;
import org.apache.helix.controller.stages.NewTaskAssignmentStage;
+import org.apache.helix.controller.stages.PersistAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
@@ -182,6 +183,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
rebalancePipeline.addStage(new NewResourceComputationStage());
rebalancePipeline.addStage(new NewCurrentStateComputationStage());
rebalancePipeline.addStage(new NewBestPossibleStateCalcStage());
+ rebalancePipeline.addStage(new PersistAssignmentStage());
rebalancePipeline.addStage(new NewMessageGenerationStage());
rebalancePipeline.addStage(new NewMessageSelectionStage());
rebalancePipeline.addStage(new NewMessageThrottleStage());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
index 3aff151..87863b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
@@ -53,7 +53,7 @@ public interface RebalancerContext {
* @param subUnitId the id of the subunit
* @return SubUnit
*/
- public Partition getSubUnit(PartitionId partitionId);
+ public Partition getSubUnit(PartitionId subUnitId);
/**
* Get the resource to rebalance
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index 6334279..7434c2a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -85,7 +85,7 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
- Set<PartitionId> mappedPartitions =
+ Set<? extends PartitionId> mappedPartitions =
currentStateOutput.getCurrentStateMappedPartitions(resourceId);
if (mappedPartitions == null) {
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
index fbdea55..aaeeb34 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -1,18 +1,19 @@
package org.apache.helix.controller.stages;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.helix.api.ResourceId;
import org.apache.helix.model.ResourceAssignment;
+import com.google.common.collect.Maps;
+
public class NewBestPossibleStateOutput {
Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
public NewBestPossibleStateOutput() {
- _resourceAssignmentMap = new HashMap<ResourceId, ResourceAssignment>();
+ _resourceAssignmentMap = Maps.newHashMap();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
new file mode 100644
index 0000000..f982847
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -0,0 +1,45 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.ResourceAccessor;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.model.ResourceAssignment;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Persist the ResourceAssignment of each resource that went through rebalancing
+ */
+public class PersistAssignmentStage extends AbstractBaseStage {
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ HelixManager helixManager = event.getAttribute("helixmanager");
+ HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+ ResourceAccessor resourceAccessor = new ResourceAccessor(accessor);
+ NewBestPossibleStateOutput assignments =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ for (ResourceId resourceId : assignments.getAssignedResources()) {
+ ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);
+ resourceAccessor.setResourceAssignment(resourceId, assignment);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
index db33a4f..e3e8f94 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -225,7 +225,7 @@ public class ResourceCurrentState {
* @param resourceId resource to look up
* @return set of mapped partitions, or empty set if there are none
*/
- public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
+ public Set<? extends PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
if (currentStateMap != null) {
return currentStateMap.keySet();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ff0923d..ba62391 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -747,7 +747,7 @@ public class IdealState extends HelixProperty {
* @param rawPreferenceLists a map of partition name to a list of participant names
* @return converted lists as a map
*/
- public static Map<PartitionId, List<ParticipantId>> preferenceListsFromStringLists(
+ public static Map<? extends PartitionId, List<ParticipantId>> preferenceListsFromStringLists(
Map<String, List<String>> rawPreferenceLists) {
if (rawPreferenceLists == null) {
return Collections.emptyMap();
@@ -812,7 +812,7 @@ public class IdealState extends HelixProperty {
* @param rawMaps the map of partition name to participant name and state
* @return converted maps
*/
- public static Map<PartitionId, Map<ParticipantId, State>> participantStateMapsFromStringMaps(
+ public static Map<? extends PartitionId, Map<ParticipantId, State>> participantStateMapsFromStringMaps(
Map<String, Map<String, String>> rawMaps) {
return ResourceAssignment.replicaMapsFromStringMaps(rawMaps);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 65fa14d..14183f5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.ResourceId;
@@ -63,6 +64,14 @@ public class ResourceAssignment extends HelixProperty {
}
/**
+ * Instantiate from a record. This supports reading the assignment directly from the backing store
+ * @param record backing record
+ */
+ public ResourceAssignment(ZNRecord record) {
+ super(record);
+ }
+
+ /**
* Get the resource for which this assignment was created
* @return resource id
*/
@@ -74,7 +83,7 @@ public class ResourceAssignment extends HelixProperty {
* Get the currently mapped partitions
* @return list of Partition objects
*/
- public List<PartitionId> getMappedPartitions() {
+ public List<? extends PartitionId> getMappedPartitions() {
ImmutableList.Builder<PartitionId> builder = new ImmutableList.Builder<PartitionId>();
for (String partitionName : _record.getMapFields().keySet()) {
builder.add(PartitionId.from(partitionName));
@@ -86,7 +95,7 @@ public class ResourceAssignment extends HelixProperty {
* Get the entire map of a resource
* @return map of partition to participant to state
*/
- public Map<PartitionId, Map<ParticipantId, State>> getResourceMap() {
+ public Map<? extends PartitionId, Map<ParticipantId, State>> getResourceMap() {
return replicaMapsFromStringMaps(_record.getMapFields());
}
@@ -144,7 +153,7 @@ public class ResourceAssignment extends HelixProperty {
* @param rawMaps the map of partition name to participant name and state
* @return converted maps
*/
- public static Map<PartitionId, Map<ParticipantId, State>> replicaMapsFromStringMaps(
+ public static Map<? extends PartitionId, Map<ParticipantId, State>> replicaMapsFromStringMaps(
Map<String, Map<String, String>> rawMaps) {
if (rawMaps == null) {
return Collections.emptyMap();
@@ -180,7 +189,7 @@ public class ResourceAssignment extends HelixProperty {
* @return converted maps
*/
public static Map<String, Map<String, String>> stringMapsFromReplicaMaps(
- Map<PartitionId, Map<ParticipantId, State>> replicaMaps) {
+ Map<? extends PartitionId, Map<ParticipantId, State>> replicaMaps) {
if (replicaMaps == null) {
return Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 7e16de0..d4c1691 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -266,7 +266,7 @@ public class ClusterStateVerifier {
ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId);
ResourceAssignmentBuilder raBuilder = new ResourceAssignmentBuilder(resourceId);
- List<PartitionId> mappedPartitions = resourceAssignment.getMappedPartitions();
+ List<? extends PartitionId> mappedPartitions = resourceAssignment.getMappedPartitions();
for (PartitionId partitionId : mappedPartitions) {
raBuilder.addAssignments(partitionId, resourceAssignment.getReplicaMap(partitionId));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 1fdf9fd..0558764 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -674,7 +674,7 @@ public class Mocks {
}
@Override
- public BaseDataAccessor getBaseDataAccessor() {
+ public BaseDataAccessor<ZNRecord> getBaseDataAccessor() {
// TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java b/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
index d95bff8..bac6e7a 100644
--- a/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
+++ b/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
@@ -24,6 +24,7 @@ import org.testng.AssertJUnit;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+@SuppressWarnings("deprecation")
public class TestPerfCounters {
final String INSTANCE_NAME = "instance_123";
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index b0c3e58..9f31609 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -43,12 +43,10 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
-import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestMessageThrottleStage extends ZkUnitTestBase {
- private static final Logger LOG = Logger.getLogger(TestMessageThrottleStage.class.getName());
final String _className = getShortClassName();
@Test
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
index ec5d503..8e7b85a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParseInfoFromAlert.java
@@ -27,27 +27,29 @@ import org.testng.annotations.Test;
public class TestParseInfoFromAlert extends ZkStandAloneCMTestBase {
@Test
public void TestParse() {
- StatsAggregationStage stage = new StatsAggregationStage();
String controllerName = CONTROLLER_PREFIX + "_0";
HelixManager manager = _startCMResultMap.get(controllerName)._manager;
String instanceName =
- stage.parseInstanceName("localhost_12918.TestStat@DB=123.latency", manager);
+ StatsAggregationStage.parseInstanceName("localhost_12918.TestStat@DB=123.latency", manager);
Assert.assertTrue(instanceName.equals("localhost_12918"));
- instanceName = stage.parseInstanceName("localhost_12955.TestStat@DB=123.latency", manager);
+ instanceName =
+ StatsAggregationStage.parseInstanceName("localhost_12955.TestStat@DB=123.latency", manager);
Assert.assertTrue(instanceName == null);
- instanceName = stage.parseInstanceName("localhost_12922.TestStat@DB=123.latency", manager);
+ instanceName =
+ StatsAggregationStage.parseInstanceName("localhost_12922.TestStat@DB=123.latency", manager);
Assert.assertTrue(instanceName.equals("localhost_12922"));
String resourceName =
- stage.parseResourceName("localhost_12918.TestStat@DB=TestDB.latency", manager);
+ StatsAggregationStage.parseResourceName("localhost_12918.TestStat@DB=TestDB.latency",
+ manager);
Assert.assertTrue(resourceName.equals("TestDB"));
String partitionName =
- stage.parsePartitionName("localhost_12918.TestStat@DB=TestDB;Partition=TestDB_22.latency",
- manager);
+ StatsAggregationStage.parsePartitionName(
+ "localhost_12918.TestStat@DB=TestDB;Partition=TestDB_22.latency", manager);
Assert.assertTrue(partitionName.equals("TestDB_22"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index e5674dc..f7d2317 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -42,12 +42,10 @@ import org.apache.helix.model.CurrentState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
-import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestRebalancePipeline extends ZkUnitTestBase {
- private static final Logger LOG = Logger.getLogger(TestRebalancePipeline.class.getName());
final String _className = getShortClassName();
@Test
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
index deea4d1..b1fc1cd 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
@@ -35,12 +35,10 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.Message.MessageType;
-import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestConstraint extends ZkUnitTestBase {
- private static Logger LOG = Logger.getLogger(TestConstraint.class);
@Test
public void testMsgConstraint() {
@@ -185,7 +183,7 @@ public class TestConstraint extends ZkUnitTestBase {
ConstraintItem constraint2 = new ConstraintItem(record.getMapField("constraint2"));
ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.constraint(ConstraintType.STATE_CONSTRAINT.toString()),
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bc657373/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index ed66ae2..6791bfd 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -26,6 +26,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Transition;
import org.apache.log4j.Logger;
import com.google.common.collect.Lists;
@@ -77,11 +78,21 @@ public class NewModelExample {
userConfig.setIntField("sampleInt", 1);
// fully specify the cluster with a ClusterConfig
- ClusterConfig cluster =
+ ClusterConfig.Builder clusterBuilder =
new ClusterConfig.Builder(clusterId).addResource(resource).addParticipant(participant)
- .addStateModelDefinition(lockUnlock).userConfig(userConfig).build();
+ .addStateModelDefinition(lockUnlock).userConfig(userConfig);
- // set up accessors to work with Zookeeper-persisted data
+ // add a state constraint that is more restrictive than what is in the state model
+ clusterBuilder.addStateUpperBoundConstraint(Scope.cluster(clusterId),
+ lockUnlock.getStateModelDefId(), State.from("LOCKED"), 1);
+
+ // add a transition constraint (this time with a resource scope)
+ clusterBuilder.addTransitionConstraint(Scope.resource(resource.getId()),
+ lockUnlock.getStateModelDefId(), Transition.from("RELEASED-LOCKED"), 1);
+
+ ClusterConfig cluster = clusterBuilder.build();
+
+ // set up accessors to work with ZooKeeper-persisted data
int timeOutInSec = Integer.parseInt(System.getProperty(ZKHelixAdmin.CONNECTION_TIMEOUT, "30"));
ZkClient zkClient = new ZkClient(args[0], timeOutInSec * 1000);
zkClient.setZkSerializer(new ZNRecordSerializer());
@@ -150,7 +161,7 @@ public class NewModelExample {
StateModelDefinition.Builder stateModelBuilder =
new StateModelDefinition.Builder(stateModelId).addState(LOCKED, 0).addState(RELEASED, 1)
.addState(DROPPED, 2).addTransition(RELEASED, LOCKED, 0)
- .addTransition(LOCKED, RELEASED, 1).upperBound(LOCKED, 1).upperBound(RELEASED, -1)
+ .addTransition(LOCKED, RELEASED, 1).upperBound(LOCKED, 2).upperBound(RELEASED, -1)
.upperBound(DROPPED, -1).initialState(RELEASED);
return stateModelBuilder.build();
}