You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/12/04 02:21:07 UTC
[4/4] git commit: [HELIX-327] Simplify rebalancer,
rename rebalancer configs, support settable contexts, rb=15981
[HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/015e7dda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/015e7dda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/015e7dda
Branch: refs/heads/master
Commit: 015e7dda2c1f96beb6d387ba71c118d246840f1b
Parents: 7521c0c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Dec 3 16:52:23 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Dec 3 17:20:31 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/helix/PropertyKey.java | 18 +
.../org/apache/helix/PropertyPathConfig.java | 5 +
.../java/org/apache/helix/PropertyType.java | 3 +-
.../main/java/org/apache/helix/api/Cluster.java | 20 +-
.../java/org/apache/helix/api/Resource.java | 9 +-
.../api/accessor/AtomicResourceAccessor.java | 6 +-
.../helix/api/accessor/ClusterAccessor.java | 52 ++-
.../helix/api/accessor/ParticipantAccessor.java | 23 +-
.../helix/api/accessor/ResourceAccessor.java | 138 ++++---
.../apache/helix/api/config/ResourceConfig.java | 32 +-
.../java/org/apache/helix/api/id/ContextId.java | 57 +++
.../controller/GenericHelixController.java | 8 +-
.../context/BasicControllerContext.java | 60 +++
.../controller/context/ControllerContext.java | 40 ++
.../context/ControllerContextHolder.java | 156 +++++++
.../context/ControllerContextProvider.java | 120 ++++++
.../controller/rebalancer/CustomRebalancer.java | 14 +-
.../rebalancer/FallbackRebalancer.java | 20 +-
.../rebalancer/FullAutoRebalancer.java | 16 +-
.../controller/rebalancer/HelixRebalancer.java | 20 +-
.../rebalancer/SemiAutoRebalancer.java | 18 +-
.../config/BasicRebalancerConfig.java | 256 ++++++++++++
.../config/CustomRebalancerConfig.java | 164 ++++++++
.../config/FullAutoRebalancerConfig.java | 64 +++
.../config/PartitionedRebalancerConfig.java | 405 +++++++++++++++++++
.../rebalancer/config/RebalancerConfig.java | 95 +++++
.../config/RebalancerConfigHolder.java | 185 +++++++++
.../config/ReplicatedRebalancerConfig.java | 40 ++
.../config/SemiAutoRebalancerConfig.java | 178 ++++++++
.../context/BasicRebalancerContext.java | 240 -----------
.../rebalancer/context/ContextSerializer.java | 37 --
.../context/CustomRebalancerContext.java | 164 --------
.../context/DefaultContextSerializer.java | 83 ----
.../context/FullAutoRebalancerContext.java | 64 ---
.../context/PartitionedRebalancerContext.java | 393 ------------------
.../rebalancer/context/RebalancerConfig.java | 182 ---------
.../rebalancer/context/RebalancerContext.java | 94 -----
.../context/ReplicatedRebalancerContext.java | 40 --
.../context/SemiAutoRebalancerContext.java | 178 --------
.../serializer/DefaultStringSerializer.java | 82 ++++
.../controller/serializer/StringSerializer.java | 37 ++
.../helix/controller/stages/AttributeName.java | 3 +-
.../stages/BestPossibleStateCalcStage.java | 26 +-
.../stages/ExternalViewComputeStage.java | 10 +-
.../stages/MessageGenerationStage.java | 16 +-
.../stages/MessageSelectionStage.java | 21 +-
.../stages/PersistAssignmentStage.java | 14 +-
.../controller/stages/PersistContextStage.java | 59 +++
.../controller/stages/ReadClusterDataStage.java | 17 +
.../stages/ResourceComputationStage.java | 25 +-
.../helix/model/ResourceConfiguration.java | 14 +-
.../org/apache/helix/tools/NewClusterSetup.java | 66 +--
.../org/apache/helix/api/TestNewStages.java | 22 +-
.../org/apache/helix/api/TestUpdateConfig.java | 27 +-
.../context/TestSerializeRebalancerContext.java | 20 +-
.../helix/controller/stages/BaseStageTest.java | 6 +-
.../stages/TestResourceComputationStage.java | 13 +-
.../TestCustomizedIdealStateRebalancer.java | 35 +-
.../helix/integration/TestHelixConnection.java | 10 +-
.../helix/examples/LogicalModelExample.java | 10 +-
.../LockManagerRebalancer.java | 18 +-
61 files changed, 2450 insertions(+), 1798 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index ebd7a35..2af63a5 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -24,6 +24,7 @@ import static org.apache.helix.PropertyType.ALERT_HISTORY;
import static org.apache.helix.PropertyType.ALERT_STATUS;
import static org.apache.helix.PropertyType.CLUSTER;
import static org.apache.helix.PropertyType.CONFIGS;
+import static org.apache.helix.PropertyType.CONTEXT;
import static org.apache.helix.PropertyType.CONTROLLER;
import static org.apache.helix.PropertyType.CURRENTSTATES;
import static org.apache.helix.PropertyType.ERRORS;
@@ -46,6 +47,7 @@ import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
import java.util.Arrays;
+import org.apache.helix.controller.context.ControllerContextHolder;
import org.apache.helix.model.AlertHistory;
import org.apache.helix.model.AlertStatus;
import org.apache.helix.model.Alerts;
@@ -721,6 +723,22 @@ public class PropertyKey {
public PropertyKey propertyStore() {
return new PropertyKey(PROPERTYSTORE, null, _clusterName);
}
+
+ /**
+ * Get a propertykey associated with the root of the Helix contexts
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey controllerContexts() {
+ return new PropertyKey(CONTEXT, ControllerContextHolder.class, _clusterName);
+ }
+
+ /**
+ * Get a propertykey associated with the root of the Helix contexts
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey controllerContext(String contextId) {
+ return new PropertyKey(CONTEXT, ControllerContextHolder.class, _clusterName, contextId);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
index 60a92f4..d66e7d9 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
@@ -22,6 +22,7 @@ package org.apache.helix;
import static org.apache.helix.PropertyType.ALERTS;
import static org.apache.helix.PropertyType.ALERT_STATUS;
import static org.apache.helix.PropertyType.CONFIGS;
+import static org.apache.helix.PropertyType.CONTEXT;
import static org.apache.helix.PropertyType.CURRENTSTATES;
import static org.apache.helix.PropertyType.EXTERNALVIEW;
import static org.apache.helix.PropertyType.HEALTHREPORT;
@@ -40,6 +41,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.helix.controller.context.ControllerContextHolder;
import org.apache.helix.model.AlertStatus;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.CurrentState;
@@ -81,6 +83,7 @@ public class PropertyPathConfig {
typeToClassMapping.put(ALERT_STATUS, AlertStatus.class);
typeToClassMapping.put(PAUSE, PauseSignal.class);
typeToClassMapping.put(RESOURCEASSIGNMENTS, ResourceAssignment.class);
+ typeToClassMapping.put(CONTEXT, ControllerContextHolder.class);
// @formatter:off
addEntry(PropertyType.CLUSTER, 1, "/{clusterName}");
@@ -148,6 +151,8 @@ public class PropertyPathConfig {
addEntry(PropertyType.ALERTS, 1, "/{clusterName}/CONTROLLER/ALERTS");
addEntry(PropertyType.ALERT_STATUS, 1, "/{clusterName}/CONTROLLER/ALERT_STATUS");
addEntry(PropertyType.ALERT_HISTORY, 1, "/{clusterName}/CONTROLLER/ALERT_HISTORY");
+ addEntry(PropertyType.CONTEXT, 1, "/{clusterName}/CONTROLLER/CONTEXT");
+ addEntry(PropertyType.CONTEXT, 2, "/{clusterName}/CONTROLLER/CONTEXT/{contextId}");
// @formatter:on
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 680dc06..75adb20 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -64,7 +64,8 @@ public enum PropertyType {
PERSISTENTSTATS(Type.CONTROLLER, true, false, false, false),
ALERTS(Type.CONTROLLER, true, false, false, false),
ALERT_STATUS(Type.CONTROLLER, true, false, false, false),
- ALERT_HISTORY(Type.CONTROLLER, true, false, false, false);
+ ALERT_HISTORY(Type.CONTROLLER, true, false, false, false),
+ CONTEXT(Type.CONTROLLER, true, false);
// @formatter:on
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index fdeb879..98072d1 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -27,11 +27,13 @@ import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ContextId;
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SpectatorId;
import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.context.ControllerContext;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -73,6 +75,8 @@ public class Cluster {
*/
private final Map<SpectatorId, Spectator> _spectatorMap;
+ private final Map<ContextId, ControllerContext> _contextMap;
+
private final ControllerId _leaderId;
private final ClusterConfig _config;
@@ -86,6 +90,7 @@ public class Cluster {
* @param leaderId
* @param constraintMap
* @param stateModelMap
+ * @param contextMap
* @param stats
* @param alerts
* @param userConfig
@@ -95,8 +100,9 @@ public class Cluster {
public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
- Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
- Alerts alerts, UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+ Map<StateModelDefId, StateModelDefinition> stateModelMap,
+ Map<ContextId, ControllerContext> contextMap, PersistentStats stats, Alerts alerts,
+ UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
// build the config
// Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -137,6 +143,8 @@ public class Cluster {
_leaderId = leaderId;
+ _contextMap = ImmutableMap.copyOf(contextMap);
+
// TODO impl this when we persist controllers and spectators on zookeeper
_controllerMap = ImmutableMap.copyOf(controllerMap);
_spectatorMap = Collections.emptyMap();
@@ -224,6 +232,14 @@ public class Cluster {
}
/**
+ * Get all the controller context currently on the cluster
+ * @return map of context id to controller context objects
+ */
+ public Map<ContextId, ControllerContext> getContextMap() {
+ return _contextMap;
+ }
+
+ /**
* Get all the persisted stats for the cluster
* @return PersistentStats instance
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 79a1e09..0726510 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -30,8 +30,7 @@ import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
@@ -52,18 +51,16 @@ public class Resource {
* @param idealState ideal state of the resource
* @param externalView external view of the resource
* @param resourceAssignment current resource assignment of the cluster
- * @param rebalancerContext contextual parameters that the rebalancer should be aware of
+ * @param rebalancerConfig parameters that the rebalancer should be aware of
* @param userConfig any resource user-defined configuration
* @param bucketSize the bucket size to use for physically saved state
* @param batchMessageMode true if batch messaging allowed, false otherwise
*/
public Resource(ResourceId id, ResourceType type, IdealState idealState,
ResourceAssignment resourceAssignment, ExternalView externalView,
- RebalancerContext rebalancerContext, UserConfig userConfig, int bucketSize,
+ RebalancerConfig rebalancerConfig, UserConfig userConfig, int bucketSize,
boolean batchMessageMode) {
SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
- RebalancerConfig rebalancerConfig = new RebalancerConfig(rebalancerContext);
-
_config =
new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, userConfig, bucketSize,
batchMessageMode);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 6d69981..cda83d8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -6,7 +6,7 @@ import org.apache.helix.api.Scope;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.lock.HelixLock;
import org.apache.helix.lock.HelixLockable;
import org.apache.log4j.Logger;
@@ -96,12 +96,12 @@ public class AtomicResourceAccessor extends ResourceAccessor {
}
@Override
- public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
+ public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
boolean locked = lock.lock();
if (locked) {
try {
- return _resourceAccessor.setRebalancerContext(resourceId, context);
+ return _resourceAccessor.setRebalancerConfig(resourceId, config);
} finally {
lock.unlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index b01a3ec..36c7aaa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -45,14 +45,18 @@ import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ConstraintId;
+import org.apache.helix.api.id.ContextId;
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.model.ClusterConstraints;
@@ -60,6 +64,7 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
@@ -266,9 +271,13 @@ public class ClusterAccessor {
// read the alerts
Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
+ // read controller context
+ Map<ContextId, ControllerContext> contextMap = readControllerContext();
+
// create the cluster snapshot object
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, stateModelMap, stats, alerts, userConfig, isPaused, autoJoinAllowed);
+ clusterConstraintMap, stateModelMap, contextMap, stats, alerts, userConfig, isPaused,
+ autoJoinAllowed);
}
/**
@@ -459,6 +468,20 @@ public class ClusterAccessor {
}
/**
+ * Read the persisted controller contexts
+ * @return map of context id to controller context
+ */
+ public Map<ContextId, ControllerContext> readControllerContext() {
+ Map<String, ControllerContextHolder> contextHolders =
+ _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+ Map<ContextId, ControllerContext> contexts = Maps.newHashMap();
+ for (String contextName : contextHolders.keySet()) {
+ contexts.put(ContextId.from(contextName), contextHolders.get(contextName).getContext());
+ }
+ return contexts;
+ }
+
+ /**
* Add a statistic specification to the cluster. Existing stat specifications will not be
* overwritten
* @param statName string representing a stat specification
@@ -623,7 +646,7 @@ public class ClusterAccessor {
*/
public boolean addResourceToCluster(ResourceConfig resource) {
if (resource == null || resource.getRebalancerConfig() == null) {
- LOG.error("Resource not fully defined with a rebalancer context");
+ LOG.error("Resource not fully defined with a rebalancer config");
return false;
}
@@ -631,9 +654,8 @@ public class ClusterAccessor {
LOG.error("Cluster: " + _clusterId + " structure is not valid");
return false;
}
- RebalancerContext context =
- resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
- StateModelDefId stateModelDefId = context.getStateModelDefId();
+ RebalancerConfig config = resource.getRebalancerConfig();
+ StateModelDefId stateModelDefId = config.getStateModelDefId();
if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
return false;
@@ -656,17 +678,21 @@ public class ClusterAccessor {
ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
configuration.setType(resource.getType());
configuration.addNamespacedConfig(resource.getUserConfig());
- configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
- configuration.setBucketSize(resource.getBucketSize());
- configuration.setBatchMessageMode(resource.getBatchMessageMode());
+ PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+ if (partitionedConfig == null
+ || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ // only persist if this is not easily convertible to an ideal state
+ configuration
+ .addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
+ .toNamespacedConfig());
+ }
_accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
}
// Create an IdealState from a RebalancerConfig (if the resource is partitioned)
- RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
IdealState idealState =
- ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
- resource.getBatchMessageMode());
+ ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
+ resource.getBucketSize(), resource.getBatchMessageMode());
if (idealState != null) {
_accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 2721d91..c3deafe 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -51,8 +51,8 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -315,16 +315,15 @@ public class ParticipantAccessor {
return false;
}
- // need the rebalancer context for the resource
- RebalancerContext context =
- resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
- if (context == null) {
- LOG.error("Rebalancer context for resource does not exist");
+ // need the rebalancer config for the resource
+ RebalancerConfig config = resource.getRebalancerConfig();
+ if (config == null) {
+ LOG.error("Rebalancer config for resource does not exist");
return false;
}
// ensure that all partitions to reset exist
- Set<PartitionId> partitionSet = ImmutableSet.copyOf(context.getSubUnitIdSet());
+ Set<PartitionId> partitionSet = ImmutableSet.copyOf(config.getSubUnitIdSet());
if (!partitionSet.containsAll(resetPartitionIdSet)) {
LOG.error("Not all of the specified partitions to reset exist for the resource");
return false;
@@ -368,7 +367,7 @@ public class ParticipantAccessor {
}
// build messages to signal the transition
- StateModelDefId stateModelDefId = context.getStateModelDefId();
+ StateModelDefId stateModelDefId = config.getStateModelDefId();
StateModelDefinition stateModelDef =
_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
Map<MessageId, Message> messageMap = Maps.newHashMap();
@@ -385,7 +384,7 @@ public class ParticipantAccessor {
message.setStateModelDef(stateModelDefId);
message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
message.setToState(stateModelDef.getTypedInitialState());
- message.setStateModelFactoryId(context.getStateModelFactoryId());
+ message.setStateModelFactoryId(config.getStateModelFactoryId());
messageMap.put(message.getMessageId(), message);
}
@@ -666,8 +665,8 @@ public class ParticipantAccessor {
for (String resourceName : idealStateMap.keySet()) {
IdealState idealState = idealStateMap.get(resourceName);
swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
- PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
- resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
+ PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(idealState);
+ resourceAccessor.setRebalancerConfig(ResourceId.from(resourceName), config);
_accessor.setProperty(_keyBuilder.idealStates(resourceName), idealState);
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index e5ac57c..b308b98 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -38,11 +38,12 @@ import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
@@ -131,11 +132,11 @@ public class ResourceAccessor {
* @param configuration
* @return true if set, false otherwise
*/
- private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+ private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration,
+ RebalancerConfig rebalancerConfig) {
boolean status =
_accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
- // also set an ideal state if the resource supports it
- RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+ // set an ideal state if the resource supports it
IdealState idealState =
rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
configuration.getBatchMessageMode());
@@ -146,16 +147,20 @@ public class ResourceAccessor {
}
/**
- * Set the context of the rebalancer. This includes all properties required for rebalancing this
+ * Set the config of the rebalancer. This includes all properties required for rebalancing this
* resource
* @param resourceId the resource to update
- * @param context the new rebalancer context
- * @return true if the context was set, false otherwise
+ * @param config the new rebalancer config
+ * @return true if the config was set, false otherwise
*/
- public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
- RebalancerConfig config = new RebalancerConfig(context);
+ public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
- resourceConfig.addNamespacedConfig(config.toNamespacedConfig());
+ PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+ if (partitionedConfig == null
+ || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ // only persist if this is not easily convertible to an ideal state
+ resourceConfig.addNamespacedConfig(new RebalancerConfigHolder(config).toNamespacedConfig());
+ }
// update the ideal state if applicable
IdealState oldIdealState =
@@ -190,9 +195,13 @@ public class ResourceAccessor {
* @return RebalancerConfig, or null
*/
public RebalancerConfig readRebalancerConfig(ResourceId resourceId) {
+ IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+ if (idealState != null && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
+ return PartitionedRebalancerConfig.from(idealState);
+ }
ResourceConfiguration resourceConfig =
_accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
- return resourceConfig != null ? RebalancerConfig.from(resourceConfig) : null;
+ return resourceConfig.getRebalancerConfig(RebalancerConfig.class);
}
/**
@@ -242,10 +251,17 @@ public class ResourceAccessor {
ResourceId resourceId = resourceConfig.getId();
ResourceConfiguration config = new ResourceConfiguration(resourceId);
config.addNamespacedConfig(resourceConfig.getUserConfig());
- config.addNamespacedConfig(resourceConfig.getRebalancerConfig().toNamespacedConfig());
+ PartitionedRebalancerConfig partitionedConfig =
+ PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
+ if (partitionedConfig == null
+ || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ // only persist if this is not easily convertible to an ideal state
+ config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
+ .toNamespacedConfig());
+ }
config.setBucketSize(resourceConfig.getBucketSize());
config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
- setConfiguration(resourceId, config);
+ setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
return true;
}
@@ -332,28 +348,28 @@ public class ResourceAccessor {
String participantGroupTag) {
Resource resource = readResource(resourceId);
RebalancerConfig config = resource.getRebalancerConfig();
- PartitionedRebalancerContext context =
- config.getRebalancerContext(PartitionedRebalancerContext.class);
- if (context == null) {
+ PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+ if (partitionedConfig == null) {
LOG.error("Only partitioned resource types are supported");
return false;
}
if (replicaCount != -1) {
- context.setReplicaCount(replicaCount);
+ partitionedConfig.setReplicaCount(replicaCount);
}
if (participantGroupTag != null) {
- context.setParticipantGroupTag(participantGroupTag);
+ partitionedConfig.setParticipantGroupTag(participantGroupTag);
}
StateModelDefinition stateModelDef =
- _accessor.getProperty(_keyBuilder.stateModelDef(context.getStateModelDefId().stringify()));
+ _accessor.getProperty(_keyBuilder.stateModelDef(partitionedConfig.getStateModelDefId()
+ .stringify()));
List<InstanceConfig> participantConfigs =
_accessor.getChildValues(_keyBuilder.instanceConfigs());
Set<ParticipantId> participantSet = Sets.newHashSet();
for (InstanceConfig participantConfig : participantConfigs) {
participantSet.add(participantConfig.getParticipantId());
}
- context.generateDefaultConfiguration(stateModelDef, participantSet);
- setRebalancerContext(resourceId, context);
+ partitionedConfig.generateDefaultConfiguration(stateModelDef, participantSet);
+ setRebalancerConfig(resourceId, partitionedConfig);
return true;
}
@@ -366,41 +382,40 @@ public class ResourceAccessor {
*/
static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
boolean batchMessageMode) {
- PartitionedRebalancerContext partitionedContext =
- config.getRebalancerContext(PartitionedRebalancerContext.class);
- if (partitionedContext != null) {
- IdealState idealState = new IdealState(partitionedContext.getResourceId());
- idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
- idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+ PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+ if (partitionedConfig != null) {
+ IdealState idealState = new IdealState(partitionedConfig.getResourceId());
+ idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
+ idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
String replicas = null;
- if (partitionedContext.anyLiveParticipant()) {
+ if (partitionedConfig.anyLiveParticipant()) {
replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
} else {
- replicas = Integer.toString(partitionedContext.getReplicaCount());
+ replicas = Integer.toString(partitionedConfig.getReplicaCount());
}
idealState.setReplicas(replicas);
- idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
- idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
- idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
- idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
- idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+ idealState.setNumPartitions(partitionedConfig.getPartitionSet().size());
+ idealState.setInstanceGroupTag(partitionedConfig.getParticipantGroupTag());
+ idealState.setMaxPartitionsPerInstance(partitionedConfig.getMaxPartitionsPerParticipant());
+ idealState.setStateModelDefId(partitionedConfig.getStateModelDefId());
+ idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
idealState.setBucketSize(bucketSize);
idealState.setBatchMessageMode(batchMessageMode);
- if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
- SemiAutoRebalancerContext semiAutoContext =
- config.getRebalancerContext(SemiAutoRebalancerContext.class);
- for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
- idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+ if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ SemiAutoRebalancerConfig semiAutoConfig =
+ BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
+ for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
+ idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
}
- } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
- CustomRebalancerContext customContext =
- config.getRebalancerContext(CustomRebalancerContext.class);
- for (PartitionId partitionId : customContext.getPartitionSet()) {
- idealState.setParticipantStateMap(partitionId,
- customContext.getPreferenceMap(partitionId));
+ } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ CustomRebalancerConfig customConfig =
+ BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
+ for (PartitionId partitionId : customConfig.getPartitionSet()) {
+ idealState
+ .setParticipantStateMap(partitionId, customConfig.getPreferenceMap(partitionId));
}
} else {
- for (PartitionId partitionId : partitionedContext.getPartitionSet()) {
+ for (PartitionId partitionId : partitionedConfig.getPartitionSet()) {
List<ParticipantId> preferenceList = Collections.emptyList();
idealState.setPreferenceList(partitionId, preferenceList);
Map<ParticipantId, State> participantStateMap = Collections.emptyMap();
@@ -425,7 +440,7 @@ public class ResourceAccessor {
ResourceConfiguration resourceConfiguration, IdealState idealState,
ExternalView externalView, ResourceAssignment resourceAssignment) {
UserConfig userConfig;
- RebalancerContext rebalancerContext = null;
+ RebalancerConfig rebalancerConfig = null;
ResourceType type = ResourceType.DATA;
if (resourceConfiguration != null) {
userConfig = resourceConfiguration.getUserConfig();
@@ -437,14 +452,14 @@ public class ResourceAccessor {
boolean batchMessageMode = false;
if (idealState != null) {
if (resourceConfiguration != null
- && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
- // prefer rebalancer context for non-user_defined data rebalancing
- rebalancerContext =
- resourceConfiguration.getRebalancerContext(PartitionedRebalancerContext.class);
+ && idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ // prefer rebalancer config for user_defined data rebalancing
+ rebalancerConfig =
+ resourceConfiguration.getRebalancerConfig(PartitionedRebalancerConfig.class);
}
- if (rebalancerContext == null) {
+ if (rebalancerConfig == null) {
// prefer ideal state for non-user_defined data rebalancing
- rebalancerContext = PartitionedRebalancerContext.from(idealState);
+ rebalancerConfig = PartitionedRebalancerConfig.from(idealState);
}
bucketSize = idealState.getBucketSize();
batchMessageMode = idealState.getBatchMessageMode();
@@ -452,14 +467,13 @@ public class ResourceAccessor {
} else if (resourceConfiguration != null) {
bucketSize = resourceConfiguration.getBucketSize();
batchMessageMode = resourceConfiguration.getBatchMessageMode();
- RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
- rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+ rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class);
}
- if (rebalancerContext == null) {
- rebalancerContext = new PartitionedRebalancerContext();
+ if (rebalancerConfig == null) {
+ rebalancerConfig = new PartitionedRebalancerConfig();
}
return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
- rebalancerContext, userConfig, bucketSize, batchMessageMode);
+ rebalancerConfig, userConfig, bucketSize, batchMessageMode);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index 38d48ab..b148a49 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -7,8 +7,7 @@ import org.apache.helix.api.Partition;
import org.apache.helix.api.Scope;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import com.google.common.collect.Sets;
@@ -80,7 +79,7 @@ public class ResourceConfig {
* @return map of subunit id to subunit or empty map if none
*/
public Map<? extends PartitionId, ? extends Partition> getSubUnitMap() {
- return _rebalancerConfig.getRebalancerContext(RebalancerContext.class).getSubUnitMap();
+ return _rebalancerConfig.getSubUnitMap();
}
/**
@@ -167,7 +166,7 @@ public class ResourceConfig {
public static class Delta {
private enum Fields {
TYPE,
- REBALANCER_CONTEXT,
+ REBALANCER_CONFIG,
USER_CONFIG,
BUCKET_SIZE,
BATCH_MESSAGE_MODE
@@ -198,12 +197,12 @@ public class ResourceConfig {
/**
* Set the rebalancer configuration
- * @param context properties of interest for rebalancing
+ * @param config properties of interest for rebalancing
* @return Delta
*/
- public Delta setRebalancerContext(RebalancerContext context) {
- _builder.rebalancerContext(context);
- _updateFields.add(Fields.REBALANCER_CONTEXT);
+ public Delta setRebalancerConfig(RebalancerConfig config) {
+ _builder.rebalancerConfig(config);
+ _updateFields.add(Fields.REBALANCER_CONFIG);
return this;
}
@@ -248,10 +247,8 @@ public class ResourceConfig {
public ResourceConfig mergeInto(ResourceConfig orig) {
ResourceConfig deltaConfig = _builder.build();
Builder builder =
- new Builder(orig.getId())
- .type(orig.getType())
- .rebalancerContext(
- orig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class))
+ new Builder(orig.getId()).type(orig.getType())
+ .rebalancerConfig(orig.getRebalancerConfig())
.schedulerTaskConfig(orig.getSchedulerTaskConfig()).userConfig(orig.getUserConfig())
.bucketSize(orig.getBucketSize()).batchMessageMode(orig.getBatchMessageMode());
for (Fields field : _updateFields) {
@@ -259,9 +256,8 @@ public class ResourceConfig {
case TYPE:
builder.type(deltaConfig.getType());
break;
- case REBALANCER_CONTEXT:
- builder.rebalancerContext(deltaConfig.getRebalancerConfig().getRebalancerContext(
- RebalancerContext.class));
+ case REBALANCER_CONFIG:
+ builder.rebalancerConfig(deltaConfig.getRebalancerConfig());
break;
case USER_CONFIG:
builder.userConfig(deltaConfig.getUserConfig());
@@ -314,11 +310,11 @@ public class ResourceConfig {
/**
* Set the rebalancer configuration
- * @param rebalancerContext properties of interest for rebalancing
+ * @param rebalancerConfig properties of interest for rebalancing
* @return Builder
*/
- public Builder rebalancerContext(RebalancerContext rebalancerContext) {
- _rebalancerConfig = new RebalancerConfig(rebalancerContext);
+ public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+ _rebalancerConfig = rebalancerConfig;
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java b/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
new file mode 100644
index 0000000..29a72b7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
@@ -0,0 +1,57 @@
+package org.apache.helix.api.id;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Identifies a context
+ */
+public class ContextId extends Id {
+ @JsonProperty("id")
+ final private String _id;
+
+ /**
+ * Create a context id
+ * @param id string representation of the id
+ */
+ @JsonCreator
+ public ContextId(@JsonProperty("id") String id) {
+ _id = id;
+ }
+
+ @Override
+ public String stringify() {
+ return _id;
+ }
+
+ /**
+ * Get a concrete context id for a string name
+ * @param contextId string context identifier
+ * @return ContextId
+ */
+ public static ContextId from(String contextId) {
+ if (contextId == null) {
+ return null;
+ }
+ return new ContextId(contextId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index eec745e..96be0fa 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -44,18 +44,19 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.MessageGenerationStage;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.PersistAssignmentStage;
+import org.apache.helix.controller.stages.PersistContextStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.TaskAssignmentStage;
-import org.apache.helix.controller.stages.PersistAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
@@ -183,11 +184,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new PersistAssignmentStage());
rebalancePipeline.addStage(new MessageGenerationStage());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new MessageThrottleStage());
rebalancePipeline.addStage(new TaskAssignmentStage());
+ rebalancePipeline.addStage(new PersistAssignmentStage());
+ rebalancePipeline.addStage(new PersistContextStage());
// external view generation
Pipeline externalViewPipeline = new Pipeline();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java b/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
new file mode 100644
index 0000000..2d98347
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
@@ -0,0 +1,60 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.serializer.DefaultStringSerializer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A simple context that can be serialized by a {@link DefaultStringSerializer}
+ */
+public class BasicControllerContext implements ControllerContext {
+ private final ContextId _id;
+ private Class<? extends StringSerializer> _serializer;
+
+ /**
+ * Instantiate with an id
+ * @param id ContextId, unique among all contexts in this cluster
+ */
+ public BasicControllerContext(@JsonProperty("id") ContextId id) {
+ _id = id;
+ _serializer = DefaultStringSerializer.class;
+ }
+
+ /**
+ * Set the class that can serialize this object into a String, and back
+ * @param serializer StringSerializer implementation class
+ */
+ public void setSerializerClass(Class<? extends StringSerializer> serializer) {
+ _serializer = serializer;
+ }
+
+ @Override
+ public Class<? extends StringSerializer> getSerializerClass() {
+ return _serializer;
+ }
+
+ @Override
+ public ContextId getId() {
+ return _id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
new file mode 100644
index 0000000..b1be5c2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.serializer.StringSerializer;
+
+/**
+ * Controller stages can implement this interface to set and get persistent state
+ */
+public interface ControllerContext {
+ /**
+ * Get the identifier of this context
+ * @return {@link ContextId}
+ */
+ ContextId getId();
+
+ /**
+ * Get the class that can be used to convert this object to and from a String
+ * @return {@link StringSerializer} implementation class
+ */
+ Class<? extends StringSerializer> getSerializerClass();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
new file mode 100644
index 0000000..8c1d03a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
@@ -0,0 +1,156 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Wrap a {@link ControllerContext} so that it can be persisted in the backing store
+ */
+public class ControllerContextHolder extends HelixProperty {
+ private enum Fields {
+ SERIALIZER_CLASS,
+ CONTEXT,
+ CONTEXT_CLASS
+ }
+
+ private static final Logger LOG = Logger.getLogger(ControllerContextHolder.class);
+
+ private final ControllerContext _context;
+
+ /**
+ * Instantiate from a populated ControllerContext
+ * @param context instantiated context
+ */
+ public ControllerContextHolder(ControllerContext context) {
+ super(context.getId().toString());
+ _context = context;
+ StringSerializer serializer = instantiateSerializerFromContext(_context);
+ if (_context != null && serializer != null) {
+ _record.setSimpleField(Fields.SERIALIZER_CLASS.toString(), _context.getSerializerClass()
+ .getName());
+ _record.setSimpleField(Fields.CONTEXT_CLASS.toString(), _context.getClass().getName());
+ _record.setSimpleField(Fields.CONTEXT.toString(), serializer.serialize(_context));
+ }
+ }
+
+ /**
+ * Instantiate from a record
+ * @param record populated ZNRecord
+ */
+ public ControllerContextHolder(ZNRecord record) {
+ super(record);
+ StringSerializer serializer =
+ instantiateSerializerFromName(_record.getSimpleField(Fields.SERIALIZER_CLASS.toString()));
+ _context =
+ loadContext(serializer, _record.getSimpleField(Fields.CONTEXT_CLASS.toString()),
+ _record.getSimpleField(Fields.CONTEXT.toString()));
+ }
+
+ /**
+ * Get a ControllerContext
+ * @return instantiated {@link ControllerContext}
+ */
+ public ControllerContext getContext() {
+ return _context;
+ }
+
+ /**
+ * Get a ControllerContext as a specific subtyple
+ * @return instantiated typed {@link ControllerContext}, or null
+ */
+ public <T extends ControllerContext> T getContext(Class<T> contextClass) {
+ if (_context != null) {
+ try {
+ return contextClass.cast(_context);
+ } catch (ClassCastException e) {
+ LOG.warn(contextClass + " is incompatible with context class: " + _context.getClass());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Instantiate a StringSerializer that can serialize the context
+ * @param context instantiated ControllerContext
+ * @return StringSerializer object, or null if it could not be instantiated
+ */
+ private StringSerializer instantiateSerializerFromContext(ControllerContext context) {
+ if (context == null) {
+ return null;
+ }
+ try {
+ return context.getSerializerClass().newInstance();
+ } catch (InstantiationException e) {
+ LOG.error("Serializer could not be instatiated", e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Serializer default constructor not accessible", e);
+ }
+ return null;
+ }
+
+ /**
+ * Instantiate a StringSerializer from its class name
+ * @param serializerClassName name of a StringSerializer implementation class
+ * @return instantiated StringSerializer, or null if it could not be instantiated
+ */
+ private StringSerializer instantiateSerializerFromName(String serializerClassName) {
+ if (serializerClassName != null) {
+ try {
+ return (StringSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+ .newInstance();
+ } catch (InstantiationException e) {
+ LOG.error("Error getting the serializer", e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Error getting the serializer", e);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Error getting the serializer", e);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Deserialize a context
+ * @param serializer StringSerializer that can deserialize the context
+ * @param className the name of the context class
+ * @param contextData the raw context data as a String
+ * @return ControllerContext, or null if there was a conversion issue
+ */
+ private ControllerContext loadContext(StringSerializer serializer, String className,
+ String contextData) {
+ if (serializer != null && className != null && contextData != null) {
+ try {
+ Class<? extends ControllerContext> contextClass =
+ HelixUtil.loadClass(getClass(), className).asSubclass(ControllerContext.class);
+ return serializer.deserialize(contextClass, contextData);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Could not convert the persisted data into a " + className, e);
+ } catch (ClassCastException e) {
+ LOG.error("Could not convert the persisted data into a " + className, e);
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
new file mode 100644
index 0000000..1541585
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
@@ -0,0 +1,120 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * An interface for getting and setting {@link ControllerContext} objects, which will eventually
+ * be persisted and acessible across runs of the controller pipeline.
+ */
+public class ControllerContextProvider {
+ private static final Logger LOG = Logger.getLogger(ControllerContextProvider.class);
+
+ private Map<ContextId, ControllerContext> _persistedContexts;
+ private Map<ContextId, ControllerContext> _pendingContexts;
+
+ /**
+ * Instantiate with already-persisted controller contexts
+ * @param contexts
+ */
+ public ControllerContextProvider(Map<ContextId, ControllerContext> contexts) {
+ _persistedContexts = contexts != null ? contexts : new HashMap<ContextId, ControllerContext>();
+ _pendingContexts = Maps.newHashMap();
+ }
+
+ /**
+ * Get a base ControllerContext
+ * @param contextId the context id to look up
+ * @return a ControllerContext, or null if not found
+ */
+ public ControllerContext getControllerContext(ContextId contextId) {
+ return getControllerContext(contextId, ControllerContext.class);
+ }
+
+ /**
+ * Get a typed ControllerContext
+ * @param contextId the context id to look up
+ * @param contextClass the class which the context should be returned as
+ * @return a typed ControllerContext, or null if no context with given id is available for this
+ * type
+ */
+ public <T extends ControllerContext> T getControllerContext(ContextId contextId,
+ Class<T> contextClass) {
+ try {
+ if (_pendingContexts.containsKey(contextId)) {
+ return contextClass.cast(_pendingContexts.get(contextId));
+ } else if (_persistedContexts.containsKey(contextId)) {
+ return contextClass.cast(_persistedContexts.get(contextId));
+ }
+ } catch (ClassCastException e) {
+ LOG.error("Could not convert context " + contextId + " into " + contextClass.getName());
+ }
+ return null;
+ }
+
+ /**
+ * Put a controller context, overwriting any existing ones
+ * @param contextId the id to set
+ * @param context the context object
+ */
+ public void putControllerContext(ContextId contextId, ControllerContext context) {
+ putControllerContext(contextId, context, true);
+ }
+
+ /**
+ * Put a controller context, specifying overwrite behavior
+ * @param contextId the id to set
+ * @param context the context object
+ * @param overwriteAllowed true if existing objects can be overwritten, false otherwise
+ * @return true if saved, false if an object with that id exists and overwrite is not allowed
+ */
+ public boolean putControllerContext(ContextId contextId, ControllerContext context,
+ boolean overwriteAllowed) {
+ if (overwriteAllowed || !exists(contextId)) {
+ _pendingContexts.put(contextId, context);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Check if a context exists
+ * @param contextId the id to look up
+ * @return true if a context exists with that id, false otherwise
+ */
+ public boolean exists(ContextId contextId) {
+ return _persistedContexts.containsKey(contextId) || _pendingContexts.containsKey(contextId);
+ }
+
+ /**
+ * Get all contexts that have been put, but not yet persisted
+ * @return a map of context id to context
+ */
+ public Map<ContextId, ControllerContext> getPendingContexts() {
+ return _pendingContexts;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 5209e2c..6629bec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -8,8 +8,10 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
@@ -40,15 +42,15 @@ public class CustomRebalancer implements HelixRebalancer {
private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
@Override
- public void init(HelixManager helixManager) {
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
// do nothing
}
@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
- CustomRebalancerContext config =
- rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+ CustomRebalancerConfig config =
+ BasicRebalancerConfig.convert(rebalancerConfig, CustomRebalancerConfig.class);
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
index fc4bfa0..3aa41d7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -30,8 +30,9 @@ import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -54,31 +55,30 @@ public class FallbackRebalancer implements HelixRebalancer {
private HelixManager _helixManager;
@Override
- public void init(HelixManager helixManager) {
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
_helixManager = helixManager;
}
@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
// make sure the manager is not null
if (_helixManager == null) {
LOG.info("HelixManager is null!");
return null;
}
- // get the context
- PartitionedRebalancerContext context =
- rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
- if (context == null) {
+ // get the config
+ PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
+ if (config == null) {
LOG.info("Resource is not partitioned");
return null;
}
// get the ideal state and rebalancer class
- ResourceId resourceId = context.getResourceId();
+ ResourceId resourceId = config.getResourceId();
StateModelDefinition stateModelDef =
- cluster.getStateModelMap().get(context.getStateModelDefId());
+ cluster.getStateModelMap().get(config.getStateModelDefId());
if (stateModelDef == null) {
LOG.info("StateModelDefinition unavailable for " + resourceId);
return null;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 6d7b0ef..0c55d45 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -16,8 +16,10 @@ import org.apache.helix.api.Participant;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
@@ -56,15 +58,15 @@ public class FullAutoRebalancer implements HelixRebalancer {
private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
@Override
- public void init(HelixManager helixManager) {
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
// do nothing
}
@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
- FullAutoRebalancerContext config =
- rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+ FullAutoRebalancerConfig config =
+ BasicRebalancerConfig.convert(rebalancerConfig, FullAutoRebalancerConfig.class);
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
// Compute a preference list based on the current ideal state
@@ -181,7 +183,7 @@ public class FullAutoRebalancer implements HelixRebalancer {
}
private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
- FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+ FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
Map<State, Integer> stateCountMap) {
Map<PartitionId, Map<ParticipantId, State>> map =
new HashMap<PartitionId, Map<ParticipantId, State>>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
index 7fcbba5..1fbb02f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
@@ -2,7 +2,8 @@ package org.apache.helix.controller.rebalancer;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
@@ -32,10 +33,11 @@ import org.apache.helix.model.ResourceAssignment;
*/
public interface HelixRebalancer {
/**
- * Initialize the rebalancer with a HelixManager if necessary
- * @param manager
+ * Initialize the rebalancer with a HelixManager and ControllerContextProvider if necessary
+ * @param manager HelixManager instance
+ * @param contextProvider An object that supports getting and setting context across pipeline runs
*/
- public void init(HelixManager helixManager);
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider);
/**
* Given an ideal state for a resource and liveness of participants, compute a assignment of
@@ -47,18 +49,20 @@ public interface HelixRebalancer {
* Say that you have:<br/>
*
* <pre>
- * class MyRebalancerContext implements RebalancerContext
+ * class MyRebalancerConfig implements RebalancerConfig
* </pre>
*
- * as your rebalancer context. To extract it from a RebalancerConfig, do the following:<br/>
+ * as your rebalancer config. To get a typed version, you can do the following:<br/>
*
* <pre>
- * MyRebalancerContext context = rebalancerConfig.getRebalancerContext(MyRebalancerContext.class);
+ * MyRebalancerConfig config = BasicRebalancerConfig.convert(rebalancerConfig,
+ * MyRebalancerConfig.class);
* </pre>
* @param rebalancerConfig the properties of the resource for which a mapping will be computed
+ * @param prevAssignment the previous ResourceAssignment of this cluster, or null if none
* @param cluster complete snapshot of the cluster
* @param currentState the current states of all partitions
*/
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState);
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index a0ad6f3..51ca463 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -9,9 +9,10 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
@@ -45,15 +46,15 @@ public class SemiAutoRebalancer implements HelixRebalancer {
private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
@Override
- public void init(HelixManager helixManager) {
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
// do nothing
}
@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
- SemiAutoRebalancerContext config =
- rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+ SemiAutoRebalancerConfig config =
+ BasicRebalancerConfig.convert(rebalancerConfig, SemiAutoRebalancerConfig.class);
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
@@ -65,8 +66,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
Map<ParticipantId, State> currentStateMap =
currentState.getCurrentStateMap(config.getResourceId(), partition);
Set<ParticipantId> disabledInstancesForPartition =
- ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partition);
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), partition);
List<ParticipantId> preferenceList =
ConstraintBasedAssignment.getPreferenceList(cluster, partition,
config.getPreferenceList(partition));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
new file mode 100644
index 0000000..d6aa10e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
@@ -0,0 +1,256 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.serializer.DefaultStringSerializer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Abstract RebalancerConfig that functions for generic subunits. Use a subclass that more
+ * concretely defines the subunits.
+ */
+public abstract class BasicRebalancerConfig implements RebalancerConfig {
+ private ResourceId _resourceId;
+ private StateModelDefId _stateModelDefId;
+ private StateModelFactoryId _stateModelFactoryId;
+ private String _participantGroupTag;
+ private Class<? extends StringSerializer> _serializer;
+ private RebalancerRef _rebalancerRef;
+
+ /**
+ * Instantiate a basic rebalancer config
+ */
+ public BasicRebalancerConfig() {
+ _serializer = DefaultStringSerializer.class;
+ }
+
+ @Override
+ public ResourceId getResourceId() {
+ return _resourceId;
+ }
+
+ /**
+ * Set the resource to rebalance
+ * @param resourceId resource id
+ */
+ public void setResourceId(ResourceId resourceId) {
+ _resourceId = resourceId;
+ }
+
+ @Override
+ public StateModelDefId getStateModelDefId() {
+ return _stateModelDefId;
+ }
+
+ /**
+ * Set the state model definition that the resource follows
+ * @param stateModelDefId state model definition id
+ */
+ public void setStateModelDefId(StateModelDefId stateModelDefId) {
+ _stateModelDefId = stateModelDefId;
+ }
+
+ @Override
+ public StateModelFactoryId getStateModelFactoryId() {
+ return _stateModelFactoryId;
+ }
+
+ /**
+ * Set the state model factory that the resource uses
+ * @param stateModelFactoryId state model factory id
+ */
+ public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+ _stateModelFactoryId = stateModelFactoryId;
+ }
+
+ @Override
+ public String getParticipantGroupTag() {
+ return _participantGroupTag;
+ }
+
+ /**
+ * Set a tag that participants must have in order to serve this resource
+ * @param participantGroupTag string group tag
+ */
+ public void setParticipantGroupTag(String participantGroupTag) {
+ _participantGroupTag = participantGroupTag;
+ }
+
+ /**
+ * Get the serializer. If none is provided, {@link DefaultStringSerializer} is used
+ */
+ @Override
+ public Class<? extends StringSerializer> getSerializerClass() {
+ return _serializer;
+ }
+
+ /**
+ * Set the class that can serialize this config
+ * @param serializer serializer class that implements StringSerializer
+ */
+ public void setSerializerClass(Class<? extends StringSerializer> serializer) {
+ _serializer = serializer;
+ }
+
+ @Override
+ @JsonIgnore
+ public Set<? extends PartitionId> getSubUnitIdSet() {
+ return getSubUnitMap().keySet();
+ }
+
+ @Override
+ @JsonIgnore
+ public Partition getSubUnit(PartitionId subUnitId) {
+ return getSubUnitMap().get(subUnitId);
+ }
+
+ @Override
+ public RebalancerRef getRebalancerRef() {
+ return _rebalancerRef;
+ }
+
+ /**
+ * Set the reference to the class used to rebalance this resource
+ * @param rebalancerRef RebalancerRef instance
+ */
+ public void setRebalancerRef(RebalancerRef rebalancerRef) {
+ _rebalancerRef = rebalancerRef;
+ }
+
+ /**
+ * Safely cast a RebalancerConfig into a subtype
+ * @param config RebalancerConfig object
+ * @param clazz the target class
+ * @return An instance of clazz, or null if the conversion is not possible
+ */
+ public static <T extends RebalancerConfig> T convert(RebalancerConfig config, Class<T> clazz) {
+ try {
+ return clazz.cast(config);
+ } catch (ClassCastException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Abstract builder for the base rebalancer config
+ */
+ public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
+ private final ResourceId _resourceId;
+ private StateModelDefId _stateModelDefId;
+ private StateModelFactoryId _stateModelFactoryId;
+ private String _participantGroupTag;
+ private Class<? extends StringSerializer> _serializerClass;
+ private RebalancerRef _rebalancerRef;
+
+ /**
+ * Instantiate with a resource id
+ * @param resourceId resource id
+ */
+ public AbstractBuilder(ResourceId resourceId) {
+ _resourceId = resourceId;
+ _serializerClass = DefaultStringSerializer.class;
+ }
+
+ /**
+ * Set the state model definition that the resource should follow
+ * @param stateModelDefId state model definition id
+ * @return Builder
+ */
+ public T stateModelDefId(StateModelDefId stateModelDefId) {
+ _stateModelDefId = stateModelDefId;
+ return self();
+ }
+
+ /**
+ * Set the state model factory that the resource should use
+ * @param stateModelFactoryId state model factory id
+ * @return Builder
+ */
+ public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+ _stateModelFactoryId = stateModelFactoryId;
+ return self();
+ }
+
+ /**
+ * Set the tag that all participants require in order to serve this resource
+ * @param participantGroupTag the tag
+ * @return Builder
+ */
+ public T participantGroupTag(String participantGroupTag) {
+ _participantGroupTag = participantGroupTag;
+ return self();
+ }
+
+ /**
+ * Set the serializer class for this rebalancer config
+ * @param serializerClass class that implements StringSerializer
+ * @return Builder
+ */
+ public T serializerClass(Class<? extends StringSerializer> serializerClass) {
+ _serializerClass = serializerClass;
+ return self();
+ }
+
+ /**
+ * Specify a custom class to use for rebalancing
+ * @param rebalancerRef RebalancerRef instance
+ * @return Builder
+ */
+ public T rebalancerRef(RebalancerRef rebalancerRef) {
+ _rebalancerRef = rebalancerRef;
+ return self();
+ }
+
+ /**
+ * Update an existing context with base fields
+ * @param config derived config
+ */
+ protected final void update(BasicRebalancerConfig config) {
+ config.setResourceId(_resourceId);
+ config.setStateModelDefId(_stateModelDefId);
+ config.setStateModelFactoryId(_stateModelFactoryId);
+ config.setParticipantGroupTag(_participantGroupTag);
+ config.setSerializerClass(_serializerClass);
+ config.setRebalancerRef(_rebalancerRef);
+ }
+
+ /**
+ * Get a typed reference to "this" class. Final derived classes should simply return the this
+ * reference.
+ * @return this for the most specific type
+ */
+ protected abstract T self();
+
+ /**
+ * Get the rebalancer config from the built fields
+ * @return RebalancerConfig
+ */
+ public abstract RebalancerConfig build();
+ }
+}