You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:29 UTC
[21/53] [abbrv] git commit: [HELIX-238] Added basic updater logic
[HELIX-238] Added basic updater logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/89b3bc59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/89b3bc59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/89b3bc59
Branch: refs/heads/master
Commit: 89b3bc59e367a8d0dbe268d755d59b6030d3fdf7
Parents: 0b67257
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Sep 25 18:08:20 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:34 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/helix/PropertyKey.java | 16 ++
.../main/java/org/apache/helix/api/Cluster.java | 2 +-
.../org/apache/helix/api/ClusterAccessor.java | 171 +++++++++++---
.../org/apache/helix/api/ClusterConfig.java | 220 ++++++++++++++++++-
.../java/org/apache/helix/api/Participant.java | 2 +-
.../apache/helix/api/ParticipantAccessor.java | 28 ++-
.../org/apache/helix/api/ParticipantConfig.java | 156 ++++++++++++-
.../java/org/apache/helix/api/Resource.java | 12 +-
.../org/apache/helix/api/ResourceAccessor.java | 29 ++-
.../org/apache/helix/api/ResourceConfig.java | 121 +++++++++-
.../helix/api/StateModelDefinitionAccessor.java | 4 +-
.../context/PartitionedRebalancerContext.java | 7 +-
.../rebalancer/context/RebalancerConfig.java | 4 +-
.../helix/model/ResourceConfiguration.java | 37 +---
.../helix/controller/stages/BaseStageTest.java | 5 +-
.../apache/helix/examples/NewModelExample.java | 7 +-
16 files changed, 727 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 1a6d11d..16c6c1c 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -38,6 +38,7 @@ import static org.apache.helix.PropertyType.MESSAGES;
import static org.apache.helix.PropertyType.MESSAGES_CONTROLLER;
import static org.apache.helix.PropertyType.PAUSE;
import static org.apache.helix.PropertyType.PERSISTENTSTATS;
+import static org.apache.helix.PropertyType.PROPERTYSTORE;
import static org.apache.helix.PropertyType.RESOURCEASSIGNMENTS;
import static org.apache.helix.PropertyType.STATEMODELDEFS;
import static org.apache.helix.PropertyType.STATUSUPDATES;
@@ -594,6 +595,14 @@ public class PropertyKey {
}
/**
+ * Get the root of all controller status updates
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey controllerTaskStatuses() {
+ return new PropertyKey(STATUSUPDATES_CONTROLLER, StatusUpdate.class, _clusterName);
+ }
+
+ /**
* Get a property key associated with {@link StatusUpdate} of controller status updates
* @param subPath
* @return {@link PropertyKey}
@@ -705,6 +714,13 @@ public class PropertyKey {
return new PropertyKey(HEALTHREPORT, HealthStat.class, _clusterName, instanceName);
}
+ /**
+ * Get a property key associated with the root of the Helix property store
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey propertyStore() {
+ return new PropertyKey(PROPERTYSTORE, null, _clusterName);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 56a84e9..9e71904 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -102,7 +102,7 @@ public class Cluster {
_config =
new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
.addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
- .addStateModelDefinitions(stateModelMap.values()).setPausedStatus(isPaused)
+ .addStateModelDefinitions(stateModelMap.values()).pausedStatus(isPaused)
.userConfig(userConfig).build();
_resourceMap = ImmutableMap.copyOf(resourceMap);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index a87ce74..6302e33 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -25,8 +25,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.rebalancer.context.RebalancerContext;
@@ -65,15 +66,13 @@ public class ClusterAccessor {
public boolean createCluster(ClusterConfig cluster) {
boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
if (!created) {
- // LOG.warn("Cluster already created. Aborting.");
- // return false;
+ LOG.error("Cluster already created. Aborting.");
+ return false;
}
-
- StateModelDefinitionAccessor stateModelDefAccessor =
- new StateModelDefinitionAccessor(_accessor);
+ initClusterStructure();
Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
for (StateModelDefinition stateModelDef : stateModelDefs.values()) {
- stateModelDefAccessor.addStateModelDefinition(stateModelDef);
+ addStateModelDefinitionToCluster(stateModelDef);
}
Map<ResourceId, ResourceConfig> resources = cluster.getResourceMap();
for (ResourceConfig resource : resources.values()) {
@@ -97,25 +96,38 @@ public class ClusterAccessor {
return true;
}
+ public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) {
+ Cluster cluster = readCluster();
+ if (cluster == null) {
+ LOG.error("Cluster does not exist, cannot be updated");
+ return null;
+ }
+ ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig());
+ // TODO: persist this
+ return config;
+ }
+
/**
* drop a cluster
+ * @return true if the cluster was dropped, false if there was an error
*/
- public void dropCluster() {
+ public boolean dropCluster() {
LOG.info("Dropping cluster: " + _clusterId);
List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
if (liveInstanceNames.size() > 0) {
- throw new HelixException("Can't drop cluster: " + _clusterId
- + " because there are running participant: " + liveInstanceNames
- + ", shutdown participants first.");
+ LOG.error("Can't drop cluster: " + _clusterId + " because there are running participant: "
+ + liveInstanceNames + ", shutdown participants first.");
+ return false;
}
LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
if (leader != null) {
- throw new HelixException("Can't drop cluster: " + _clusterId + ", because leader: "
- + leader.getId() + " are running, shutdown leader first.");
+ LOG.error("Can't drop cluster: " + _clusterId + ", because leader: " + leader.getId()
+ + " are running, shutdown leader first.");
+ return false;
}
- _accessor.removeProperty(_keyBuilder.cluster());
+ return _accessor.removeProperty(_keyBuilder.cluster());
}
/**
@@ -271,30 +283,42 @@ public class ClusterAccessor {
/**
* add a resource to cluster
* @param resource
+ * @return true if resource added, false if there was an error
*/
- public void addResourceToCluster(ResourceConfig resource) {
+ public boolean addResourceToCluster(ResourceConfig resource) {
+ if (!isClusterStructureValid()) {
+ LOG.error("Cluster: " + _clusterId + " structure is not valid");
+ return false;
+ }
RebalancerContext context =
resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
StateModelDefId stateModelDefId = context.getStateModelDefId();
if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
- throw new HelixException("State model: " + stateModelDefId + " not found in cluster: "
- + _clusterId);
+ LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
+ return false;
}
ResourceId resourceId = resource.getId();
if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) != null) {
- throw new HelixException("Skip adding resource: " + resourceId
+ LOG.error("Skip adding resource: " + resourceId
+ ", because resource ideal state already exists in cluster: " + _clusterId);
+ return false;
+ }
+ if (_accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify())) != null) {
+ LOG.error("Skip adding resource: " + resourceId
+ + ", because resource config already exists in cluster: " + _clusterId);
+ return false;
}
// Add resource user config
if (resource.getUserConfig() != null) {
ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
+ configuration.setType(resource.getType());
configuration.addNamespacedConfig(resource.getUserConfig());
configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
configuration.setBucketSize(resource.getBucketSize());
configuration.setBatchMessageMode(resource.getBatchMessageMode());
- _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ _accessor.createProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
}
// Create an IdealState from a RebalancerConfig (if the resource is partitioned)
@@ -305,15 +329,23 @@ public class ClusterAccessor {
if (idealState != null) {
_accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
}
+ return true;
}
/**
* drop a resource from cluster
* @param resourceId
+ * @return true if removal succeeded, false otherwise
*/
- public void dropResourceFromCluster(ResourceId resourceId) {
+ public boolean dropResourceFromCluster(ResourceId resourceId) {
+ if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) == null) {
+ LOG.error("Skip removing resource: " + resourceId
+ + ", because resource ideal state already removed from cluster: " + _clusterId);
+ return false;
+ }
_accessor.removeProperty(_keyBuilder.idealState(resourceId.stringify()));
_accessor.removeProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+ return true;
}
/**
@@ -321,23 +353,71 @@ public class ClusterAccessor {
* @return true if valid or false otherwise
*/
public boolean isClusterStructureValid() {
- // TODO impl this
+ List<String> paths = getRequiredPaths();
+ BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+ boolean[] existsResults = baseAccessor.exists(paths, 0);
+ for (boolean exists : existsResults) {
+ if (!exists) {
+ return false;
+ }
+ }
return true;
}
/**
+ * Create empty persistent properties to ensure that there is a valid cluster structure
+ */
+ private void initClusterStructure() {
+ BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+ List<String> paths = getRequiredPaths();
+ for (String path : paths) {
+ boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
+ if (!status && LOG.isDebugEnabled()) {
+ LOG.debug(path + " already exists");
+ }
+ }
+ }
+
+ /**
+ * Get all property paths that must be set for a cluster structure to be valid
+ * @return list of paths as strings
+ */
+ private List<String> getRequiredPaths() {
+ List<String> paths = new ArrayList<String>();
+ paths.add(_keyBuilder.cluster().getPath());
+ paths.add(_keyBuilder.idealStates().getPath());
+ paths.add(_keyBuilder.clusterConfigs().getPath());
+ paths.add(_keyBuilder.instanceConfigs().getPath());
+ paths.add(_keyBuilder.resourceConfigs().getPath());
+ paths.add(_keyBuilder.propertyStore().getPath());
+ paths.add(_keyBuilder.liveInstances().getPath());
+ paths.add(_keyBuilder.instances().getPath());
+ paths.add(_keyBuilder.externalViews().getPath());
+ paths.add(_keyBuilder.controller().getPath());
+ paths.add(_keyBuilder.stateModelDefs().getPath());
+ paths.add(_keyBuilder.controllerMessages().getPath());
+ paths.add(_keyBuilder.controllerTaskErrors().getPath());
+ paths.add(_keyBuilder.controllerTaskStatuses().getPath());
+ paths.add(_keyBuilder.controllerLeaderHistory().getPath());
+ return paths;
+ }
+
+ /**
* add a participant to cluster
* @param participant
+ * @return true if participant added, false otherwise
*/
- public void addParticipantToCluster(ParticipantConfig participant) {
+ public boolean addParticipantToCluster(ParticipantConfig participant) {
if (!isClusterStructureValid()) {
- throw new HelixException("Cluster: " + _clusterId + " structure is not valid");
+ LOG.error("Cluster: " + _clusterId + " structure is not valid");
+ return false;
}
ParticipantId participantId = participant.getId();
if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) != null) {
- throw new HelixException("Config for participant: " + participantId
- + " already exists in cluster: " + _clusterId);
+ LOG.error("Config for participant: " + participantId + " already exists in cluster: "
+ + _clusterId);
+ return false;
}
// add empty root ZNodes
@@ -361,27 +441,31 @@ public class ClusterAccessor {
for (String tag : tags) {
instanceConfig.addTag(tag);
}
- Set<PartitionId> disabledPartitions = participant.getDisablePartitionIds();
+ Set<PartitionId> disabledPartitions = participant.getDisabledPartitions();
for (PartitionId partitionId : disabledPartitions) {
instanceConfig.setInstanceEnabledForPartition(partitionId, false);
}
_accessor.createProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
_accessor.createProperty(_keyBuilder.messages(participantId.stringify()), null);
+ return true;
}
/**
* drop a participant from cluster
* @param participantId
+ * @return true if participant dropped, false if there was an error
*/
- public void dropParticipantFromCluster(ParticipantId participantId) {
+ public boolean dropParticipantFromCluster(ParticipantId participantId) {
if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
- throw new HelixException("Config for participant: " + participantId
- + " does NOT exist in cluster: " + _clusterId);
+ LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+ + _clusterId);
+ return false;
}
if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
- throw new HelixException("Participant: " + participantId
- + " structure does NOT exist in cluster: " + _clusterId);
+ LOG.error("Participant: " + participantId + " structure does NOT exist in cluster: "
+ + _clusterId);
+ return false;
}
// delete participant config path
@@ -389,5 +473,30 @@ public class ClusterAccessor {
// delete participant path
_accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
+ return true;
+ }
+
+ /**
+ * Add a state model definition. Updates the existing state model definition if it already exists.
+ * @param stateModelDef fully initialized state model definition
+ * @return true if the model is persisted, false otherwise
+ */
+ public boolean addStateModelDefinitionToCluster(StateModelDefinition stateModelDef) {
+ if (!isClusterStructureValid()) {
+ LOG.error("Cluster: " + _clusterId + " structure is not valid");
+ return false;
+ }
+
+ StateModelDefinitionAccessor smdAccessor = new StateModelDefinitionAccessor(_accessor);
+ return smdAccessor.setStateModelDefinition(stateModelDef);
+ }
+
+ /**
+ * Remove a state model definition if it exists
+ * @param stateModelDefId state model definition id
+ * @return true if removed, false if it did not exist
+ */
+ public boolean dropStateModelDefinitionFromCluster(StateModelDefId stateModelDefId) {
+ return _accessor.removeProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index ba1e78e..590fb01 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -18,6 +18,7 @@ import org.apache.log4j.Logger;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -224,7 +225,7 @@ public class ClusterConfig {
}
/**
- * Check the pasued status of the cluster
+ * Check the paused status of the cluster
* @return true if paused, false otherwise
*/
public boolean isPaused() {
@@ -232,6 +233,205 @@ public class ClusterConfig {
}
/**
+ * Update context for a ClusterConfig
+ */
+ public static class Delta {
+ private enum Fields {
+ PAUSE_STATUS,
+ USER_CONFIG
+ }
+
+ private Set<Fields> _updateFields;
+ private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
+ private Builder _builder;
+
+ /**
+ * Instantiate the delta for a cluster config
+ * @param clusterId the cluster to update
+ */
+ public Delta(ClusterId clusterId) {
+ _updateFields = Sets.newHashSet();
+ _removedConstraints = Maps.newHashMap();
+ for (ConstraintType type : ConstraintType.values()) {
+ Set<ConstraintId> constraints = Sets.newHashSet();
+ _removedConstraints.put(type, constraints);
+ }
+ _builder = new Builder(clusterId);
+ }
+
+ /**
+ * Add a state upper bound constraint
+ * @param scope scope under which the constraint is valid
+ * @param stateModelDefId identifier of the state model that owns the state
+ * @param state the state to constrain
+ * @param upperBound maximum number of replicas per partition in the state
+ * @return Delta
+ */
+ public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ State state, int upperBound) {
+ return addStateUpperBoundConstraint(scope, stateModelDefId, state,
+ Integer.toString(upperBound));
+ }
+
+ /**
+ * Add a state upper bound constraint
+ * @param scope scope under which the constraint is valid
+ * @param stateModelDefId identifier of the state model that owns the state
+ * @param state the state to constrain
+ * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
+ * number, or the currently supported special bound values:<br />
+ * "R" - Refers to the number of replicas specified during resource
+ * creation. This allows having different replication factor for each
+ * resource without having to create a different state machine. <br />
+ * "N" - Refers to all nodes in the cluster. Useful for resources that need
+ * to exist on all nodes. This way one can add/remove nodes without having
+ * to change the bounds.
+ * @return Delta
+ */
+ public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ State state, String dynamicUpperBound) {
+ _builder.addStateUpperBoundConstraint(scope, stateModelDefId, state, dynamicUpperBound);
+ return this;
+ }
+
+ /**
+ * Remove state upper bound constraint
+ * @param scope scope under which the constraint is valid
+ * @param stateModelDefId identifier of the state model that owns the state
+ * @param state the state to constrain
+ * @return Delta
+ */
+ public Delta removeStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ State state) {
+ _removedConstraints.get(ConstraintType.STATE_CONSTRAINT).add(
+ ConstraintId.from(scope, stateModelDefId, state));
+ return this;
+ }
+
+ /**
+ * Add a constraint on the maximum number of in-flight transitions of a certain type
+ * @param scope scope of the constraint
+ * @param stateModelDefId identifies the state model containing the transition
+ * @param transition the transition to constrain
+ * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
+ * @return Delta
+ */
+ public Delta addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ Transition transition, int maxInFlightTransitions) {
+ _builder.addTransitionConstraint(scope, stateModelDefId, transition, maxInFlightTransitions);
+ return this;
+ }
+
+ /**
+ * Remove a constraint on the maximum number of in-flight transitions of a certain type
+ * @param scope scope of the constraint
+ * @param stateModelDefId identifies the state model containing the transition
+ * @param transition the transition to constrain
+ * @return Delta
+ */
+ public Delta removeTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ Transition transition) {
+ _removedConstraints.get(ConstraintType.MESSAGE_CONSTRAINT).add(
+ ConstraintId.from(scope, stateModelDefId, transition));
+ return this;
+ }
+
+ /**
+ * Add a single constraint item
+ * @param type type of the constraint item
+ * @param constraintId unique constraint id
+ * @param item instantiated ConstraintItem
+ * @return Delta
+ */
+ public Delta addConstraintItem(ConstraintType type, ConstraintId constraintId,
+ ConstraintItem item) {
+ _builder.addConstraint(type, constraintId, item);
+ return this;
+ }
+
+ /**
+ * Remove a single constraint item
+ * @param type type of the constraint item
+ * @param constraintId unique constraint id
+ * @return Delta
+ */
+ public Delta removeConstraintItem(ConstraintType type, ConstraintId constraintId) {
+ _removedConstraints.get(type).add(constraintId);
+ return this;
+ }
+
+ /**
+ * Set the paused status of the cluster
+ * @param isPaused true if paused, false otherwise
+ * @return Delta
+ */
+ public Delta setPausedStatus(boolean isPaused) {
+ _builder.pausedStatus(isPaused);
+ _updateFields.add(Fields.PAUSE_STATUS);
+ return this;
+ }
+
+ /**
+ * Set the user configuration
+ * @param userConfig user-specified properties
+ * @return Delta
+ */
+ public Delta setUserConfig(UserConfig userConfig) {
+ _builder.userConfig(userConfig);
+ _updateFields.add(Fields.USER_CONFIG);
+ return this;
+ }
+
+ /**
+ * Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta.
+ * Resources, participants, and state model definitions are copied from the original. The
+ * paused status and user config are taken from the original unless they were explicitly set
+ * on this delta. For each constraint type, the delta's added constraints are applied first and
+ * its removed constraints are applied afterwards.
+ * @param orig the original ClusterConfig
+ * @return updated ClusterConfig
+ */
+ public ClusterConfig mergeInto(ClusterConfig orig) {
+ // snapshot of the values that were set on this delta
+ ClusterConfig deltaConfig = _builder.build();
+ // start from the original's values
+ Builder builder =
+ new Builder(orig.getId()).addResources(orig.getResourceMap().values())
+ .addParticipants(orig.getParticipantMap().values())
+ .addStateModelDefinitions(orig.getStateModelMap().values())
+ .userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused());
+ // override only the fields that were explicitly updated; these must be applied to the merged
+ // builder (not the delta's own _builder), otherwise the updates never reach the result
+ for (Fields field : _updateFields) {
+ switch (field) {
+ case PAUSE_STATUS:
+ builder.pausedStatus(deltaConfig.isPaused());
+ break;
+ case USER_CONFIG:
+ builder.userConfig(deltaConfig.getUserConfig());
+ break;
+ }
+ }
+ // add constraint deltas
+ for (ConstraintType type : ConstraintType.values()) {
+ // NOTE(review): the ClusterConstraints instance obtained from orig is modified directly
+ // below; confirm whether orig is expected to remain unchanged after a merge
+ ClusterConstraints constraints;
+ if (orig.getConstraintMap().containsKey(type)) {
+ constraints = orig.getConstraintMap().get(type);
+ } else {
+ constraints = new ClusterConstraints(type);
+ }
+ // add new constraints
+ if (deltaConfig.getConstraintMap().containsKey(type)) {
+ ClusterConstraints deltaConstraints = deltaConfig.getConstraintMap().get(type);
+ for (ConstraintId constraintId : deltaConstraints.getConstraintItems().keySet()) {
+ ConstraintItem constraintItem = deltaConstraints.getConstraintItem(constraintId);
+ constraints.addConstraintItem(constraintId, constraintItem);
+ }
+ }
+ // remove constraints
+ for (ConstraintId constraintId : _removedConstraints.get(type)) {
+ constraints.removeConstraintItem(constraintId);
+ }
+ builder.addConstraint(constraints);
+ }
+ return builder.build();
+ }
+ }
+
+ /**
* Assembles a cluster configuration
*/
public static class Builder {
@@ -316,6 +516,19 @@ public class ClusterConfig {
}
/**
+ * Add a single constraint item
+ * @param type type of the constraint
+ * @param constraintId unique constraint identifier
+ * @param item instantiated ConstraintItem
+ * @return Builder
+ */
+ public Builder addConstraint(ConstraintType type, ConstraintId constraintId, ConstraintItem item) {
+ ClusterConstraints existConstraints = getConstraintsInstance(type);
+ existConstraints.addConstraintItem(constraintId, item);
+ return this;
+ }
+
+ /**
* Add multiple constraints to the cluster
* @param constraints cluster constraints of multiple distinct types
* @return Builder
@@ -330,7 +543,6 @@ public class ClusterConfig {
/**
* Add a constraint on the maximum number of in-flight transitions of a certain type
* @param scope scope of the constraint
- * @param constraintId unique constraint identifier
* @param stateModelDefId identifies the state model containing the transition
* @param transition the transition to constrain
* @param maxInFlightTransitions number of allowed in-flight transitions in the scope
@@ -371,7 +583,7 @@ public class ClusterConfig {
* @param stateModelDefId identifier of the state model that owns the state
* @param state the state to constrain
* @param upperBound maximum number of replicas per partition in the state
- * @return
+ * @return Builder
*/
public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
State state, int upperBound) {
@@ -451,7 +663,7 @@ public class ClusterConfig {
* @param isPaused true if paused, false otherwise
* @return Builder
*/
- public Builder setPausedStatus(boolean isPaused) {
+ public Builder pausedStatus(boolean isPaused) {
_isPaused = isPaused;
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 45d0315..8b02f0e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -106,7 +106,7 @@ public class Participant {
* @return set of disabled partition id's, or empty set if none
*/
public Set<PartitionId> getDisablePartitionIds() {
- return _config.getDisablePartitionIds();
+ return _config.getDisabledPartitions();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index 46895d3..11e3608 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -64,8 +64,9 @@ public class ParticipantAccessor {
void enableParticipant(ParticipantId participantId, boolean isEnabled) {
String participantName = participantId.stringify();
if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) == null) {
- throw new HelixException("Config for participant: " + participantId
- + " does NOT exist in cluster: " + _clusterId);
+ LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+ + _clusterId);
+ return;
}
InstanceConfig config = new InstanceConfig(participantName);
@@ -156,8 +157,9 @@ public class ParticipantAccessor {
// check instanceConfig exists
PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName);
if (_accessor.getProperty(instanceConfigKey) == null) {
- throw new HelixException("Config for participant: " + participantId
- + " does NOT exist in cluster: " + _clusterId);
+ LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+ + _clusterId);
+ return;
}
// check resource exist. warn if not
@@ -253,11 +255,21 @@ public class ParticipantAccessor {
}
/**
- * create live instance for the participant
- * @param participantId
+ * Update a participant configuration
+ * @param participantId the participant to update
+ * @param participantDelta changes to the participant
+ * @return ParticipantConfig, or null if participant is not persisted
*/
- public void connectParticipant(ParticipantId participantId) {
- // TODO impl this
+ public ParticipantConfig updateParticipant(ParticipantId participantId,
+ ParticipantConfig.Delta participantDelta) {
+ Participant participant = readParticipant(participantId);
+ if (participant == null) {
+ LOG.error("Participant " + participantId + " does not exist, cannot be updated");
+ return null;
+ }
+ ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
+ // TODO: persist this
+ return config;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
index f41056c..5498ca3 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
@@ -4,6 +4,7 @@ import java.util.HashSet;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -84,7 +85,7 @@ public class ParticipantConfig {
* Get disabled partition id's
* @return set of disabled partition id's, or empty set if none
*/
- public Set<PartitionId> getDisablePartitionIds() {
+ public Set<PartitionId> getDisabledPartitions() {
return _disabledPartitions;
}
@@ -122,6 +123,159 @@ public class ParticipantConfig {
}
/**
+ * Update context for a ParticipantConfig
+ */
+ public static class Delta {
+ private enum Fields {
+ HOST_NAME,
+ PORT,
+ ENABLED,
+ USER_CONFIG
+ }
+
+ // scalar fields explicitly set on this delta; only these override the original on merge
+ private Set<Fields> _updateFields;
+ // tags and disabled partitions that must be stripped from the merged result
+ private Set<String> _removedTags;
+ private Set<PartitionId> _removedDisabledPartitions;
+ // accumulates the new scalar values plus the added tags and disabled partitions
+ private Builder _builder;
+
+ /**
+ * Instantiate the delta for a participant config
+ * @param participantId the participant to update
+ */
+ public Delta(ParticipantId participantId) {
+ _updateFields = Sets.newHashSet();
+ _removedTags = Sets.newHashSet();
+ _removedDisabledPartitions = Sets.newHashSet();
+ _builder = new Builder(participantId);
+ }
+
+ /**
+ * Set the participant host name
+ * @param hostName reachable host when live
+ * @return Delta
+ */
+ public Delta setHostName(String hostName) {
+ _builder.hostName(hostName);
+ _updateFields.add(Fields.HOST_NAME);
+ return this;
+ }
+
+ /**
+ * Set the participant port
+ * @param port port number
+ * @return Delta
+ */
+ public Delta setPort(int port) {
+ _builder.port(port);
+ _updateFields.add(Fields.PORT);
+ return this;
+ }
+
+ /**
+ * Set whether or not the participant is enabled
+ * @param isEnabled true if enabled, false otherwise
+ * @return Delta
+ */
+ public Delta setEnabled(boolean isEnabled) {
+ _builder.enabled(isEnabled);
+ _updateFields.add(Fields.ENABLED);
+ return this;
+ }
+
+ /**
+ * Set the user configuration
+ * @param userConfig user-specified properties
+ * @return Delta
+ */
+ public Delta setUserConfig(UserConfig userConfig) {
+ _builder.userConfig(userConfig);
+ _updateFields.add(Fields.USER_CONFIG);
+ return this;
+ }
+
+ /**
+ * Add a new tag for this participant
+ * @param tag the tag to add
+ * @return Delta
+ */
+ public Delta addTag(String tag) {
+ _builder.addTag(tag);
+ return this;
+ }
+
+ /**
+ * Remove a tag for this participant. The removal is applied when the delta is merged.
+ * @param tag the tag to remove
+ * @return Delta
+ */
+ public Delta removeTag(String tag) {
+ _removedTags.add(tag);
+ return this;
+ }
+
+ /**
+ * Add a partition to disable for this participant
+ * @param partitionId the partition to disable
+ * @return Delta
+ */
+ public Delta addDisabledPartition(PartitionId partitionId) {
+ _builder.addDisabledPartition(partitionId);
+ return this;
+ }
+
+ /**
+ * Remove a partition from the disabled set for this participant. The removal is applied when
+ * the delta is merged.
+ * @param partitionId the partition to enable
+ * @return Delta
+ */
+ public Delta removeDisabledPartition(PartitionId partitionId) {
+ _removedDisabledPartitions.add(partitionId);
+ return this;
+ }
+
+ /**
+ * Create a ParticipantConfig that is the combination of an existing ParticipantConfig and this
+ * delta. Host name, port, enabled flag and user config keep the original's value unless they
+ * were explicitly set on this delta. Tags and disabled partitions are the union of the
+ * original's and the added ones, minus the removed ones; an entry that is both added and
+ * removed on the same delta ends up removed.
+ * @param orig the original ParticipantConfig
+ * @return updated ParticipantConfig
+ */
+ public ParticipantConfig mergeInto(ParticipantConfig orig) {
+ ParticipantConfig deltaConfig = _builder.build();
+ // seed with every scalar of the original, including the enabled flag, so that a delta which
+ // does not touch a field cannot silently reset it to the builder default
+ Builder builder =
+ new Builder(orig.getId()).hostName(orig.getHostName()).port(orig.getPort())
+ .enabled(orig.isEnabled()).userConfig(orig.getUserConfig());
+ for (Fields field : _updateFields) {
+ switch (field) {
+ case HOST_NAME:
+ builder.hostName(deltaConfig.getHostName());
+ break;
+ case PORT:
+ builder.port(deltaConfig.getPort());
+ break;
+ case ENABLED:
+ builder.enabled(deltaConfig.isEnabled());
+ break;
+ case USER_CONFIG:
+ builder.userConfig(deltaConfig.getUserConfig());
+ break;
+ }
+ }
+ Set<String> tags = Sets.newHashSet(orig.getTags());
+ tags.addAll(deltaConfig.getTags());
+ tags.removeAll(_removedTags);
+ for (String tag : tags) {
+ builder.addTag(tag);
+ }
+ Set<PartitionId> disabledPartitions = Sets.newHashSet(orig.getDisabledPartitions());
+ disabledPartitions.addAll(deltaConfig.getDisabledPartitions());
+ // honor removeDisabledPartition(); without this the removals were collected but never applied
+ disabledPartitions.removeAll(_removedDisabledPartitions);
+ for (PartitionId partitionId : disabledPartitions) {
+ builder.addDisabledPartition(partitionId);
+ }
+ return builder.build();
+ }
+ }
+
+ /**
* Assemble a participant
*/
public static class Builder {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 7816888..2c3b7ca 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -43,6 +43,7 @@ public class Resource {
/**
* Construct a resource
* @param id resource id
+ * @param type ResourceType type
* @param idealState ideal state of the resource
* @param externalView external view of the resource
* @param resourceAssignment current resource assignment of the cluster
@@ -51,9 +52,10 @@ public class Resource {
* @param bucketSize the bucket size to use for physically saved state
* @param batchMessageMode true if batch messaging allowed, false otherwise
*/
- public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment,
- ExternalView externalView, RebalancerContext rebalancerContext, UserConfig userConfig,
- int bucketSize, boolean batchMessageMode) {
+ public Resource(ResourceId id, ResourceType type, IdealState idealState,
+ ResourceAssignment resourceAssignment, ExternalView externalView,
+ RebalancerContext rebalancerContext, UserConfig userConfig, int bucketSize,
+ boolean batchMessageMode) {
Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
new HashMap<PartitionId, Map<String, String>>();
Set<PartitionId> partitionSet = idealState.getPartitionSet();
@@ -71,8 +73,8 @@ public class Resource {
RebalancerConfig rebalancerConfig = new RebalancerConfig(rebalancerContext);
_config =
- new ResourceConfig(id, ResourceType.DATA, schedulerTaskConfig, rebalancerConfig,
- userConfig, bucketSize, batchMessageMode);
+ new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, userConfig, bucketSize,
+ batchMessageMode);
_externalView = externalView;
_resourceAssignment = resourceAssignment;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
index bdc44c7..5adec4e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
@@ -22,6 +22,7 @@ package org.apache.helix.api;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
+import org.apache.helix.api.ResourceConfig.ResourceType;
import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
@@ -32,8 +33,10 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfiguration;
+import org.apache.log4j.Logger;
public class ResourceAccessor {
+ private static final Logger LOG = Logger.getLogger(ResourceAccessor.class);
private final HelixDataAccessor _accessor;
private final PropertyKey.Builder _keyBuilder;
@@ -59,6 +62,23 @@ public class ResourceAccessor {
}
/**
+ * Update a resource configuration
+ * @param resourceId the resource id to update
+ * @param resourceDelta changes to the resource
+ * @return ResourceConfig, or null if the resource is not persisted
+ */
+ public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+ // the delta is applied on top of whatever configuration can currently be read
+ Resource existing = readResource(resourceId);
+ if (existing != null) {
+ ResourceConfig current = existing.getConfig();
+ // TODO: persist this
+ return resourceDelta.mergeInto(current);
+ }
+ LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
+ return null;
+ }
+
+ /**
* save resource assignment
* @param resourceId
* @param resourceAssignment
@@ -73,8 +93,8 @@ public class ResourceAccessor {
* @param resourceId
* @return resource assignment or null
*/
- public void getResourceAssignment(ResourceId resourceId) {
- _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+ public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+ return _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
}
/**
@@ -182,10 +202,11 @@ public class ResourceAccessor {
static Resource createResource(ResourceId resourceId,
ResourceConfiguration resourceConfiguration, IdealState idealState,
ExternalView externalView, ResourceAssignment resourceAssignment) {
- // TODO pass resource assignment
UserConfig userConfig;
+ ResourceType type = ResourceType.DATA;
if (resourceConfiguration != null) {
userConfig = UserConfig.from(resourceConfiguration);
+ type = resourceConfiguration.getType();
} else {
userConfig = new UserConfig(Scope.resource(resourceId));
}
@@ -206,7 +227,7 @@ public class ResourceAccessor {
rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
}
}
- return new Resource(resourceId, idealState, resourceAssignment, externalView,
+ return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
rebalancerContext, userConfig, bucketSize, batchMessageMode);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
index a21301b..dc3dc1d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
@@ -6,6 +6,8 @@ import java.util.Set;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import com.google.common.collect.Sets;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -156,6 +158,123 @@ public class ResourceConfig {
}
/**
+ * Update context for a ResourceConfig
+ */
+ public static class Delta {
+ private enum Fields {
+ TYPE,
+ REBALANCER_CONTEXT,
+ USER_CONFIG,
+ BUCKET_SIZE,
+ BATCH_MESSAGE_MODE
+ }
+
+ // fields explicitly set on this delta; only these override the original on merge
+ private Set<Fields> _updateFields;
+ // holds the new values until the delta is merged
+ private Builder _builder;
+
+ /**
+ * Create an empty delta targeting a resource
+ * @param resourceId the resource to update
+ */
+ public Delta(ResourceId resourceId) {
+ _builder = new Builder(resourceId);
+ _updateFields = Sets.newHashSet();
+ }
+
+ /**
+ * Record a new resource type
+ * @param type ResourceType
+ * @return Delta
+ */
+ public Delta setType(ResourceType type) {
+ _builder.type(type);
+ _updateFields.add(Fields.TYPE);
+ return this;
+ }
+
+ /**
+ * Record a new rebalancer context
+ * @param context properties of interest for rebalancing
+ * @return Delta
+ */
+ public Delta setRebalancerContext(RebalancerContext context) {
+ _builder.rebalancerContext(context);
+ _updateFields.add(Fields.REBALANCER_CONTEXT);
+ return this;
+ }
+
+ /**
+ * Record a new user configuration
+ * @param userConfig user-specified properties
+ * @return Delta
+ */
+ public Delta setUserConfig(UserConfig userConfig) {
+ _builder.userConfig(userConfig);
+ _updateFields.add(Fields.USER_CONFIG);
+ return this;
+ }
+
+ /**
+ * Record a new bucket size
+ * @param bucketSize the size to use
+ * @return Delta
+ */
+ public Delta setBucketSize(int bucketSize) {
+ _builder.bucketSize(bucketSize);
+ _updateFields.add(Fields.BUCKET_SIZE);
+ return this;
+ }
+
+ /**
+ * Record a new batch message mode
+ * @param batchMessageMode true to enable, false to disable
+ * @return Delta
+ */
+ public Delta setBatchMessageMode(boolean batchMessageMode) {
+ _builder.batchMessageMode(batchMessageMode);
+ _updateFields.add(Fields.BATCH_MESSAGE_MODE);
+ return this;
+ }
+
+ /**
+ * Produce the ResourceConfig obtained by applying this delta to an existing ResourceConfig.
+ * Every value starts out as the original's and is replaced only if it was set on this delta.
+ * @param orig the original ResourceConfig
+ * @return updated ResourceConfig
+ */
+ public ResourceConfig mergeInto(ResourceConfig orig) {
+ ResourceConfig deltaConfig = _builder.build();
+ // seed the result with every value of the original
+ Builder merged = new Builder(orig.getId());
+ merged.type(orig.getType());
+ merged.rebalancerContext(orig.getRebalancerConfig().getRebalancerContext(
+ RebalancerContext.class));
+ merged.schedulerTaskConfig(orig.getSchedulerTaskConfig());
+ merged.userConfig(orig.getUserConfig());
+ merged.bucketSize(orig.getBucketSize());
+ merged.batchMessageMode(orig.getBatchMessageMode());
+ // overrides are independent of each other, so they are applied one field at a time
+ if (_updateFields.contains(Fields.TYPE)) {
+ merged.type(deltaConfig.getType());
+ }
+ if (_updateFields.contains(Fields.REBALANCER_CONTEXT)) {
+ merged.rebalancerContext(deltaConfig.getRebalancerConfig().getRebalancerContext(
+ RebalancerContext.class));
+ }
+ if (_updateFields.contains(Fields.USER_CONFIG)) {
+ merged.userConfig(deltaConfig.getUserConfig());
+ }
+ if (_updateFields.contains(Fields.BUCKET_SIZE)) {
+ merged.bucketSize(deltaConfig.getBucketSize());
+ }
+ if (_updateFields.contains(Fields.BATCH_MESSAGE_MODE)) {
+ merged.batchMessageMode(deltaConfig.getBatchMessageMode());
+ }
+ return merged.build();
+ }
+ }
+
+ /**
* Assembles a ResourceConfig
*/
public static class Builder {
@@ -191,7 +310,7 @@ public class ResourceConfig {
/**
* Set the rebalancer configuration
- * @param rebalancerConfig properties of interest for rebalancing
+ * @param rebalancerContext properties of interest for rebalancing
* @return Builder
*/
public Builder rebalancerContext(RebalancerContext rebalancerContext) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
index 988484b..60b6210 100644
--- a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
@@ -59,11 +59,11 @@ public class StateModelDefinitionAccessor {
}
/**
- * Add a state model definition. Updates the existing state model definition if it already exists.
+ * Set a state model definition. Adds the state model definition if it does not exist
* @param stateModelDef fully initialized state model definition
* @return true if the model is persisted, false otherwise
*/
- public boolean addStateModelDefinition(StateModelDefinition stateModelDef) {
+ public boolean setStateModelDefinition(StateModelDefinition stateModelDef) {
return _accessor.setProperty(_keyBuilder.stateModelDef(stateModelDef.getId()), stateModelDef);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
index 5165ba7..3925c2b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
@@ -13,9 +13,8 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.testng.collections.Maps;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -62,7 +61,7 @@ public class PartitionedRebalancerContext extends BasicRebalancerContext impleme
/**
* Get a map from partition id to partition
- * @return partition map
+ * @return partition map (mutable)
*/
public Map<PartitionId, Partition> getPartitionMap() {
return _partitionMap;
@@ -73,7 +72,7 @@ public class PartitionedRebalancerContext extends BasicRebalancerContext impleme
* @param partitionMap partition map
*/
public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
- _partitionMap = ImmutableMap.copyOf(partitionMap);
+ _partitionMap = Maps.newHashMap(partitionMap);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
index 26f134b..7d5fed2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -105,7 +105,9 @@ public final class RebalancerConfig {
String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
return _serializer.deserialize(contextClass, serialized);
} catch (ClassNotFoundException e) {
- LOG.info(className + " is not a valid class");
+ LOG.error(className + " is not a valid class");
+ } catch (ClassCastException e) {
+ LOG.error(className + " does not implement RebalancerContext");
}
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 8966434..c753f22 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -1,16 +1,10 @@
package org.apache.helix.model;
-import java.util.List;
-
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
-import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceConfig.ResourceType;
import org.apache.helix.api.ResourceId;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.collect.Lists;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -35,7 +29,7 @@ import com.google.common.collect.Lists;
*/
public class ResourceConfiguration extends HelixProperty {
public enum Fields {
- PARTITION_LIST
+ TYPE
}
/**
@@ -63,29 +57,18 @@ public class ResourceConfiguration extends HelixProperty {
}
/**
- * Set the partitions for this resource
- * @param partitionIds list of partition ids
+ * Set the resource type
+ * @param type ResourceType type
*/
- public void setPartitionIds(List<PartitionId> partitionIds) {
- _record.setListField(Fields.PARTITION_LIST.toString(),
- Lists.transform(partitionIds, Functions.toStringFunction()));
+ public void setType(ResourceType type) {
+ _record.setEnumField(Fields.TYPE.toString(), type);
}
/**
- * Get the partitions for this resource
- * @return list of partition ids
+ * Get the resource type
+ * @return ResourceType type
*/
- public List<PartitionId> getPartitionIds() {
- List<String> partitionNames = _record.getListField(Fields.PARTITION_LIST.toString());
- if (partitionNames != null) {
- return Lists.transform(partitionNames, new Function<String, PartitionId>() {
- @Override
- public PartitionId apply(String partitionName) {
- return PartitionId.from(partitionName);
- }
- });
- }
- return null;
+ public ResourceType getType() {
+ return _record.getEnumField(Fields.TYPE.toString(), ResourceType.class, ResourceType.DATA);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 262f779..141f6b7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -34,6 +34,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceConfig.ResourceType;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.Scope;
import org.apache.helix.api.StateModelDefId;
@@ -170,8 +171,8 @@ public class BaseStageTest {
ResourceId resourceId = idealState.getResourceId();
RebalancerContext context = PartitionedRebalancerContext.from(idealState);
Resource resource =
- new Resource(resourceId, idealState, null, null, context, new UserConfig(
- Scope.resource(resourceId)), idealState.getBucketSize(),
+ new Resource(resourceId, ResourceType.DATA, idealState, null, null, context,
+ new UserConfig(Scope.resource(resourceId)), idealState.getBucketSize(),
idealState.getBatchMessageMode());
resourceMap.put(resourceId, resource.getConfig());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/89b3bc59/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index 6791bfd..72ea9f8 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -50,6 +50,9 @@ import com.google.common.collect.Lists;
* under the License.
*/
+/**
+ * Example showing all major interactions with the new Helix logical model
+ */
public class NewModelExample {
private static final Logger LOG = Logger.getLogger(NewModelExample.class);
@@ -130,10 +133,10 @@ public class NewModelExample {
ResourceId resourceId = ResourceId.from("exampleResource");
// create a partition
- Partition partition1 = new Partition(PartitionId.from("partition1"));
+ Partition partition1 = new Partition(PartitionId.from(resourceId, "1"));
// create a second partition
- Partition partition2 = new Partition(PartitionId.from("partition2"));
+ Partition partition2 = new Partition(PartitionId.from(resourceId, "2"));
// specify the rebalancer configuration
// this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerConfig