You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/12/04 02:21:04 UTC
[1/4] [HELIX-327] Simplify rebalancer, rename rebalancer configs,
support settable contexts, rb=15981
Updated Branches:
refs/heads/master 7521c0cb5 -> 015e7dda2
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
index ff98cd2..3693c2b 100644
--- a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
@@ -30,9 +30,10 @@ import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.context.ControllerContextProvider;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
@@ -42,7 +43,7 @@ public class LockManagerRebalancer implements HelixRebalancer {
private static final Logger LOG = Logger.getLogger(LockManagerRebalancer.class);
@Override
- public void init(HelixManager manager) {
+ public void init(HelixManager manager, ControllerContextProvider contextProvider) {
// do nothing; this rebalancer is independent of the manager
}
@@ -54,20 +55,19 @@ public class LockManagerRebalancer implements HelixRebalancer {
*/
@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
// get a typed context
- PartitionedRebalancerContext context =
- rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
+ PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
// Initialize an empty mapping of locks to participants
- ResourceAssignment assignment = new ResourceAssignment(context.getResourceId());
+ ResourceAssignment assignment = new ResourceAssignment(config.getResourceId());
// Get the list of live participants in the cluster
List<ParticipantId> liveParticipants =
new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
// Get the state model (should be a simple lock/unlock model) and the highest-priority state
- StateModelDefId stateModelDefId = context.getStateModelDefId();
+ StateModelDefId stateModelDefId = config.getStateModelDefId();
StateModelDefinition stateModelDef = cluster.getStateModelMap().get(stateModelDefId);
if (stateModelDef.getStatesPriorityList().size() < 1) {
LOG.error("Invalid state model definition. There should be at least one state.");
@@ -93,7 +93,7 @@ public class LockManagerRebalancer implements HelixRebalancer {
// This assumes a simple lock-unlock model where the only state of interest is which nodes have
// acquired each lock.
int i = 0;
- for (PartitionId partition : context.getPartitionSet()) {
+ for (PartitionId partition : config.getPartitionSet()) {
Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
for (int j = i; j < i + lockHolders; j++) {
int participantIndex = j % liveParticipants.size();
[4/4] git commit: [HELIX-327] Simplify rebalancer,
rename rebalancer configs, support settable contexts, rb=15981
Posted by ka...@apache.org.
[HELIX-327] Simplify rebalancer, rename rebalancer configs, support settable contexts, rb=15981
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/015e7dda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/015e7dda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/015e7dda
Branch: refs/heads/master
Commit: 015e7dda2c1f96beb6d387ba71c118d246840f1b
Parents: 7521c0c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Dec 3 16:52:23 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Dec 3 17:20:31 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/helix/PropertyKey.java | 18 +
.../org/apache/helix/PropertyPathConfig.java | 5 +
.../java/org/apache/helix/PropertyType.java | 3 +-
.../main/java/org/apache/helix/api/Cluster.java | 20 +-
.../java/org/apache/helix/api/Resource.java | 9 +-
.../api/accessor/AtomicResourceAccessor.java | 6 +-
.../helix/api/accessor/ClusterAccessor.java | 52 ++-
.../helix/api/accessor/ParticipantAccessor.java | 23 +-
.../helix/api/accessor/ResourceAccessor.java | 138 ++++---
.../apache/helix/api/config/ResourceConfig.java | 32 +-
.../java/org/apache/helix/api/id/ContextId.java | 57 +++
.../controller/GenericHelixController.java | 8 +-
.../context/BasicControllerContext.java | 60 +++
.../controller/context/ControllerContext.java | 40 ++
.../context/ControllerContextHolder.java | 156 +++++++
.../context/ControllerContextProvider.java | 120 ++++++
.../controller/rebalancer/CustomRebalancer.java | 14 +-
.../rebalancer/FallbackRebalancer.java | 20 +-
.../rebalancer/FullAutoRebalancer.java | 16 +-
.../controller/rebalancer/HelixRebalancer.java | 20 +-
.../rebalancer/SemiAutoRebalancer.java | 18 +-
.../config/BasicRebalancerConfig.java | 256 ++++++++++++
.../config/CustomRebalancerConfig.java | 164 ++++++++
.../config/FullAutoRebalancerConfig.java | 64 +++
.../config/PartitionedRebalancerConfig.java | 405 +++++++++++++++++++
.../rebalancer/config/RebalancerConfig.java | 95 +++++
.../config/RebalancerConfigHolder.java | 185 +++++++++
.../config/ReplicatedRebalancerConfig.java | 40 ++
.../config/SemiAutoRebalancerConfig.java | 178 ++++++++
.../context/BasicRebalancerContext.java | 240 -----------
.../rebalancer/context/ContextSerializer.java | 37 --
.../context/CustomRebalancerContext.java | 164 --------
.../context/DefaultContextSerializer.java | 83 ----
.../context/FullAutoRebalancerContext.java | 64 ---
.../context/PartitionedRebalancerContext.java | 393 ------------------
.../rebalancer/context/RebalancerConfig.java | 182 ---------
.../rebalancer/context/RebalancerContext.java | 94 -----
.../context/ReplicatedRebalancerContext.java | 40 --
.../context/SemiAutoRebalancerContext.java | 178 --------
.../serializer/DefaultStringSerializer.java | 82 ++++
.../controller/serializer/StringSerializer.java | 37 ++
.../helix/controller/stages/AttributeName.java | 3 +-
.../stages/BestPossibleStateCalcStage.java | 26 +-
.../stages/ExternalViewComputeStage.java | 10 +-
.../stages/MessageGenerationStage.java | 16 +-
.../stages/MessageSelectionStage.java | 21 +-
.../stages/PersistAssignmentStage.java | 14 +-
.../controller/stages/PersistContextStage.java | 59 +++
.../controller/stages/ReadClusterDataStage.java | 17 +
.../stages/ResourceComputationStage.java | 25 +-
.../helix/model/ResourceConfiguration.java | 14 +-
.../org/apache/helix/tools/NewClusterSetup.java | 66 +--
.../org/apache/helix/api/TestNewStages.java | 22 +-
.../org/apache/helix/api/TestUpdateConfig.java | 27 +-
.../context/TestSerializeRebalancerContext.java | 20 +-
.../helix/controller/stages/BaseStageTest.java | 6 +-
.../stages/TestResourceComputationStage.java | 13 +-
.../TestCustomizedIdealStateRebalancer.java | 35 +-
.../helix/integration/TestHelixConnection.java | 10 +-
.../helix/examples/LogicalModelExample.java | 10 +-
.../LockManagerRebalancer.java | 18 +-
61 files changed, 2450 insertions(+), 1798 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index ebd7a35..2af63a5 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -24,6 +24,7 @@ import static org.apache.helix.PropertyType.ALERT_HISTORY;
import static org.apache.helix.PropertyType.ALERT_STATUS;
import static org.apache.helix.PropertyType.CLUSTER;
import static org.apache.helix.PropertyType.CONFIGS;
+import static org.apache.helix.PropertyType.CONTEXT;
import static org.apache.helix.PropertyType.CONTROLLER;
import static org.apache.helix.PropertyType.CURRENTSTATES;
import static org.apache.helix.PropertyType.ERRORS;
@@ -46,6 +47,7 @@ import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
import java.util.Arrays;
+import org.apache.helix.controller.context.ControllerContextHolder;
import org.apache.helix.model.AlertHistory;
import org.apache.helix.model.AlertStatus;
import org.apache.helix.model.Alerts;
@@ -721,6 +723,22 @@ public class PropertyKey {
public PropertyKey propertyStore() {
return new PropertyKey(PROPERTYSTORE, null, _clusterName);
}
+
+ /**
+ * Get a propertykey associated with the root of the Helix contexts
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey controllerContexts() {
+ return new PropertyKey(CONTEXT, ControllerContextHolder.class, _clusterName);
+ }
+
+ /**
+ * Get a propertykey associated with the root of the Helix contexts
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey controllerContext(String contextId) {
+ return new PropertyKey(CONTEXT, ControllerContextHolder.class, _clusterName, contextId);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
index 60a92f4..d66e7d9 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
@@ -22,6 +22,7 @@ package org.apache.helix;
import static org.apache.helix.PropertyType.ALERTS;
import static org.apache.helix.PropertyType.ALERT_STATUS;
import static org.apache.helix.PropertyType.CONFIGS;
+import static org.apache.helix.PropertyType.CONTEXT;
import static org.apache.helix.PropertyType.CURRENTSTATES;
import static org.apache.helix.PropertyType.EXTERNALVIEW;
import static org.apache.helix.PropertyType.HEALTHREPORT;
@@ -40,6 +41,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.helix.controller.context.ControllerContextHolder;
import org.apache.helix.model.AlertStatus;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.CurrentState;
@@ -81,6 +83,7 @@ public class PropertyPathConfig {
typeToClassMapping.put(ALERT_STATUS, AlertStatus.class);
typeToClassMapping.put(PAUSE, PauseSignal.class);
typeToClassMapping.put(RESOURCEASSIGNMENTS, ResourceAssignment.class);
+ typeToClassMapping.put(CONTEXT, ControllerContextHolder.class);
// @formatter:off
addEntry(PropertyType.CLUSTER, 1, "/{clusterName}");
@@ -148,6 +151,8 @@ public class PropertyPathConfig {
addEntry(PropertyType.ALERTS, 1, "/{clusterName}/CONTROLLER/ALERTS");
addEntry(PropertyType.ALERT_STATUS, 1, "/{clusterName}/CONTROLLER/ALERT_STATUS");
addEntry(PropertyType.ALERT_HISTORY, 1, "/{clusterName}/CONTROLLER/ALERT_HISTORY");
+ addEntry(PropertyType.CONTEXT, 1, "/{clusterName}/CONTROLLER/CONTEXT");
+ addEntry(PropertyType.CONTEXT, 2, "/{clusterName}/CONTROLLER/CONTEXT/{contextId}");
// @formatter:on
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 680dc06..75adb20 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -64,7 +64,8 @@ public enum PropertyType {
PERSISTENTSTATS(Type.CONTROLLER, true, false, false, false),
ALERTS(Type.CONTROLLER, true, false, false, false),
ALERT_STATUS(Type.CONTROLLER, true, false, false, false),
- ALERT_HISTORY(Type.CONTROLLER, true, false, false, false);
+ ALERT_HISTORY(Type.CONTROLLER, true, false, false, false),
+ CONTEXT(Type.CONTROLLER, true, false);
// @formatter:on
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index fdeb879..98072d1 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -27,11 +27,13 @@ import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ContextId;
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SpectatorId;
import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.context.ControllerContext;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -73,6 +75,8 @@ public class Cluster {
*/
private final Map<SpectatorId, Spectator> _spectatorMap;
+ private final Map<ContextId, ControllerContext> _contextMap;
+
private final ControllerId _leaderId;
private final ClusterConfig _config;
@@ -86,6 +90,7 @@ public class Cluster {
* @param leaderId
* @param constraintMap
* @param stateModelMap
+ * @param contextMap
* @param stats
* @param alerts
* @param userConfig
@@ -95,8 +100,9 @@ public class Cluster {
public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
- Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
- Alerts alerts, UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+ Map<StateModelDefId, StateModelDefinition> stateModelMap,
+ Map<ContextId, ControllerContext> contextMap, PersistentStats stats, Alerts alerts,
+ UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
// build the config
// Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -137,6 +143,8 @@ public class Cluster {
_leaderId = leaderId;
+ _contextMap = ImmutableMap.copyOf(contextMap);
+
// TODO impl this when we persist controllers and spectators on zookeeper
_controllerMap = ImmutableMap.copyOf(controllerMap);
_spectatorMap = Collections.emptyMap();
@@ -224,6 +232,14 @@ public class Cluster {
}
/**
+ * Get all the controller context currently on the cluster
+ * @return map of context id to controller context objects
+ */
+ public Map<ContextId, ControllerContext> getContextMap() {
+ return _contextMap;
+ }
+
+ /**
* Get all the persisted stats for the cluster
* @return PersistentStats instance
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 79a1e09..0726510 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -30,8 +30,7 @@ import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
@@ -52,18 +51,16 @@ public class Resource {
* @param idealState ideal state of the resource
* @param externalView external view of the resource
* @param resourceAssignment current resource assignment of the cluster
- * @param rebalancerContext contextual parameters that the rebalancer should be aware of
+ * @param rebalancerConfig parameters that the rebalancer should be aware of
* @param userConfig any resource user-defined configuration
* @param bucketSize the bucket size to use for physically saved state
* @param batchMessageMode true if batch messaging allowed, false otherwise
*/
public Resource(ResourceId id, ResourceType type, IdealState idealState,
ResourceAssignment resourceAssignment, ExternalView externalView,
- RebalancerContext rebalancerContext, UserConfig userConfig, int bucketSize,
+ RebalancerConfig rebalancerConfig, UserConfig userConfig, int bucketSize,
boolean batchMessageMode) {
SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
- RebalancerConfig rebalancerConfig = new RebalancerConfig(rebalancerContext);
-
_config =
new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, userConfig, bucketSize,
batchMessageMode);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 6d69981..cda83d8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -6,7 +6,7 @@ import org.apache.helix.api.Scope;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.lock.HelixLock;
import org.apache.helix.lock.HelixLockable;
import org.apache.log4j.Logger;
@@ -96,12 +96,12 @@ public class AtomicResourceAccessor extends ResourceAccessor {
}
@Override
- public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
+ public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
boolean locked = lock.lock();
if (locked) {
try {
- return _resourceAccessor.setRebalancerContext(resourceId, context);
+ return _resourceAccessor.setRebalancerConfig(resourceId, config);
} finally {
lock.unlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index b01a3ec..36c7aaa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -45,14 +45,18 @@ import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ConstraintId;
+import org.apache.helix.api.id.ContextId;
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.model.ClusterConstraints;
@@ -60,6 +64,7 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
@@ -266,9 +271,13 @@ public class ClusterAccessor {
// read the alerts
Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
+ // read controller context
+ Map<ContextId, ControllerContext> contextMap = readControllerContext();
+
// create the cluster snapshot object
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, stateModelMap, stats, alerts, userConfig, isPaused, autoJoinAllowed);
+ clusterConstraintMap, stateModelMap, contextMap, stats, alerts, userConfig, isPaused,
+ autoJoinAllowed);
}
/**
@@ -459,6 +468,20 @@ public class ClusterAccessor {
}
/**
+ * Read the persisted controller contexts
+ * @return map of context id to controller context
+ */
+ public Map<ContextId, ControllerContext> readControllerContext() {
+ Map<String, ControllerContextHolder> contextHolders =
+ _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+ Map<ContextId, ControllerContext> contexts = Maps.newHashMap();
+ for (String contextName : contextHolders.keySet()) {
+ contexts.put(ContextId.from(contextName), contextHolders.get(contextName).getContext());
+ }
+ return contexts;
+ }
+
+ /**
* Add a statistic specification to the cluster. Existing stat specifications will not be
* overwritten
* @param statName string representing a stat specification
@@ -623,7 +646,7 @@ public class ClusterAccessor {
*/
public boolean addResourceToCluster(ResourceConfig resource) {
if (resource == null || resource.getRebalancerConfig() == null) {
- LOG.error("Resource not fully defined with a rebalancer context");
+ LOG.error("Resource not fully defined with a rebalancer config");
return false;
}
@@ -631,9 +654,8 @@ public class ClusterAccessor {
LOG.error("Cluster: " + _clusterId + " structure is not valid");
return false;
}
- RebalancerContext context =
- resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
- StateModelDefId stateModelDefId = context.getStateModelDefId();
+ RebalancerConfig config = resource.getRebalancerConfig();
+ StateModelDefId stateModelDefId = config.getStateModelDefId();
if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
return false;
@@ -656,17 +678,21 @@ public class ClusterAccessor {
ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
configuration.setType(resource.getType());
configuration.addNamespacedConfig(resource.getUserConfig());
- configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
- configuration.setBucketSize(resource.getBucketSize());
- configuration.setBatchMessageMode(resource.getBatchMessageMode());
+ PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+ if (partitionedConfig == null
+ || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ // only persist if this is not easily convertible to an ideal state
+ configuration
+ .addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
+ .toNamespacedConfig());
+ }
_accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
}
// Create an IdealState from a RebalancerConfig (if the resource is partitioned)
- RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
IdealState idealState =
- ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
- resource.getBatchMessageMode());
+ ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
+ resource.getBucketSize(), resource.getBatchMessageMode());
if (idealState != null) {
_accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 2721d91..c3deafe 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -51,8 +51,8 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -315,16 +315,15 @@ public class ParticipantAccessor {
return false;
}
- // need the rebalancer context for the resource
- RebalancerContext context =
- resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
- if (context == null) {
- LOG.error("Rebalancer context for resource does not exist");
+ // need the rebalancer config for the resource
+ RebalancerConfig config = resource.getRebalancerConfig();
+ if (config == null) {
+ LOG.error("Rebalancer config for resource does not exist");
return false;
}
// ensure that all partitions to reset exist
- Set<PartitionId> partitionSet = ImmutableSet.copyOf(context.getSubUnitIdSet());
+ Set<PartitionId> partitionSet = ImmutableSet.copyOf(config.getSubUnitIdSet());
if (!partitionSet.containsAll(resetPartitionIdSet)) {
LOG.error("Not all of the specified partitions to reset exist for the resource");
return false;
@@ -368,7 +367,7 @@ public class ParticipantAccessor {
}
// build messages to signal the transition
- StateModelDefId stateModelDefId = context.getStateModelDefId();
+ StateModelDefId stateModelDefId = config.getStateModelDefId();
StateModelDefinition stateModelDef =
_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
Map<MessageId, Message> messageMap = Maps.newHashMap();
@@ -385,7 +384,7 @@ public class ParticipantAccessor {
message.setStateModelDef(stateModelDefId);
message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
message.setToState(stateModelDef.getTypedInitialState());
- message.setStateModelFactoryId(context.getStateModelFactoryId());
+ message.setStateModelFactoryId(config.getStateModelFactoryId());
messageMap.put(message.getMessageId(), message);
}
@@ -666,8 +665,8 @@ public class ParticipantAccessor {
for (String resourceName : idealStateMap.keySet()) {
IdealState idealState = idealStateMap.get(resourceName);
swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
- PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
- resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
+ PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(idealState);
+ resourceAccessor.setRebalancerConfig(ResourceId.from(resourceName), config);
_accessor.setProperty(_keyBuilder.idealStates(resourceName), idealState);
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index e5ac57c..b308b98 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -38,11 +38,12 @@ import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
@@ -131,11 +132,11 @@ public class ResourceAccessor {
* @param configuration
* @return true if set, false otherwise
*/
- private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+ private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration,
+ RebalancerConfig rebalancerConfig) {
boolean status =
_accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
- // also set an ideal state if the resource supports it
- RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+ // set an ideal state if the resource supports it
IdealState idealState =
rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
configuration.getBatchMessageMode());
@@ -146,16 +147,20 @@ public class ResourceAccessor {
}
/**
- * Set the context of the rebalancer. This includes all properties required for rebalancing this
+ * Set the config of the rebalancer. This includes all properties required for rebalancing this
* resource
* @param resourceId the resource to update
- * @param context the new rebalancer context
- * @return true if the context was set, false otherwise
+ * @param config the new rebalancer config
+ * @return true if the config was set, false otherwise
*/
- public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
- RebalancerConfig config = new RebalancerConfig(context);
+ public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
- resourceConfig.addNamespacedConfig(config.toNamespacedConfig());
+ PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+ if (partitionedConfig == null
+ || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ // only persist if this is not easily convertible to an ideal state
+ resourceConfig.addNamespacedConfig(new RebalancerConfigHolder(config).toNamespacedConfig());
+ }
// update the ideal state if applicable
IdealState oldIdealState =
@@ -190,9 +195,13 @@ public class ResourceAccessor {
* @return RebalancerConfig, or null
*/
public RebalancerConfig readRebalancerConfig(ResourceId resourceId) {
+ IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+ if (idealState != null && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
+ return PartitionedRebalancerConfig.from(idealState);
+ }
ResourceConfiguration resourceConfig =
_accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
- return resourceConfig != null ? RebalancerConfig.from(resourceConfig) : null;
+ return resourceConfig.getRebalancerConfig(RebalancerConfig.class);
}
/**
@@ -242,10 +251,17 @@ public class ResourceAccessor {
ResourceId resourceId = resourceConfig.getId();
ResourceConfiguration config = new ResourceConfiguration(resourceId);
config.addNamespacedConfig(resourceConfig.getUserConfig());
- config.addNamespacedConfig(resourceConfig.getRebalancerConfig().toNamespacedConfig());
+ PartitionedRebalancerConfig partitionedConfig =
+ PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
+ if (partitionedConfig == null
+ || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ // only persist if this is not easily convertible to an ideal state
+ config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
+ .toNamespacedConfig());
+ }
config.setBucketSize(resourceConfig.getBucketSize());
config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
- setConfiguration(resourceId, config);
+ setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
return true;
}
@@ -332,28 +348,28 @@ public class ResourceAccessor {
String participantGroupTag) {
Resource resource = readResource(resourceId);
RebalancerConfig config = resource.getRebalancerConfig();
- PartitionedRebalancerContext context =
- config.getRebalancerContext(PartitionedRebalancerContext.class);
- if (context == null) {
+ PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+ if (partitionedConfig == null) {
LOG.error("Only partitioned resource types are supported");
return false;
}
if (replicaCount != -1) {
- context.setReplicaCount(replicaCount);
+ partitionedConfig.setReplicaCount(replicaCount);
}
if (participantGroupTag != null) {
- context.setParticipantGroupTag(participantGroupTag);
+ partitionedConfig.setParticipantGroupTag(participantGroupTag);
}
StateModelDefinition stateModelDef =
- _accessor.getProperty(_keyBuilder.stateModelDef(context.getStateModelDefId().stringify()));
+ _accessor.getProperty(_keyBuilder.stateModelDef(partitionedConfig.getStateModelDefId()
+ .stringify()));
List<InstanceConfig> participantConfigs =
_accessor.getChildValues(_keyBuilder.instanceConfigs());
Set<ParticipantId> participantSet = Sets.newHashSet();
for (InstanceConfig participantConfig : participantConfigs) {
participantSet.add(participantConfig.getParticipantId());
}
- context.generateDefaultConfiguration(stateModelDef, participantSet);
- setRebalancerContext(resourceId, context);
+ partitionedConfig.generateDefaultConfiguration(stateModelDef, participantSet);
+ setRebalancerConfig(resourceId, partitionedConfig);
return true;
}
@@ -366,41 +382,40 @@ public class ResourceAccessor {
*/
static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
boolean batchMessageMode) {
- PartitionedRebalancerContext partitionedContext =
- config.getRebalancerContext(PartitionedRebalancerContext.class);
- if (partitionedContext != null) {
- IdealState idealState = new IdealState(partitionedContext.getResourceId());
- idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
- idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+ PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
+ if (partitionedConfig != null) {
+ IdealState idealState = new IdealState(partitionedConfig.getResourceId());
+ idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
+ idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
String replicas = null;
- if (partitionedContext.anyLiveParticipant()) {
+ if (partitionedConfig.anyLiveParticipant()) {
replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
} else {
- replicas = Integer.toString(partitionedContext.getReplicaCount());
+ replicas = Integer.toString(partitionedConfig.getReplicaCount());
}
idealState.setReplicas(replicas);
- idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
- idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
- idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
- idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
- idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+ idealState.setNumPartitions(partitionedConfig.getPartitionSet().size());
+ idealState.setInstanceGroupTag(partitionedConfig.getParticipantGroupTag());
+ idealState.setMaxPartitionsPerInstance(partitionedConfig.getMaxPartitionsPerParticipant());
+ idealState.setStateModelDefId(partitionedConfig.getStateModelDefId());
+ idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
idealState.setBucketSize(bucketSize);
idealState.setBatchMessageMode(batchMessageMode);
- if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
- SemiAutoRebalancerContext semiAutoContext =
- config.getRebalancerContext(SemiAutoRebalancerContext.class);
- for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
- idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+ if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ SemiAutoRebalancerConfig semiAutoConfig =
+ BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
+ for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
+ idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
}
- } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
- CustomRebalancerContext customContext =
- config.getRebalancerContext(CustomRebalancerContext.class);
- for (PartitionId partitionId : customContext.getPartitionSet()) {
- idealState.setParticipantStateMap(partitionId,
- customContext.getPreferenceMap(partitionId));
+ } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ CustomRebalancerConfig customConfig =
+ BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
+ for (PartitionId partitionId : customConfig.getPartitionSet()) {
+ idealState
+ .setParticipantStateMap(partitionId, customConfig.getPreferenceMap(partitionId));
}
} else {
- for (PartitionId partitionId : partitionedContext.getPartitionSet()) {
+ for (PartitionId partitionId : partitionedConfig.getPartitionSet()) {
List<ParticipantId> preferenceList = Collections.emptyList();
idealState.setPreferenceList(partitionId, preferenceList);
Map<ParticipantId, State> participantStateMap = Collections.emptyMap();
@@ -425,7 +440,7 @@ public class ResourceAccessor {
ResourceConfiguration resourceConfiguration, IdealState idealState,
ExternalView externalView, ResourceAssignment resourceAssignment) {
UserConfig userConfig;
- RebalancerContext rebalancerContext = null;
+ RebalancerConfig rebalancerConfig = null;
ResourceType type = ResourceType.DATA;
if (resourceConfiguration != null) {
userConfig = resourceConfiguration.getUserConfig();
@@ -437,14 +452,14 @@ public class ResourceAccessor {
boolean batchMessageMode = false;
if (idealState != null) {
if (resourceConfiguration != null
- && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
- // prefer rebalancer context for non-user_defined data rebalancing
- rebalancerContext =
- resourceConfiguration.getRebalancerContext(PartitionedRebalancerContext.class);
+ && idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ // prefer rebalancer config for user_defined data rebalancing
+ rebalancerConfig =
+ resourceConfiguration.getRebalancerConfig(PartitionedRebalancerConfig.class);
}
- if (rebalancerContext == null) {
+ if (rebalancerConfig == null) {
// prefer ideal state for non-user_defined data rebalancing
- rebalancerContext = PartitionedRebalancerContext.from(idealState);
+ rebalancerConfig = PartitionedRebalancerConfig.from(idealState);
}
bucketSize = idealState.getBucketSize();
batchMessageMode = idealState.getBatchMessageMode();
@@ -452,14 +467,13 @@ public class ResourceAccessor {
} else if (resourceConfiguration != null) {
bucketSize = resourceConfiguration.getBucketSize();
batchMessageMode = resourceConfiguration.getBatchMessageMode();
- RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
- rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+ rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class);
}
- if (rebalancerContext == null) {
- rebalancerContext = new PartitionedRebalancerContext();
+ if (rebalancerConfig == null) {
+ rebalancerConfig = new PartitionedRebalancerConfig();
}
return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
- rebalancerContext, userConfig, bucketSize, batchMessageMode);
+ rebalancerConfig, userConfig, bucketSize, batchMessageMode);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index 38d48ab..b148a49 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -7,8 +7,7 @@ import org.apache.helix.api.Partition;
import org.apache.helix.api.Scope;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import com.google.common.collect.Sets;
@@ -80,7 +79,7 @@ public class ResourceConfig {
* @return map of subunit id to subunit or empty map if none
*/
public Map<? extends PartitionId, ? extends Partition> getSubUnitMap() {
- return _rebalancerConfig.getRebalancerContext(RebalancerContext.class).getSubUnitMap();
+ return _rebalancerConfig.getSubUnitMap();
}
/**
@@ -167,7 +166,7 @@ public class ResourceConfig {
public static class Delta {
private enum Fields {
TYPE,
- REBALANCER_CONTEXT,
+ REBALANCER_CONFIG,
USER_CONFIG,
BUCKET_SIZE,
BATCH_MESSAGE_MODE
@@ -198,12 +197,12 @@ public class ResourceConfig {
/**
* Set the rebalancer configuration
- * @param context properties of interest for rebalancing
+ * @param config properties of interest for rebalancing
* @return Delta
*/
- public Delta setRebalancerContext(RebalancerContext context) {
- _builder.rebalancerContext(context);
- _updateFields.add(Fields.REBALANCER_CONTEXT);
+ public Delta setRebalancerConfig(RebalancerConfig config) {
+ _builder.rebalancerConfig(config);
+ _updateFields.add(Fields.REBALANCER_CONFIG);
return this;
}
@@ -248,10 +247,8 @@ public class ResourceConfig {
public ResourceConfig mergeInto(ResourceConfig orig) {
ResourceConfig deltaConfig = _builder.build();
Builder builder =
- new Builder(orig.getId())
- .type(orig.getType())
- .rebalancerContext(
- orig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class))
+ new Builder(orig.getId()).type(orig.getType())
+ .rebalancerConfig(orig.getRebalancerConfig())
.schedulerTaskConfig(orig.getSchedulerTaskConfig()).userConfig(orig.getUserConfig())
.bucketSize(orig.getBucketSize()).batchMessageMode(orig.getBatchMessageMode());
for (Fields field : _updateFields) {
@@ -259,9 +256,8 @@ public class ResourceConfig {
case TYPE:
builder.type(deltaConfig.getType());
break;
- case REBALANCER_CONTEXT:
- builder.rebalancerContext(deltaConfig.getRebalancerConfig().getRebalancerContext(
- RebalancerContext.class));
+ case REBALANCER_CONFIG:
+ builder.rebalancerConfig(deltaConfig.getRebalancerConfig());
break;
case USER_CONFIG:
builder.userConfig(deltaConfig.getUserConfig());
@@ -314,11 +310,11 @@ public class ResourceConfig {
/**
* Set the rebalancer configuration
- * @param rebalancerContext properties of interest for rebalancing
+ * @param rebalancerConfig properties of interest for rebalancing
* @return Builder
*/
- public Builder rebalancerContext(RebalancerContext rebalancerContext) {
- _rebalancerConfig = new RebalancerConfig(rebalancerContext);
+ public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+ _rebalancerConfig = rebalancerConfig;
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java b/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
new file mode 100644
index 0000000..29a72b7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/id/ContextId.java
@@ -0,0 +1,57 @@
+package org.apache.helix.api.id;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Identifies a context
+ */
+public class ContextId extends Id {
+ @JsonProperty("id")
+ final private String _id;
+
+ /**
+ * Create a context id
+ * @param id string representation of the id
+ */
+ @JsonCreator
+ public ContextId(@JsonProperty("id") String id) {
+ _id = id;
+ }
+
+ @Override
+ public String stringify() {
+ return _id;
+ }
+
+ /**
+ * Get a concrete context id for a string name
+ * @param contextId string context identifier
+ * @return ContextId
+ */
+ public static ContextId from(String contextId) {
+ if (contextId == null) {
+ return null;
+ }
+ return new ContextId(contextId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index eec745e..96be0fa 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -44,18 +44,19 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.MessageGenerationStage;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.PersistAssignmentStage;
+import org.apache.helix.controller.stages.PersistContextStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.TaskAssignmentStage;
-import org.apache.helix.controller.stages.PersistAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
@@ -183,11 +184,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new PersistAssignmentStage());
rebalancePipeline.addStage(new MessageGenerationStage());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new MessageThrottleStage());
rebalancePipeline.addStage(new TaskAssignmentStage());
+ rebalancePipeline.addStage(new PersistAssignmentStage());
+ rebalancePipeline.addStage(new PersistContextStage());
// external view generation
Pipeline externalViewPipeline = new Pipeline();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java b/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
new file mode 100644
index 0000000..2d98347
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/BasicControllerContext.java
@@ -0,0 +1,60 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.serializer.DefaultStringSerializer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A simple context that can be serialized by a {@link DefaultStringSerializer}
+ */
+public class BasicControllerContext implements ControllerContext {
+ private final ContextId _id;
+ private Class<? extends StringSerializer> _serializer;
+
+ /**
+ * Instantiate with an id
+ * @param id ContextId, unique among all contexts in this cluster
+ */
+ public BasicControllerContext(@JsonProperty("id") ContextId id) {
+ _id = id;
+ _serializer = DefaultStringSerializer.class;
+ }
+
+ /**
+ * Set the class that can serialize this object into a String, and back
+ * @param serializer StringSerializer implementation class
+ */
+ public void setSerializerClass(Class<? extends StringSerializer> serializer) {
+ _serializer = serializer;
+ }
+
+ @Override
+ public Class<? extends StringSerializer> getSerializerClass() {
+ return _serializer;
+ }
+
+ @Override
+ public ContextId getId() {
+ return _id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
new file mode 100644
index 0000000..b1be5c2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContext.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.serializer.StringSerializer;
+
+/**
+ * Controller stages can implement this interface to set and get persistent state
+ */
+public interface ControllerContext {
+ /**
+ * Get the identifier of this context
+ * @return {@link ContextId}
+ */
+ ContextId getId();
+
+ /**
+ * Get the class that can be used to convert this object to and from a String
+ * @return {@link StringSerializer} implementation class
+ */
+ Class<? extends StringSerializer> getSerializerClass();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
new file mode 100644
index 0000000..8c1d03a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextHolder.java
@@ -0,0 +1,156 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Wrap a {@link ControllerContext} so that it can be persisted in the backing store
+ */
+public class ControllerContextHolder extends HelixProperty {
+ private enum Fields {
+ SERIALIZER_CLASS,
+ CONTEXT,
+ CONTEXT_CLASS
+ }
+
+ private static final Logger LOG = Logger.getLogger(ControllerContextHolder.class);
+
+ private final ControllerContext _context;
+
+ /**
+ * Instantiate from a populated ControllerContext
+ * @param context instantiated context
+ */
+ public ControllerContextHolder(ControllerContext context) {
+ super(context.getId().toString());
+ _context = context;
+ StringSerializer serializer = instantiateSerializerFromContext(_context);
+ if (_context != null && serializer != null) {
+ _record.setSimpleField(Fields.SERIALIZER_CLASS.toString(), _context.getSerializerClass()
+ .getName());
+ _record.setSimpleField(Fields.CONTEXT_CLASS.toString(), _context.getClass().getName());
+ _record.setSimpleField(Fields.CONTEXT.toString(), serializer.serialize(_context));
+ }
+ }
+
+ /**
+ * Instantiate from a record
+ * @param record populated ZNRecord
+ */
+ public ControllerContextHolder(ZNRecord record) {
+ super(record);
+ StringSerializer serializer =
+ instantiateSerializerFromName(_record.getSimpleField(Fields.SERIALIZER_CLASS.toString()));
+ _context =
+ loadContext(serializer, _record.getSimpleField(Fields.CONTEXT_CLASS.toString()),
+ _record.getSimpleField(Fields.CONTEXT.toString()));
+ }
+
+ /**
+ * Get a ControllerContext
+ * @return instantiated {@link ControllerContext}
+ */
+ public ControllerContext getContext() {
+ return _context;
+ }
+
+ /**
+ * Get a ControllerContext as a specific subtyple
+ * @return instantiated typed {@link ControllerContext}, or null
+ */
+ public <T extends ControllerContext> T getContext(Class<T> contextClass) {
+ if (_context != null) {
+ try {
+ return contextClass.cast(_context);
+ } catch (ClassCastException e) {
+ LOG.warn(contextClass + " is incompatible with context class: " + _context.getClass());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Instantiate a StringSerializer that can serialize the context
+ * @param context instantiated ControllerContext
+ * @return StringSerializer object, or null if it could not be instantiated
+ */
+ private StringSerializer instantiateSerializerFromContext(ControllerContext context) {
+ if (context == null) {
+ return null;
+ }
+ try {
+ return context.getSerializerClass().newInstance();
+ } catch (InstantiationException e) {
+ LOG.error("Serializer could not be instatiated", e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Serializer default constructor not accessible", e);
+ }
+ return null;
+ }
+
+ /**
+ * Instantiate a StringSerializer from its class name
+ * @param serializerClassName name of a StringSerializer implementation class
+ * @return instantiated StringSerializer, or null if it could not be instantiated
+ */
+ private StringSerializer instantiateSerializerFromName(String serializerClassName) {
+ if (serializerClassName != null) {
+ try {
+ return (StringSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+ .newInstance();
+ } catch (InstantiationException e) {
+ LOG.error("Error getting the serializer", e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Error getting the serializer", e);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Error getting the serializer", e);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Deserialize a context
+ * @param serializer StringSerializer that can deserialize the context
+ * @param className the name of the context class
+ * @param contextData the raw context data as a String
+ * @return ControllerContext, or null if there was a conversion issue
+ */
+ private ControllerContext loadContext(StringSerializer serializer, String className,
+ String contextData) {
+ if (serializer != null && className != null && contextData != null) {
+ try {
+ Class<? extends ControllerContext> contextClass =
+ HelixUtil.loadClass(getClass(), className).asSubclass(ControllerContext.class);
+ return serializer.deserialize(contextClass, contextData);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Could not convert the persisted data into a " + className, e);
+ } catch (ClassCastException e) {
+ LOG.error("Could not convert the persisted data into a " + className, e);
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
new file mode 100644
index 0000000..1541585
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/context/ControllerContextProvider.java
@@ -0,0 +1,120 @@
+package org.apache.helix.controller.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.api.id.ContextId;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * An interface for getting and setting {@link ControllerContext} objects, which will eventually
+ * be persisted and acessible across runs of the controller pipeline.
+ */
+public class ControllerContextProvider {
+ private static final Logger LOG = Logger.getLogger(ControllerContextProvider.class);
+
+ private Map<ContextId, ControllerContext> _persistedContexts;
+ private Map<ContextId, ControllerContext> _pendingContexts;
+
+ /**
+ * Instantiate with already-persisted controller contexts
+ * @param contexts
+ */
+ public ControllerContextProvider(Map<ContextId, ControllerContext> contexts) {
+ _persistedContexts = contexts != null ? contexts : new HashMap<ContextId, ControllerContext>();
+ _pendingContexts = Maps.newHashMap();
+ }
+
+ /**
+ * Get a base ControllerContext
+ * @param contextId the context id to look up
+ * @return a ControllerContext, or null if not found
+ */
+ public ControllerContext getControllerContext(ContextId contextId) {
+ return getControllerContext(contextId, ControllerContext.class);
+ }
+
+ /**
+ * Get a typed ControllerContext
+ * @param contextId the context id to look up
+ * @param contextClass the class which the context should be returned as
+ * @return a typed ControllerContext, or null if no context with given id is available for this
+ * type
+ */
+ public <T extends ControllerContext> T getControllerContext(ContextId contextId,
+ Class<T> contextClass) {
+ try {
+ if (_pendingContexts.containsKey(contextId)) {
+ return contextClass.cast(_pendingContexts.get(contextId));
+ } else if (_persistedContexts.containsKey(contextId)) {
+ return contextClass.cast(_persistedContexts.get(contextId));
+ }
+ } catch (ClassCastException e) {
+ LOG.error("Could not convert context " + contextId + " into " + contextClass.getName());
+ }
+ return null;
+ }
+
+ /**
+ * Put a controller context, overwriting any existing ones
+ * @param contextId the id to set
+ * @param context the context object
+ */
+ public void putControllerContext(ContextId contextId, ControllerContext context) {
+ putControllerContext(contextId, context, true);
+ }
+
+ /**
+ * Put a controller context, specifying overwrite behavior
+ * @param contextId the id to set
+ * @param context the context object
+ * @param overwriteAllowed true if existing objects can be overwritten, false otherwise
+ * @return true if saved, false if an object with that id exists and overwrite is not allowed
+ */
+ public boolean putControllerContext(ContextId contextId, ControllerContext context,
+ boolean overwriteAllowed) {
+ if (overwriteAllowed || !exists(contextId)) {
+ _pendingContexts.put(contextId, context);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Check if a context exists
+ * @param contextId the id to look up
+ * @return true if a context exists with that id, false otherwise
+ */
+ public boolean exists(ContextId contextId) {
+ return _persistedContexts.containsKey(contextId) || _pendingContexts.containsKey(contextId);
+ }
+
+ /**
+ * Get all contexts that have been put, but not yet persisted
+ * @return a map of context id to context
+ */
+ public Map<ContextId, ControllerContext> getPendingContexts() {
+ return _pendingContexts;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 5209e2c..6629bec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -8,8 +8,10 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
@@ -40,15 +42,15 @@ public class CustomRebalancer implements HelixRebalancer {
private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
@Override
- public void init(HelixManager helixManager) {
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
// do nothing
}
@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
- CustomRebalancerContext config =
- rebalancerConfig.getRebalancerContext(CustomRebalancerContext.class);
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+ CustomRebalancerConfig config =
+ BasicRebalancerConfig.convert(rebalancerConfig, CustomRebalancerConfig.class);
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
index fc4bfa0..3aa41d7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -30,8 +30,9 @@ import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -54,31 +55,30 @@ public class FallbackRebalancer implements HelixRebalancer {
private HelixManager _helixManager;
@Override
- public void init(HelixManager helixManager) {
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
_helixManager = helixManager;
}
@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
// make sure the manager is not null
if (_helixManager == null) {
LOG.info("HelixManager is null!");
return null;
}
- // get the context
- PartitionedRebalancerContext context =
- rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
- if (context == null) {
+ // get the config
+ PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
+ if (config == null) {
LOG.info("Resource is not partitioned");
return null;
}
// get the ideal state and rebalancer class
- ResourceId resourceId = context.getResourceId();
+ ResourceId resourceId = config.getResourceId();
StateModelDefinition stateModelDef =
- cluster.getStateModelMap().get(context.getStateModelDefId());
+ cluster.getStateModelMap().get(config.getStateModelDefId());
if (stateModelDef == null) {
LOG.info("StateModelDefinition unavailable for " + resourceId);
return null;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 6d7b0ef..0c55d45 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -16,8 +16,10 @@ import org.apache.helix.api.Participant;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
@@ -56,15 +58,15 @@ public class FullAutoRebalancer implements HelixRebalancer {
private static Logger LOG = Logger.getLogger(FullAutoRebalancer.class);
@Override
- public void init(HelixManager helixManager) {
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
// do nothing
}
@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
- FullAutoRebalancerContext config =
- rebalancerConfig.getRebalancerContext(FullAutoRebalancerContext.class);
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+ FullAutoRebalancerConfig config =
+ BasicRebalancerConfig.convert(rebalancerConfig, FullAutoRebalancerConfig.class);
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
// Compute a preference list based on the current ideal state
@@ -181,7 +183,7 @@ public class FullAutoRebalancer implements HelixRebalancer {
}
private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
- FullAutoRebalancerContext config, ResourceCurrentState currentStateOutput,
+ FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
Map<State, Integer> stateCountMap) {
Map<PartitionId, Map<ParticipantId, State>> map =
new HashMap<PartitionId, Map<ParticipantId, State>>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
index 7fcbba5..1fbb02f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/HelixRebalancer.java
@@ -2,7 +2,8 @@ package org.apache.helix.controller.rebalancer;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
@@ -32,10 +33,11 @@ import org.apache.helix.model.ResourceAssignment;
*/
public interface HelixRebalancer {
/**
- * Initialize the rebalancer with a HelixManager if necessary
- * @param manager
+ * Initialize the rebalancer with a HelixManager and ControllerContextProvider if necessary
+ * @param manager HelixManager instance
+ * @param contextProvider An object that supports getting and setting context across pipeline runs
*/
- public void init(HelixManager helixManager);
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider);
/**
* Given an ideal state for a resource and liveness of participants, compute a assignment of
@@ -47,18 +49,20 @@ public interface HelixRebalancer {
* Say that you have:<br/>
*
* <pre>
- * class MyRebalancerContext implements RebalancerContext
+ * class MyRebalancerConfig implements RebalancerConfig
* </pre>
*
- * as your rebalancer context. To extract it from a RebalancerConfig, do the following:<br/>
+ * as your rebalancer config. To get a typed version, you can do the following:<br/>
*
* <pre>
- * MyRebalancerContext context = rebalancerConfig.getRebalancerContext(MyRebalancerContext.class);
+ * MyRebalancerConfig config = BasicRebalancerConfig.convert(rebalancerConfig,
+ * MyRebalancerConfig.class);
* </pre>
* @param rebalancerConfig the properties of the resource for which a mapping will be computed
+ * @param prevAssignment the previous ResourceAssignment of this cluster, or null if none
* @param cluster complete snapshot of the cluster
* @param currentState the current states of all partitions
*/
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState);
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index a0ad6f3..51ca463 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -9,9 +9,10 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;
@@ -45,15 +46,15 @@ public class SemiAutoRebalancer implements HelixRebalancer {
private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
@Override
- public void init(HelixManager helixManager) {
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
// do nothing
}
@Override
public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
- Cluster cluster, ResourceCurrentState currentState) {
- SemiAutoRebalancerContext config =
- rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+ SemiAutoRebalancerConfig config =
+ BasicRebalancerConfig.convert(rebalancerConfig, SemiAutoRebalancerConfig.class);
StateModelDefinition stateModelDef =
cluster.getStateModelMap().get(config.getStateModelDefId());
if (LOG.isDebugEnabled()) {
@@ -65,8 +66,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
Map<ParticipantId, State> currentStateMap =
currentState.getCurrentStateMap(config.getResourceId(), partition);
Set<ParticipantId> disabledInstancesForPartition =
- ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
- partition);
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(), partition);
List<ParticipantId> preferenceList =
ConstraintBasedAssignment.getPreferenceList(cluster, partition,
config.getPreferenceList(partition));
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
new file mode 100644
index 0000000..d6aa10e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
@@ -0,0 +1,256 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.serializer.DefaultStringSerializer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Abstract RebalancerConfig that functions for generic subunits. Use a subclass that more
+ * concretely defines the subunits.
+ */
+public abstract class BasicRebalancerConfig implements RebalancerConfig {
+ private ResourceId _resourceId;
+ private StateModelDefId _stateModelDefId;
+ private StateModelFactoryId _stateModelFactoryId;
+ private String _participantGroupTag;
+ private Class<? extends StringSerializer> _serializer;
+ private RebalancerRef _rebalancerRef;
+
+ /**
+ * Instantiate a basic rebalancer config
+ */
+ public BasicRebalancerConfig() {
+ _serializer = DefaultStringSerializer.class;
+ }
+
+ @Override
+ public ResourceId getResourceId() {
+ return _resourceId;
+ }
+
+ /**
+ * Set the resource to rebalance
+ * @param resourceId resource id
+ */
+ public void setResourceId(ResourceId resourceId) {
+ _resourceId = resourceId;
+ }
+
+ @Override
+ public StateModelDefId getStateModelDefId() {
+ return _stateModelDefId;
+ }
+
+ /**
+ * Set the state model definition that the resource follows
+ * @param stateModelDefId state model definition id
+ */
+ public void setStateModelDefId(StateModelDefId stateModelDefId) {
+ _stateModelDefId = stateModelDefId;
+ }
+
+ @Override
+ public StateModelFactoryId getStateModelFactoryId() {
+ return _stateModelFactoryId;
+ }
+
+ /**
+ * Set the state model factory that the resource uses
+ * @param stateModelFactoryId state model factory id
+ */
+ public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+ _stateModelFactoryId = stateModelFactoryId;
+ }
+
+ @Override
+ public String getParticipantGroupTag() {
+ return _participantGroupTag;
+ }
+
+ /**
+ * Set a tag that participants must have in order to serve this resource
+ * @param participantGroupTag string group tag
+ */
+ public void setParticipantGroupTag(String participantGroupTag) {
+ _participantGroupTag = participantGroupTag;
+ }
+
+ /**
+ * Get the serializer. If none is provided, {@link DefaultStringSerializer} is used
+ */
+ @Override
+ public Class<? extends StringSerializer> getSerializerClass() {
+ return _serializer;
+ }
+
+ /**
+ * Set the class that can serialize this config
+ * @param serializer serializer class that implements StringSerializer
+ */
+ public void setSerializerClass(Class<? extends StringSerializer> serializer) {
+ _serializer = serializer;
+ }
+
+ @Override
+ @JsonIgnore
+ public Set<? extends PartitionId> getSubUnitIdSet() {
+ return getSubUnitMap().keySet();
+ }
+
+ @Override
+ @JsonIgnore
+ public Partition getSubUnit(PartitionId subUnitId) {
+ return getSubUnitMap().get(subUnitId);
+ }
+
+ @Override
+ public RebalancerRef getRebalancerRef() {
+ return _rebalancerRef;
+ }
+
+ /**
+ * Set the reference to the class used to rebalance this resource
+ * @param rebalancerRef RebalancerRef instance
+ */
+ public void setRebalancerRef(RebalancerRef rebalancerRef) {
+ _rebalancerRef = rebalancerRef;
+ }
+
+ /**
+ * Safely cast a RebalancerConfig into a subtype
+ * @param config RebalancerConfig object
+ * @param clazz the target class
+ * @return An instance of clazz, or null if the conversion is not possible
+ */
+ public static <T extends RebalancerConfig> T convert(RebalancerConfig config, Class<T> clazz) {
+ try {
+ return clazz.cast(config);
+ } catch (ClassCastException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Abstract builder for the base rebalancer config
+ */
+ public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
+ private final ResourceId _resourceId;
+ private StateModelDefId _stateModelDefId;
+ private StateModelFactoryId _stateModelFactoryId;
+ private String _participantGroupTag;
+ private Class<? extends StringSerializer> _serializerClass;
+ private RebalancerRef _rebalancerRef;
+
+ /**
+ * Instantiate with a resource id
+ * @param resourceId resource id
+ */
+ public AbstractBuilder(ResourceId resourceId) {
+ _resourceId = resourceId;
+ _serializerClass = DefaultStringSerializer.class;
+ }
+
+ /**
+ * Set the state model definition that the resource should follow
+ * @param stateModelDefId state model definition id
+ * @return Builder
+ */
+ public T stateModelDefId(StateModelDefId stateModelDefId) {
+ _stateModelDefId = stateModelDefId;
+ return self();
+ }
+
+ /**
+ * Set the state model factory that the resource should use
+ * @param stateModelFactoryId state model factory id
+ * @return Builder
+ */
+ public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+ _stateModelFactoryId = stateModelFactoryId;
+ return self();
+ }
+
+ /**
+ * Set the tag that all participants require in order to serve this resource
+ * @param participantGroupTag the tag
+ * @return Builder
+ */
+ public T participantGroupTag(String participantGroupTag) {
+ _participantGroupTag = participantGroupTag;
+ return self();
+ }
+
+ /**
+ * Set the serializer class for this rebalancer config
+ * @param serializerClass class that implements StringSerializer
+ * @return Builder
+ */
+ public T serializerClass(Class<? extends StringSerializer> serializerClass) {
+ _serializerClass = serializerClass;
+ return self();
+ }
+
+ /**
+ * Specify a custom class to use for rebalancing
+ * @param rebalancerRef RebalancerRef instance
+ * @return Builder
+ */
+ public T rebalancerRef(RebalancerRef rebalancerRef) {
+ _rebalancerRef = rebalancerRef;
+ return self();
+ }
+
+ /**
+ * Update an existing context with base fields
+ * @param config derived config
+ */
+ protected final void update(BasicRebalancerConfig config) {
+ config.setResourceId(_resourceId);
+ config.setStateModelDefId(_stateModelDefId);
+ config.setStateModelFactoryId(_stateModelFactoryId);
+ config.setParticipantGroupTag(_participantGroupTag);
+ config.setSerializerClass(_serializerClass);
+ config.setRebalancerRef(_rebalancerRef);
+ }
+
+ /**
+ * Get a typed reference to "this" class. Final derived classes should simply return the this
+ * reference.
+ * @return this for the most specific type
+ */
+ protected abstract T self();
+
+ /**
+ * Get the rebalancer config from the built fields
+ * @return RebalancerConfig
+ */
+ public abstract RebalancerConfig build();
+ }
+}
[2/4] [HELIX-327] Simplify rebalancer, rename rebalancer configs,
support settable contexts, rb=15981
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
deleted file mode 100644
index aa872c4..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ /dev/null
@@ -1,182 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.config.NamespacedConfig;
-import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.model.ResourceConfiguration;
-import org.apache.helix.util.HelixUtil;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Configuration for a resource rebalancer. This contains a RebalancerContext, which contains
- * information specific to each rebalancer.
- */
-public final class RebalancerConfig {
- private enum Fields {
- SERIALIZER_CLASS,
- REBALANCER_CONTEXT,
- REBALANCER_CONTEXT_CLASS
- }
-
- private static final Logger LOG = Logger.getLogger(RebalancerConfig.class);
- private ContextSerializer _serializer;
- private HelixRebalancer _rebalancer;
- private final RebalancerContext _context;
- private final NamespacedConfig _config;
-
- /**
- * Instantiate a RebalancerConfig
- * @param context rebalancer context
- * @param rebalancerRef reference to the rebalancer class that will be used
- */
- public RebalancerConfig(RebalancerContext context) {
- _config =
- new NamespacedConfig(Scope.resource(context.getResourceId()),
- RebalancerConfig.class.getSimpleName());
- _config.setSimpleField(Fields.SERIALIZER_CLASS.toString(), context.getSerializerClass()
- .getName());
- _config
- .setSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString(), context.getClass().getName());
- _context = context;
- try {
- _serializer = context.getSerializerClass().newInstance();
- _config.setSimpleField(Fields.REBALANCER_CONTEXT.toString(), _serializer.serialize(context));
- } catch (InstantiationException e) {
- LOG.error("Error initializing the configuration", e);
- } catch (IllegalAccessException e) {
- LOG.error("Error initializing the configuration", e);
- }
- }
-
- /**
- * Instantiate from a physical ResourceConfiguration
- * @param resourceConfiguration populated ResourceConfiguration
- */
- public RebalancerConfig(ResourceConfiguration resourceConfiguration) {
- _config = new NamespacedConfig(resourceConfiguration, RebalancerConfig.class.getSimpleName());
- _serializer = getSerializer();
- _context = getContext();
- }
-
- /**
- * Get the class that can serialize and deserialize the rebalancer context
- * @return ContextSerializer
- */
- private ContextSerializer getSerializer() {
- String serializerClassName = _config.getSimpleField(Fields.SERIALIZER_CLASS.toString());
- if (serializerClassName != null) {
- try {
- return (ContextSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
- .newInstance();
- } catch (InstantiationException e) {
- LOG.error("Error getting the serializer", e);
- } catch (IllegalAccessException e) {
- LOG.error("Error getting the serializer", e);
- } catch (ClassNotFoundException e) {
- LOG.error("Error getting the serializer", e);
- }
- }
- return null;
- }
-
- private RebalancerContext getContext() {
- String className = _config.getSimpleField(Fields.REBALANCER_CONTEXT_CLASS.toString());
- if (className != null) {
- try {
- Class<? extends RebalancerContext> contextClass =
- HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerContext.class);
- String serialized = _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
- return _serializer.deserialize(contextClass, serialized);
- } catch (ClassNotFoundException e) {
- LOG.error(className + " is not a valid class");
- } catch (ClassCastException e) {
- LOG.error(className + " does not implement RebalancerContext");
- }
- }
- return null;
- }
-
- /**
- * Get a rebalancer class instance
- * @return Rebalancer
- */
- public HelixRebalancer getRebalancer() {
- // cache the rebalancer to avoid loading and instantiating it excessively
- if (_rebalancer == null) {
- if (_context == null || _context.getRebalancerRef() == null) {
- return null;
- }
- _rebalancer = _context.getRebalancerRef().getRebalancer();
- }
- return _rebalancer;
- }
-
- /**
- * Get the instantiated RebalancerContext
- * @param contextClass specific class of the RebalancerContext
- * @return RebalancerContext subclass instance, or null if conversion is not possible
- */
- public <T extends RebalancerContext> T getRebalancerContext(Class<T> contextClass) {
- if (_context != null) {
- try {
- return contextClass.cast(_context);
- } catch (ClassCastException e) {
- LOG.warn(contextClass + " is incompatible with context class: " + _context.getClass());
- }
- }
- return null;
- }
-
- /**
- * Get the rebalancer context serialized as a string
- * @return string representing the context
- */
- public String getSerializedContext() {
- return _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
- }
-
- /**
- * Convert this to a namespaced config
- * @return NamespacedConfig
- */
- public NamespacedConfig toNamespacedConfig() {
- return _config;
- }
-
- /**
- * Get a RebalancerConfig from a physical resource config
- * @param resourceConfiguration physical resource config
- * @return RebalancerConfig
- */
- public static RebalancerConfig from(ResourceConfiguration resourceConfiguration) {
- return new RebalancerConfig(resourceConfiguration);
- }
-
- /**
- * Get a RebalancerConfig from a RebalancerContext
- * @param context instantiated RebalancerContext
- * @return RebalancerConfig
- */
- public static RebalancerConfig from(RebalancerContext context) {
- return new RebalancerConfig(context);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
deleted file mode 100644
index 981891b..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Defines the state available to a rebalancer. The most common use case is to use a
- * {@link PartitionedRebalancerContext} or a subclass and set up a resource with it. A rebalancer
- * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
- * how the configuration should be serialized.
- */
-public interface RebalancerContext {
- /**
- * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
- * resource, e.g. a subtask of a task
- * @return map of (subunit id, subunit) pairs
- */
- public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
-
- /**
- * Get the subunits of the resource (e.g. partitions)
- * @return set of subunit ids
- */
- public Set<? extends PartitionId> getSubUnitIdSet();
-
- /**
- * Get a specific subunit
- * @param subUnitId the id of the subunit
- * @return SubUnit
- */
- public Partition getSubUnit(PartitionId subUnitId);
-
- /**
- * Get the resource to rebalance
- * @return resource id
- */
- public ResourceId getResourceId();
-
- /**
- * Get the state model definition that the resource follows
- * @return state model definition id
- */
- public StateModelDefId getStateModelDefId();
-
- /**
- * Get the state model factory of this resource
- * @return state model factory id
- */
- public StateModelFactoryId getStateModelFactoryId();
-
- /**
- * Get the tag, if any, that participants must have in order to serve this resource
- * @return participant group tag, or null
- */
- public String getParticipantGroupTag();
-
- /**
- * Get the serializer for this context
- * @return ContextSerializer class object
- */
- public Class<? extends ContextSerializer> getSerializerClass();
-
- /**
- * Get a reference to the class used to rebalance this resource
- * @return RebalancerRef
- */
- public RebalancerRef getRebalancerRef();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
deleted file mode 100644
index 525931d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
- * with partitions may accept state model definitions that support multiple replicas per partition,
- * and it's possible that the policy is that each live participant in the system should have a
- * replica.
- */
-public interface ReplicatedRebalancerContext extends RebalancerContext {
- /**
- * Check if this resource should be assigned to any live participant
- * @return true if any live participant expected, false otherwise
- */
- public boolean anyLiveParticipant();
-
- /**
- * Get the number of replicas that each resource subunit should have
- * @return replica count
- */
- public int getReplicaCount();
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
deleted file mode 100644
index afa81e2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
- * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
- */
-public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
- @JsonProperty("preferenceLists")
- private Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
- /**
- * Instantiate a SemiAutoRebalancerContext
- */
- public SemiAutoRebalancerContext() {
- setRebalanceMode(RebalanceMode.SEMI_AUTO);
- setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
- _preferenceLists = Maps.newHashMap();
- }
-
- /**
- * Get the preference lists of all partitions of the resource
- * @return map of partition id to list of participant ids
- */
- public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
- return _preferenceLists;
- }
-
- /**
- * Set the preference lists of all partitions of the resource
- * @param preferenceLists
- */
- public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
- _preferenceLists = preferenceLists;
- }
-
- /**
- * Get the preference list of a partition
- * @param partitionId the partition to look up
- * @return list of participant ids
- */
- @JsonIgnore
- public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
- return _preferenceLists.get(partitionId);
- }
-
- /**
- * Generate preference lists based on a default cluster setup
- * @param stateModelDef the state model definition to follow
- * @param participantSet the set of participant ids to configure for
- */
- @Override
- @JsonIgnore
- public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
- Set<ParticipantId> participantSet) {
- // compute default upper bounds
- Map<State, String> upperBounds = Maps.newHashMap();
- for (State state : stateModelDef.getTypedStatesPriorityList()) {
- upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
- }
-
- // determine the current mapping
- Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
- for (PartitionId partitionId : getPartitionSet()) {
- List<ParticipantId> preferenceList = getPreferenceList(partitionId);
- if (preferenceList != null && !preferenceList.isEmpty()) {
- Set<ParticipantId> disabledParticipants = Collections.emptySet();
- Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
- Map<ParticipantId, State> initialMap =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
- stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
- currentMapping.put(partitionId, initialMap);
- }
- }
-
- // determine the preference
- LinkedHashMap<State, Integer> stateCounts =
- ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
- getReplicaCount());
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
- List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
- AutoRebalanceStrategy strategy =
- new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
- getMaxPartitionsPerParticipant(), placementScheme);
- Map<String, List<String>> rawPreferenceLists =
- strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
- .getListFields();
- Map<PartitionId, List<ParticipantId>> preferenceLists =
- Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
- setPreferenceLists(preferenceLists);
- }
-
- /**
- * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
- */
- public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
- private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
- /**
- * Instantiate for a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
- super.rebalanceMode(RebalanceMode.SEMI_AUTO);
- _preferenceLists = Maps.newHashMap();
- }
-
- /**
- * Add a preference list for a partition
- * @param partitionId partition to set
- * @param preferenceList ordered list of participants who can serve the partition
- * @return Builder
- */
- public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
- _preferenceLists.put(partitionId, preferenceList);
- return self();
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public SemiAutoRebalancerContext build() {
- SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
- super.update(context);
- context.setPreferenceLists(_preferenceLists);
- return context;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java
new file mode 100644
index 0000000..d612451
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/serializer/DefaultStringSerializer.java
@@ -0,0 +1,82 @@
+package org.apache.helix.controller.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+import org.apache.helix.HelixException;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Default serializer implementation for converting to/from strings. Uses the Jackson JSON library
+ * to do the conversion
+ */
+public class DefaultStringSerializer implements StringSerializer {
+
+ private static Logger logger = Logger.getLogger(DefaultStringSerializer.class);
+
+ @Override
+ public <T> String serialize(final T data) {
+ if (data == null) {
+ return null;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ StringWriter sw = new StringWriter();
+ try {
+ mapper.writeValue(sw, data);
+ } catch (Exception e) {
+ logger.error("Exception during payload data serialization.", e);
+ throw new HelixException(e);
+ }
+ return sw.toString();
+ }
+
+ @Override
+ public <T> T deserialize(final Class<T> clazz, final String string) {
+ if (string == null || string.length() == 0) {
+ return null;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ ByteArrayInputStream bais = new ByteArrayInputStream(string.getBytes());
+
+ DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ try {
+ T payload = mapper.readValue(bais, clazz);
+ return payload;
+ } catch (Exception e) {
+ logger.error("Exception during deserialization of payload bytes: " + string, e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java
new file mode 100644
index 0000000..9311191
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/serializer/StringSerializer.java
@@ -0,0 +1,37 @@
+package org.apache.helix.controller.serializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface StringSerializer {
+ /**
+ * Convert an object instance to a String
+ * @param data instance of an arbitrary type
+ * @return String representing the object
+ */
+ public <T> String serialize(final T data);
+
+ /**
+ * Convert raw bytes to a generic object instance
+ * @param clazz The class represented by the deserialized string
+ * @param string String representing the object
+ * @return instance of the generic type or null if the conversion failed
+ */
+ public <T> T deserialize(final Class<T> clazz, final String string);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index ae0278b..5cedd7c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -26,5 +26,6 @@ public enum AttributeName {
MESSAGES_ALL,
MESSAGES_SELECTED,
MESSAGES_THROTTLE,
- LOCAL_STATE
+ LOCAL_STATE,
+ CONTEXT_PROVIDER,
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 7b143bd..ec812b2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -25,18 +25,19 @@ import java.util.Set;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.context.ControllerContextProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.FallbackRebalancer;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
@@ -173,18 +174,29 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
}
ResourceConfig resourceConfig = resourceMap.get(resourceId);
RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
- RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
- StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
+ StateModelDefinition stateModelDef =
+ stateModelDefs.get(rebalancerConfig.getStateModelDefId());
ResourceAssignment resourceAssignment = null;
if (rebalancerConfig != null) {
- HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
+ HelixRebalancer rebalancer = null;
+ if (rebalancerConfig != null && rebalancerConfig.getRebalancerRef() != null) {
+ rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
+ }
HelixManager manager = event.getAttribute("helixmanager");
+ ControllerContextProvider provider =
+ event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
if (rebalancer == null) {
rebalancer = new FallbackRebalancer();
}
- rebalancer.init(manager);
+ rebalancer.init(manager, provider);
+ ResourceAssignment currentAssignment = null;
+ Resource resourceSnapshot = cluster.getResource(resourceId);
+ if (resourceSnapshot != null) {
+ currentAssignment = resourceSnapshot.getResourceAssignment();
+ }
resourceAssignment =
- rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
+ rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
+ currentStateOutput);
}
if (resourceAssignment == null) {
resourceAssignment =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index edceed6..7704378 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -44,8 +44,7 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
@@ -139,11 +138,8 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
// partitions are finished (COMPLETED or ERROR), update the status update of the original
// scheduler
// message, and then remove the partitions from the ideal state
- RebalancerContext rebalancerContext =
- (rebalancerConfig != null) ? rebalancerConfig
- .getRebalancerContext(RebalancerContext.class) : null;
- if (rebalancerContext != null
- && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+ if (rebalancerConfig != null
+ && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
StateModelDefId.SchedulerTaskQueue)) {
updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index 08e6799..ef5a5fd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -39,7 +39,7 @@ import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
@@ -76,9 +76,8 @@ public class MessageGenerationStage extends AbstractBaseStage {
ResourceConfig resourceConfig = resourceMap.get(resourceId);
int bucketSize = resourceConfig.getBucketSize();
- RebalancerContext rebalancerCtx =
- resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
- StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
+ RebalancerConfig rebalancerCfg = resourceConfig.getRebalancerConfig();
+ StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCfg.getStateModelDefId());
ResourceAssignment resourceAssignment =
bestPossibleStateOutput.getResourceAssignment(resourceId);
@@ -132,16 +131,15 @@ public class MessageGenerationStage extends AbstractBaseStage {
SessionId sessionId =
cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
.getSessionId();
- RebalancerContext rebalancerContext =
- resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+ RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
Message message =
createMessage(manager, resourceId, subUnitId, participantId, currentState,
nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
- rebalancerContext.getStateModelFactoryId(), bucketSize);
+ rebalancerConfig.getStateModelFactoryId(), bucketSize);
// TODO refactor get/set timeout/inner-message
- if (rebalancerContext != null
- && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
+ if (rebalancerConfig != null
+ && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
StateModelDefId.SchedulerTaskQueue)) {
if (resourceConfig.getSubUnitMap().size() > 0) {
// TODO refactor it -- we need a way to read in scheduler tasks a priori
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index bbbf5c6..9adc833 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -38,9 +38,9 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.ReplicatedRebalancerConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -109,8 +109,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
for (ResourceId resourceId : resourceMap.keySet()) {
ResourceConfig resource = resourceMap.get(resourceId);
StateModelDefinition stateModelDef =
- stateModelDefMap.get(resource.getRebalancerConfig()
- .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
+ stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
// TODO have a logical model for transition
Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
@@ -266,9 +265,9 @@ public class MessageSelectionStage extends AbstractBaseStage {
*/
private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
RebalancerConfig rebalancerConfig, Cluster cluster) {
- ReplicatedRebalancerContext context =
- (rebalancerConfig != null) ? rebalancerConfig
- .getRebalancerContext(ReplicatedRebalancerContext.class) : null;
+ ReplicatedRebalancerConfig config =
+ (rebalancerConfig != null) ? BasicRebalancerConfig.convert(rebalancerConfig,
+ ReplicatedRebalancerConfig.class) : null;
Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
@@ -282,11 +281,11 @@ public class MessageSelectionStage extends AbstractBaseStage {
} else if ("R".equals(numInstancesPerState)) {
// idealState is null when resource has been dropped,
// R can't be evaluated and ignore state constraints
- if (context != null) {
- if (context.anyLiveParticipant()) {
+ if (config != null) {
+ if (config.anyLiveParticipant()) {
max = cluster.getLiveParticipantMap().size();
} else {
- max = context.getReplicaCount();
+ max = config.getReplicaCount();
}
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 31dbb08..45fc355 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -1,12 +1,5 @@
package org.apache.helix.controller.stages;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.api.accessor.ResourceAccessor;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.model.ResourceAssignment;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -26,6 +19,13 @@ import org.apache.helix.model.ResourceAssignment;
* under the License.
*/
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.model.ResourceAssignment;
+
/**
* Persist the ResourceAssignment of each resource that went through rebalancing
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
new file mode 100644
index 0000000..e63041a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
@@ -0,0 +1,59 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Persist all dirty contexts set in the controller pipeline
+ */
+public class PersistContextStage extends AbstractBaseStage {
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ HelixManager helixManager = event.getAttribute("helixmanager");
+ HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ ControllerContextProvider contextProvider =
+ event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+ Map<ContextId, ControllerContext> pendingContexts = contextProvider.getPendingContexts();
+ List<PropertyKey> keys = Lists.newArrayList();
+ List<ControllerContextHolder> properties = Lists.newArrayList();
+ for (ContextId contextId : pendingContexts.keySet()) {
+ ControllerContextHolder holder = new ControllerContextHolder(pendingContexts.get(contextId));
+ if (holder != null) {
+ keys.add(keyBuilder.controllerContext(contextId.stringify()));
+ properties.add(holder);
+ }
+ }
+ accessor.setChildren(keys, properties);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 44fddb6..fb016f1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -19,16 +19,23 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.Map;
+
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.accessor.ClusterAccessor;
import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ContextId;
+import org.apache.helix.controller.context.ControllerContext;
+import org.apache.helix.controller.context.ControllerContextProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
+import com.google.common.collect.Maps;
+
public class ReadClusterDataStage extends AbstractBaseStage {
private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
@@ -67,6 +74,16 @@ public class ReadClusterDataStage extends AbstractBaseStage {
event.addAttribute("ClusterDataCache", cluster);
+ // read contexts (if any)
+ Map<ContextId, ControllerContext> persistedContexts = null;
+ if (cluster != null) {
+ persistedContexts = cluster.getContextMap();
+ } else {
+ persistedContexts = Maps.newHashMap();
+ }
+ ControllerContextProvider contextProvider = new ControllerContextProvider(persistedContexts);
+ event.addAttribute(AttributeName.CONTEXT_PROVIDER.toString(), contextProvider);
+
long endTime = System.currentTimeMillis();
LOG.info("END ReadClusterDataStage.process(). took: " + (endTime - startTime) + " ms");
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 1fdd892..5b75535 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -32,9 +32,8 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.model.CurrentState;
import org.apache.log4j.Logger;
@@ -72,7 +71,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
resCfgBuilder.bucketSize(resource.getBucketSize());
resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
- resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
+ resCfgBuilder.rebalancerConfig(rebalancerCfg);
resCfgMap.put(resourceId, resCfgBuilder.build());
}
@@ -89,8 +88,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
new HashMap<ResourceId, ResourceConfig.Builder>();
- Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
- new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
+ Map<ResourceId, PartitionedRebalancerConfig.Builder> rebCtxBuilderMap =
+ new HashMap<ResourceId, PartitionedRebalancerConfig.Builder>();
for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
@@ -98,15 +97,15 @@ public class ResourceComputationStage extends AbstractBaseStage {
if (currentState.getStateModelDefRef() == null) {
LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
- + ", partitions: " + currentState.getPartitionStateMap().keySet()
- + ", states: " + currentState.getPartitionStateMap().values());
+ + ", partitions: " + currentState.getPartitionStateMap().keySet() + ", states: "
+ + currentState.getPartitionStateMap().values());
throw new StageException("State model def is null for resource:"
+ currentState.getResourceId());
}
if (!resCfgBuilderMap.containsKey(resourceId)) {
- PartitionedRebalancerContext.Builder rebCtxBuilder =
- new PartitionedRebalancerContext.Builder(resourceId);
+ PartitionedRebalancerConfig.Builder rebCtxBuilder =
+ new PartitionedRebalancerConfig.Builder(resourceId);
rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
.getStateModelFactoryName()));
@@ -118,7 +117,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
resCfgBuilderMap.put(resourceId, resCfgBuilder);
}
- PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+ PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
rebCtxBuilder.addPartition(new Partition(partitionId));
}
@@ -128,8 +127,8 @@ public class ResourceComputationStage extends AbstractBaseStage {
Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
- PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
- resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
+ PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+ resCfgBuilder.rebalancerConfig(rebCtxBuilder.build());
resCfgMap.put(resourceId, resCfgBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index f32649f..8c5b863 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -6,8 +6,8 @@ import org.apache.helix.api.config.NamespacedConfig;
import org.apache.helix.api.config.ResourceConfig.ResourceType;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
@@ -105,11 +105,11 @@ public class ResourceConfiguration extends HelixProperty {
}
/**
- * Get a RebalancerContext if available
- * @return RebalancerContext, or null
+ * Get a RebalancerConfig if available
+ * @return RebalancerConfig, or null
*/
- public RebalancerContext getRebalancerContext(Class<? extends RebalancerContext> clazz) {
- RebalancerConfig config = new RebalancerConfig(this);
- return config.getRebalancerContext(clazz);
+ public RebalancerConfig getRebalancerConfig(Class<? extends RebalancerConfig> clazz) {
+ RebalancerConfigHolder config = new RebalancerConfigHolder(this);
+ return config.getRebalancerConfig(clazz);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
index 98f7cac..ba8958d 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -64,10 +64,12 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -300,9 +302,9 @@ public class NewClusterSetup {
idealState.setMaxPartitionsPerInstance(maxPartitionsPerNode);
idealState.setStateModelDefId(stateModelDefId);
- RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+ RebalancerConfig rebalancerCtx = PartitionedRebalancerConfig.from(idealState);
ResourceConfig.Builder builder =
- new ResourceConfig.Builder(resourceId).rebalancerContext(rebalancerCtx).bucketSize(
+ new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerCtx).bucketSize(
bucketSize);
ClusterAccessor accessor = clusterAccessor(clusterName);
@@ -407,10 +409,10 @@ public class NewClusterSetup {
new IdealState(
(ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateJsonFile))));
- RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+ RebalancerConfig rebalancerCtx = PartitionedRebalancerConfig.from(idealState);
ResourceConfig.Builder builder =
- new ResourceConfig.Builder(ResourceId.from(resourceName))
- .rebalancerContext(rebalancerCtx).bucketSize(idealState.getBucketSize());
+ new ResourceConfig.Builder(ResourceId.from(resourceName)).rebalancerConfig(rebalancerCtx)
+ .bucketSize(idealState.getBucketSize());
ClusterAccessor accessor = clusterAccessor(clusterName);
accessor.addResourceToCluster(builder.build());
@@ -459,23 +461,25 @@ public class NewClusterSetup {
StringBuilder sb = new StringBuilder();
Map<ParticipantId, State> stateMap = resource.getExternalView().getStateMap(partitionId);
sb.append(resourceName + "/" + partitionName + ", externalView: " + stateMap);
- PartitionedRebalancerContext partitionedContext =
- resource.getRebalancerConfig().getRebalancerContext(PartitionedRebalancerContext.class);
- if (partitionedContext != null) {
+ PartitionedRebalancerConfig partitionedConfig =
+ PartitionedRebalancerConfig.from(resource.getRebalancerConfig());
+ if (partitionedConfig != null) {
// for partitioned contexts, check the mode and apply mode-specific information if possible
- if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
- SemiAutoRebalancerContext semiAutoContext =
- resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
- sb.append(", preferenceList: " + semiAutoContext.getPreferenceList(partitionId));
- } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
- CustomRebalancerContext customContext =
- resource.getRebalancerConfig().getRebalancerContext(CustomRebalancerContext.class);
- sb.append(", preferenceMap: " + customContext.getPreferenceMap(partitionId));
+ if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ SemiAutoRebalancerConfig semiAutoConfig =
+ BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+ SemiAutoRebalancerConfig.class);
+ sb.append(", preferenceList: " + semiAutoConfig.getPreferenceList(partitionId));
+ } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ CustomRebalancerConfig customConfig =
+ BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+ CustomRebalancerConfig.class);
+ sb.append(", preferenceMap: " + customConfig.getPreferenceMap(partitionId));
}
- if (partitionedContext.anyLiveParticipant()) {
- sb.append(", anyLiveParticipant: " + partitionedContext.anyLiveParticipant());
+ if (partitionedConfig.anyLiveParticipant()) {
+ sb.append(", anyLiveParticipant: " + partitionedConfig.anyLiveParticipant());
} else {
- sb.append(", replicaCount: " + partitionedContext.getReplicaCount());
+ sb.append(", replicaCount: " + partitionedConfig.getReplicaCount());
}
}
@@ -750,12 +754,13 @@ public class NewClusterSetup {
ResourceAccessor accessor = resourceAccessor(clusterName);
ResourceId resourceId = ResourceId.from(resourceName);
Resource resource = accessor.readResource(resourceId);
+ RebalancerConfigHolder holder = new RebalancerConfigHolder(resource.getRebalancerConfig());
StringBuilder sb =
new StringBuilder("Resource ").append(resourceName).append(" in cluster ")
.append(clusterName).append(":\n").append("externalView: ")
.append(resource.getExternalView()).append(", userConfig: ")
- .append(resource.getUserConfig()).append(", rebalancerContext: ")
- .append(resource.getRebalancerConfig().getSerializedContext());
+ .append(resource.getUserConfig()).append(", rebalancerConfig: ")
+ .append(holder.getSerializedConfig());
System.out.println(sb.toString());
}
@@ -956,17 +961,18 @@ public class NewClusterSetup {
private void expandResource(ClusterId clusterId, ResourceId resourceId) {
ResourceAccessor accessor = resourceAccessor(clusterId.stringify());
Resource resource = accessor.readResource(resourceId);
- SemiAutoRebalancerContext context =
- resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
- if (context == null) {
+ SemiAutoRebalancerConfig config =
+ BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+ SemiAutoRebalancerConfig.class);
+ if (config == null) {
LOG.info("Only SEMI_AUTO mode supported for resource expansion");
return;
}
- if (context.anyLiveParticipant()) {
+ if (config.anyLiveParticipant()) {
LOG.info("Resource uses ANY_LIVE_PARTICIPANT, skipping default assignment");
return;
}
- if (context.getPreferenceLists().size() == 0) {
+ if (config.getPreferenceLists().size() == 0) {
LOG.info("No preference lists have been set yet, skipping default assignment");
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 0d597f6..ab4ead0 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -33,11 +33,12 @@ import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
@@ -99,8 +100,8 @@ public class TestNewStages extends ZkUnitTestBase {
ResourceId resourceId = ResourceId.from("TestDB0");
Assert.assertTrue(resourceMap.containsKey(resourceId));
Resource resource = resourceMap.get(resourceId);
- Assert.assertNotNull(resource.getRebalancerConfig().getRebalancerContext(
- SemiAutoRebalancerContext.class));
+ Assert.assertNotNull(BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+ SemiAutoRebalancerConfig.class));
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}
@@ -163,8 +164,12 @@ public class TestNewStages extends ZkUnitTestBase {
Resource resource = cluster.getResource(resourceId);
ResourceCurrentState currentStateOutput = new ResourceCurrentState();
ResourceAssignment semiAutoResult =
- resource.getRebalancerConfig().getRebalancer()
- .computeResourceMapping(resource.getRebalancerConfig(), cluster, currentStateOutput);
+ resource
+ .getRebalancerConfig()
+ .getRebalancerRef()
+ .getRebalancer()
+ .computeResourceMapping(resource.getRebalancerConfig(), null, cluster,
+ currentStateOutput);
verifySemiAutoRebalance(resource, semiAutoResult);
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
@@ -177,8 +182,9 @@ public class TestNewStages extends ZkUnitTestBase {
*/
private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
Assert.assertEquals(assignment.getMappedPartitionIds().size(), resource.getSubUnitSet().size());
- SemiAutoRebalancerContext context =
- resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
+ SemiAutoRebalancerConfig context =
+ BasicRebalancerConfig.convert(resource.getRebalancerConfig(),
+ SemiAutoRebalancerConfig.class);
for (PartitionId partitionId : assignment.getMappedPartitionIds()) {
List<ParticipantId> preferenceList = context.getPreferenceList(partitionId);
Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
index 74781cd..3c8fb2c 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
@@ -9,8 +9,9 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -91,29 +92,29 @@ public class TestUpdateConfig {
// message mode
UserConfig userConfig = new UserConfig(Scope.resource(resourceId));
userConfig.setSimpleField("key1", "value1");
- SemiAutoRebalancerContext rebalancerContext =
- new SemiAutoRebalancerContext.Builder(resourceId).build();
+ SemiAutoRebalancerConfig rebalancerContext =
+ new SemiAutoRebalancerConfig.Builder(resourceId).build();
ResourceConfig config =
new ResourceConfig.Builder(resourceId).userConfig(userConfig)
- .rebalancerContext(rebalancerContext).bucketSize(OLD_BUCKET_SIZE)
- .batchMessageMode(true).build();
+ .rebalancerConfig(rebalancerContext).bucketSize(OLD_BUCKET_SIZE).batchMessageMode(true)
+ .build();
// update: overwrite user config, change to full auto rebalancer context, and change the bucket
// size
UserConfig newUserConfig = new UserConfig(Scope.resource(resourceId));
newUserConfig.setSimpleField("key2", "value2");
- FullAutoRebalancerContext newRebalancerContext =
- new FullAutoRebalancerContext.Builder(resourceId).build();
+ FullAutoRebalancerConfig newRebalancerContext =
+ new FullAutoRebalancerConfig.Builder(resourceId).build();
ResourceConfig updated =
new ResourceConfig.Delta(resourceId).setBucketSize(NEW_BUCKET_SIZE)
- .setUserConfig(newUserConfig).setRebalancerContext(newRebalancerContext)
+ .setUserConfig(newUserConfig).setRebalancerConfig(newRebalancerContext)
.mergeInto(config);
Assert.assertEquals(updated.getBucketSize(), NEW_BUCKET_SIZE);
Assert.assertTrue(updated.getBatchMessageMode());
- Assert.assertNull(updated.getRebalancerConfig().getRebalancerContext(
- SemiAutoRebalancerContext.class));
- Assert.assertNotNull(updated.getRebalancerConfig().getRebalancerContext(
- FullAutoRebalancerContext.class));
+ Assert.assertNull(BasicRebalancerConfig.convert(updated.getRebalancerConfig(),
+ SemiAutoRebalancerConfig.class));
+ Assert.assertNotNull(BasicRebalancerConfig.convert(updated.getRebalancerConfig(),
+ FullAutoRebalancerConfig.class));
Assert.assertNull(updated.getUserConfig().getSimpleField("key1"));
Assert.assertEquals(updated.getUserConfig().getSimpleField("key2"), "value2");
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
index 5bbe54f..94deaac 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
@@ -8,6 +8,9 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
import org.apache.helix.model.ResourceConfiguration;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -41,7 +44,7 @@ public class TestSerializeRebalancerContext {
@Test
public void basicTest() {
// populate a context
- CustomRebalancerContext context = new CustomRebalancerContext();
+ CustomRebalancerConfig context = new CustomRebalancerConfig();
context.setAnyLiveParticipant(false);
context.setMaxPartitionsPerParticipant(Integer.MAX_VALUE);
Map<PartitionId, Partition> partitionMap = Maps.newHashMap();
@@ -61,9 +64,8 @@ public class TestSerializeRebalancerContext {
context.setResourceId(resourceId);
// serialize and deserialize by wrapping in a config
- RebalancerConfig config = new RebalancerConfig(context);
- CustomRebalancerContext deserialized =
- config.getRebalancerContext(CustomRebalancerContext.class);
+ RebalancerConfigHolder config = new RebalancerConfigHolder(context);
+ CustomRebalancerConfig deserialized = config.getRebalancerConfig(CustomRebalancerConfig.class);
// check to make sure that the two objects contain the same data
Assert.assertNotNull(deserialized);
@@ -79,9 +81,9 @@ public class TestSerializeRebalancerContext {
// wrap in a physical config and then unwrap it
ResourceConfiguration physicalConfig = new ResourceConfiguration(resourceId);
physicalConfig.addNamespacedConfig(config.toNamespacedConfig());
- RebalancerConfig extractedConfig = new RebalancerConfig(physicalConfig);
- CustomRebalancerContext extractedContext =
- extractedConfig.getRebalancerContext(CustomRebalancerContext.class);
+ RebalancerConfigHolder extractedConfig = new RebalancerConfigHolder(physicalConfig);
+ CustomRebalancerConfig extractedContext =
+ extractedConfig.getRebalancerConfig(CustomRebalancerConfig.class);
// make sure the unwrapped data hasn't changed
Assert.assertNotNull(extractedContext);
@@ -95,8 +97,8 @@ public class TestSerializeRebalancerContext {
Assert.assertEquals(extractedContext.getResourceId(), context.getResourceId());
// make sure that it's legal to use a base rebalancer context
- RebalancerContext rebalancerContext =
- extractedConfig.getRebalancerContext(RebalancerContext.class);
+ RebalancerConfig rebalancerContext =
+ extractedConfig.getRebalancerConfig(RebalancerConfig.class);
Assert.assertNotNull(rebalancerContext);
Assert.assertEquals(rebalancerContext.getResourceId(), context.getResourceId());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index e53530c..22c4168 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -41,8 +41,8 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
@@ -169,7 +169,7 @@ public class BaseStageTest {
Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
for (IdealState idealState : idealStates) {
ResourceId resourceId = idealState.getResourceId();
- RebalancerContext context = PartitionedRebalancerContext.from(idealState);
+ RebalancerConfig context = PartitionedRebalancerConfig.from(idealState);
Resource resource =
new Resource(resourceId, ResourceType.DATA, idealState, null, null, context,
new UserConfig(Scope.resource(resourceId)), idealState.getBucketSize(),
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index e320011..90ea393 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -33,7 +33,6 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
@@ -79,8 +78,7 @@ public class TestResourceComputationStage extends BaseStageTest {
AssertJUnit.assertEquals(resource.values().iterator().next().getId(),
ResourceId.from(resourceName));
AssertJUnit.assertEquals(resource.values().iterator().next().getRebalancerConfig()
- .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
- idealState.getStateModelDefId());
+ .getStateModelDefId(), idealState.getStateModelDefId());
AssertJUnit
.assertEquals(resource.values().iterator().next().getSubUnitSet().size(), partitions);
}
@@ -107,8 +105,7 @@ public class TestResourceComputationStage extends BaseStageTest {
AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
- .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
- idealState.getStateModelDefId());
+ .getStateModelDefId(), idealState.getStateModelDefId());
AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
idealState.getNumPartitions());
}
@@ -182,8 +179,7 @@ public class TestResourceComputationStage extends BaseStageTest {
AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
- .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
- idealState.getStateModelDefId());
+ .getStateModelDefId(), idealState.getStateModelDefId());
AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
idealState.getNumPartitions());
}
@@ -192,8 +188,7 @@ public class TestResourceComputationStage extends BaseStageTest {
AssertJUnit.assertTrue(resourceMap.containsKey(oldResourceId));
AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getId(), oldResourceId);
AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getRebalancerConfig()
- .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
- currentState.getStateModelDefId());
+ .getStateModelDefId(), currentState.getStateModelDefId());
AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getSubUnitSet().size(), currentState
.getTypedPartitionStateMap().size());
AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit(
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index ce26e2e..11805fe 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -30,11 +30,15 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
+import org.apache.helix.api.id.ContextId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.controller.context.BasicControllerContext;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.context.ControllerContextProvider;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
-import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -60,22 +64,23 @@ public class TestCustomizedIdealStateRebalancer extends
public static class TestRebalancer implements HelixRebalancer {
+ private ControllerContextProvider _contextProvider;
+
/**
* Very basic mapping that evenly assigns one replica of each partition to live nodes, each of
* which is in the highest-priority state.
*/
@Override
- public ResourceAssignment computeResourceMapping(RebalancerConfig config, Cluster cluster,
- ResourceCurrentState currentState) {
- PartitionedRebalancerContext context =
- config.getRebalancerContext(PartitionedRebalancerContext.class);
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+ PartitionedRebalancerConfig config = PartitionedRebalancerConfig.from(rebalancerConfig);
StateModelDefinition stateModelDef =
- cluster.getStateModelMap().get(context.getStateModelDefId());
+ cluster.getStateModelMap().get(config.getStateModelDefId());
List<ParticipantId> liveParticipants =
new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
- ResourceAssignment resourceMapping = new ResourceAssignment(context.getResourceId());
+ ResourceAssignment resourceMapping = new ResourceAssignment(config.getResourceId());
int i = 0;
- for (PartitionId partitionId : context.getPartitionSet()) {
+ for (PartitionId partitionId : config.getPartitionSet()) {
int nodeIndex = i % liveParticipants.size();
Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
replicaMap.put(liveParticipants.get(nodeIndex), stateModelDef.getTypedStatesPriorityList()
@@ -84,12 +89,17 @@ public class TestCustomizedIdealStateRebalancer extends
i++;
}
testRebalancerInvoked = true;
+
+ // set some basic context
+ ContextId contextId = ContextId.from(config.getResourceId().stringify());
+ _contextProvider.putControllerContext(contextId, new BasicControllerContext(contextId));
return resourceMapping;
}
@Override
- public void init(HelixManager helixManager) {
+ public void init(HelixManager helixManager, ControllerContextProvider contextProvider) {
testRebalancerCreated = true;
+ _contextProvider = contextProvider;
}
}
@@ -124,6 +134,11 @@ public class TestCustomizedIdealStateRebalancer extends
}
Assert.assertTrue(testRebalancerCreated);
Assert.assertTrue(testRebalancerInvoked);
+
+ // check that context can be extracted
+ ControllerContextHolder holder = accessor.getProperty(keyBuilder.controllerContext(db2));
+ Assert.assertNotNull(holder);
+ Assert.assertNotNull(holder.getContext());
}
public static class ExternalViewBalancedVerifier implements ZkVerifier {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 29e0a44..b415393 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -42,8 +42,8 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
import org.apache.helix.manager.zk.ZkHelixConnection;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.Message;
@@ -122,13 +122,13 @@ public class TestHelixConnection extends ZkUnitTestBase {
.addTransition(slave, offline, 4).addTransition(slave, master, 2)
.addTransition(master, slave, 1).addTransition(offline, dropped).initialState(offline)
.upperBound(master, 1).dynamicUpperBound(slave, "R").build();
- RebalancerContext rebalancerCtx =
- new SemiAutoRebalancerContext.Builder(resourceId).addPartitions(1).replicaCount(1)
+ RebalancerConfig rebalancerCtx =
+ new SemiAutoRebalancerConfig.Builder(resourceId).addPartitions(1).replicaCount(1)
.stateModelDefId(stateModelDefId)
.preferenceList(PartitionId.from("testDB_0"), Arrays.asList(participantId)).build();
clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
stateModelDef).build());
- clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(resourceId).rebalancerContext(
+ clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(resourceId).rebalancerConfig(
rebalancerCtx).build());
clusterAccessor.addParticipantToCluster(new ParticipantConfig.Builder(participantId).build());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
index c233417..880d31c 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
@@ -23,7 +23,7 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
import org.apache.helix.manager.zk.ZkHelixConnection;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.Message;
@@ -219,10 +219,10 @@ public class LogicalModelExample {
Partition partition2 = new Partition(PartitionId.from(resourceId, "2"));
// specify the rebalancer configuration
- // this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerContext
+ // this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerConfig
// builder
- FullAutoRebalancerContext.Builder rebalanceContextBuilder =
- new FullAutoRebalancerContext.Builder(resourceId).replicaCount(1).addPartition(partition1)
+ FullAutoRebalancerConfig.Builder rebalanceConfigBuilder =
+ new FullAutoRebalancerConfig.Builder(resourceId).replicaCount(1).addPartition(partition1)
.addPartition(partition2).stateModelDefId(stateModelDef.getStateModelDefId());
// create (optional) user-defined configuration properties for the resource
@@ -231,7 +231,7 @@ public class LogicalModelExample {
// create the configuration for a new resource
ResourceConfig.Builder resourceBuilder =
- new ResourceConfig.Builder(resourceId).rebalancerContext(rebalanceContextBuilder.build())
+ new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalanceConfigBuilder.build())
.userConfig(userConfig);
return resourceBuilder.build();
}
[3/4] [HELIX-327] Simplify rebalancer, rename rebalancer configs,
support settable contexts, rb=15981
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
new file mode 100644
index 0000000..73c3ccc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
@@ -0,0 +1,164 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
+ * corresponds to {@link CustomRebalancer}
+ */
+public class CustomRebalancerConfig extends PartitionedRebalancerConfig {
+ private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+ /**
+ * Instantiate a CustomRebalancerConfig
+ */
+ public CustomRebalancerConfig() {
+ setRebalanceMode(RebalanceMode.CUSTOMIZED);
+ setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+ _preferenceMaps = Maps.newHashMap();
+ }
+
+ /**
+ * Get the preference maps of the partitions and replicas of the resource
+ * @return map of partition to participant and state
+ */
+ public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
+ return _preferenceMaps;
+ }
+
+ /**
+ * Set the preference maps of the partitions and replicas of the resource
+ * @param preferenceMaps map of partition to participant and state
+ */
+ public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
+ _preferenceMaps = preferenceMaps;
+ }
+
+ /**
+ * Get the preference map of a partition
+ * @param partitionId the partition to look up
+ * @return map of participant to state
+ */
+ @JsonIgnore
+ public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
+ return _preferenceMaps.get(partitionId);
+ }
+
+ /**
+ * Generate preference maps based on a default cluster setup
+ * @param stateModelDef the state model definition to follow
+ * @param participantSet the set of participant ids to configure for
+ */
+ @Override
+ @JsonIgnore
+ public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+ Set<ParticipantId> participantSet) {
+ // compute default upper bounds
+ Map<State, String> upperBounds = Maps.newHashMap();
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
+ upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+ }
+
+ // determine the current mapping
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
+
+ // determine the preference maps
+ LinkedHashMap<State, Integer> stateCounts =
+ ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+ getReplicaCount());
+ ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+ List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+ List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+ AutoRebalanceStrategy strategy =
+ new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+ getMaxPartitionsPerParticipant(), placementScheme);
+ Map<String, Map<String, String>> rawPreferenceMaps =
+ strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+ .getMapFields();
+ Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
+ Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
+ setPreferenceMaps(preferenceMaps);
+ }
+
+ /**
+ * Build a CustomRebalancerConfig. By default, it corresponds to {@link CustomRebalancer}
+ */
+ public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
+ private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
+
+ /**
+ * Instantiate for a resource
+ * @param resourceId resource id
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
+ super.rebalanceMode(RebalanceMode.CUSTOMIZED);
+ _preferenceMaps = Maps.newHashMap();
+ }
+
+ /**
+ * Add a preference map for a partition
+ * @param partitionId partition to set
+ * @param preferenceList map of participant id to state indicating where replicas are served
+ * @return Builder
+ */
+ public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
+ _preferenceMaps.put(partitionId, preferenceMap);
+ return self();
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public CustomRebalancerConfig build() {
+ CustomRebalancerConfig config = new CustomRebalancerConfig();
+ super.update(config);
+ config.setPreferenceMaps(_preferenceMaps);
+ return config;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
new file mode 100644
index 0000000..828d509
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
@@ -0,0 +1,64 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState.RebalanceMode;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for FULL_AUTO rebalancing mode. By default, it corresponds to
+ * {@link FullAutoRebalancer}
+ */
+public class FullAutoRebalancerConfig extends PartitionedRebalancerConfig {
+ public FullAutoRebalancerConfig() {
+ setRebalanceMode(RebalanceMode.FULL_AUTO);
+ setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+ }
+
+ /**
+ * Builder for a full auto rebalancer config. By default, it corresponds to
+ * {@link FullAutoRebalancer}
+ */
+ public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
+ /**
+ * Instantiate with a resource
+ * @param resourceId resource id
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
+ super.rebalanceMode(RebalanceMode.FULL_AUTO);
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public FullAutoRebalancerConfig build() {
+ FullAutoRebalancerConfig config = new FullAutoRebalancerConfig();
+ super.update(config);
+ return config;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
new file mode 100644
index 0000000..2c9769d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
@@ -0,0 +1,405 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for a resource whose subunits are partitions. In addition, these partitions can
+ * be replicated.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionedRebalancerConfig extends BasicRebalancerConfig implements
+ ReplicatedRebalancerConfig {
+ private Map<PartitionId, Partition> _partitionMap;
+ private boolean _anyLiveParticipant;
+ private int _replicaCount;
+ private int _maxPartitionsPerParticipant;
+ private RebalanceMode _rebalanceMode;
+
+ /**
+ * Instantiate a PartitionedRebalancerConfig
+ */
+ public PartitionedRebalancerConfig() {
+ _partitionMap = Collections.emptyMap();
+ _replicaCount = 1;
+ _anyLiveParticipant = false;
+ _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+ _rebalanceMode = RebalanceMode.USER_DEFINED;
+ }
+
+ /**
+ * Get a map from partition id to partition
+ * @return partition map (mutable)
+ */
+ public Map<PartitionId, Partition> getPartitionMap() {
+ return _partitionMap;
+ }
+
+ /**
+ * Set a map of partition id to partition
+ * @param partitionMap partition map
+ */
+ public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
+ _partitionMap = Maps.newHashMap(partitionMap);
+ }
+
+ /**
+ * Get the set of partitions for this resource
+ * @return set of partition ids
+ */
+ @JsonIgnore
+ public Set<PartitionId> getPartitionSet() {
+ return _partitionMap.keySet();
+ }
+
+ /**
+ * Get a partition
+ * @param partitionId id of the partition to get
+ * @return Partition object, or null if not present
+ */
+ @JsonIgnore
+ public Partition getPartition(PartitionId partitionId) {
+ return _partitionMap.get(partitionId);
+ }
+
+ @Override
+ public boolean anyLiveParticipant() {
+ return _anyLiveParticipant;
+ }
+
+ /**
+ * Indicate if this resource should be assigned to any live participant
+ * @param anyLiveParticipant true if any live participant expected, false otherwise
+ */
+ public void setAnyLiveParticipant(boolean anyLiveParticipant) {
+ _anyLiveParticipant = anyLiveParticipant;
+ }
+
+ @Override
+ public int getReplicaCount() {
+ return _replicaCount;
+ }
+
+ /**
+ * Set the number of replicas that each partition should have
+ * @param replicaCount
+ */
+ public void setReplicaCount(int replicaCount) {
+ _replicaCount = replicaCount;
+ }
+
+ /**
+ * Get the maximum number of partitions that a participant can serve
+ * @return maximum number of partitions per participant
+ */
+ public int getMaxPartitionsPerParticipant() {
+ return _maxPartitionsPerParticipant;
+ }
+
+ /**
+ * Set the maximum number of partitions that a participant can serve
+ * @param maxPartitionsPerParticipant maximum number of partitions per participant
+ */
+ public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+ _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+ }
+
+ /**
+ * Set the rebalancer mode of the partitioned resource
+ * @param rebalanceMode {@link RebalanceMode} enum value
+ */
+ public void setRebalanceMode(RebalanceMode rebalanceMode) {
+ _rebalanceMode = rebalanceMode;
+ }
+
+ /**
+ * Get the rebalancer mode of the resource
+ * @return RebalanceMode
+ */
+ public RebalanceMode getRebalanceMode() {
+ return _rebalanceMode;
+ }
+
+ @Override
+ @JsonIgnore
+ public Map<PartitionId, Partition> getSubUnitMap() {
+ return getPartitionMap();
+ }
+
+ /**
+ * Generate a default configuration given the state model and a participant.
+ * @param stateModelDef the state model definition to follow
+ * @param participantSet the set of participant ids to configure for
+ */
+ @JsonIgnore
+ public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+ Set<ParticipantId> participantSet) {
+ // the base config does not understand enough to know do to anything
+ }
+
+ /**
+ * Safely get a {@link PartitionedRebalancerConfig} from a {@link RebalancerConfig}
+ * @param config the base config
+ * @return a {@link PartitionedRebalancerConfig}, or null if the conversion is not possible
+ */
+ public static PartitionedRebalancerConfig from(RebalancerConfig config) {
+ try {
+ return PartitionedRebalancerConfig.class.cast(config);
+ } catch (ClassCastException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Convert a physically-stored IdealState into a rebalancer config for a partitioned resource
+ * @param idealState populated IdealState
+ * @return PartitionedRebalancerConfig
+ */
+ public static PartitionedRebalancerConfig from(IdealState idealState) {
+ PartitionedRebalancerConfig config;
+ switch (idealState.getRebalanceMode()) {
+ case FULL_AUTO:
+ FullAutoRebalancerConfig.Builder fullAutoBuilder =
+ new FullAutoRebalancerConfig.Builder(idealState.getResourceId());
+ populateConfig(fullAutoBuilder, idealState);
+ config = fullAutoBuilder.build();
+ break;
+ case SEMI_AUTO:
+ SemiAutoRebalancerConfig.Builder semiAutoBuilder =
+ new SemiAutoRebalancerConfig.Builder(idealState.getResourceId());
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+ semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
+ }
+ populateConfig(semiAutoBuilder, idealState);
+ config = semiAutoBuilder.build();
+ break;
+ case CUSTOMIZED:
+ CustomRebalancerConfig.Builder customBuilder =
+ new CustomRebalancerConfig.Builder(idealState.getResourceId());
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+ customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
+ }
+ populateConfig(customBuilder, idealState);
+ config = customBuilder.build();
+ break;
+ default:
+ Builder baseBuilder = new Builder(idealState.getResourceId());
+ populateConfig(baseBuilder, idealState);
+ config = baseBuilder.build();
+ break;
+ }
+ return config;
+ }
+
+ /**
+ * Update a builder subclass with all the fields of the ideal state
+ * @param builder builder that extends AbstractBuilder
+ * @param idealState populated IdealState
+ */
+ private static <T extends AbstractBuilder<T>> void populateConfig(T builder, IdealState idealState) {
+ String replicas = idealState.getReplicas();
+ int replicaCount = 0;
+ boolean anyLiveParticipant = false;
+ if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
+ anyLiveParticipant = true;
+ } else {
+ replicaCount = Integer.parseInt(replicas);
+ }
+ if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
+ // backwards compatibility: partition sets were based on pref lists/maps previously
+ builder.addPartitions(idealState.getNumPartitions());
+ } else {
+ for (PartitionId partitionId : idealState.getPartitionIdSet()) {
+ builder.addPartition(new Partition(partitionId));
+ }
+ }
+ builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
+ .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
+ .participantGroupTag(idealState.getInstanceGroupTag())
+ .stateModelDefId(idealState.getStateModelDefId())
+ .stateModelFactoryId(idealState.getStateModelFactoryId());
+ RebalancerRef rebalancerRef = idealState.getRebalancerRef();
+ if (rebalancerRef != null) {
+ builder.rebalancerRef(rebalancerRef);
+ }
+ }
+
+ /**
+ * Builder for a basic data rebalancer config
+ */
+ public static final class Builder extends AbstractBuilder<Builder> {
+ /**
+ * Instantiate with a resource
+ * @param resourceId resource id
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public PartitionedRebalancerConfig build() {
+ PartitionedRebalancerConfig config = new PartitionedRebalancerConfig();
+ super.update(config);
+ return config;
+ }
+ }
+
+ /**
+ * Abstract builder for a generic partitioned resource rebalancer config
+ */
+ public static abstract class AbstractBuilder<T extends BasicRebalancerConfig.AbstractBuilder<T>>
+ extends BasicRebalancerConfig.AbstractBuilder<T> {
+ private final ResourceId _resourceId;
+ private final Map<PartitionId, Partition> _partitionMap;
+ private RebalanceMode _rebalanceMode;
+ private boolean _anyLiveParticipant;
+ private int _replicaCount;
+ private int _maxPartitionsPerParticipant;
+
+ /**
+ * Instantiate with a resource
+ * @param resourceId resource id
+ */
+ public AbstractBuilder(ResourceId resourceId) {
+ super(resourceId);
+ _resourceId = resourceId;
+ _partitionMap = Maps.newHashMap();
+ _rebalanceMode = RebalanceMode.USER_DEFINED;
+ _anyLiveParticipant = false;
+ _replicaCount = 1;
+ _maxPartitionsPerParticipant = Integer.MAX_VALUE;
+ }
+
+ /**
+ * Set the rebalance mode for a partitioned rebalancer config
+ * @param rebalanceMode {@link RebalanceMode} enum value
+ * @return Builder
+ */
+ public T rebalanceMode(RebalanceMode rebalanceMode) {
+ _rebalanceMode = rebalanceMode;
+ return self();
+ }
+
+ /**
+ * Add a partition that the resource serves
+ * @param partition fully-qualified partition
+ * @return Builder
+ */
+ public T addPartition(Partition partition) {
+ _partitionMap.put(partition.getId(), partition);
+ return self();
+ }
+
+ /**
+ * Add a collection of partitions
+ * @param partitions any collection of Partition objects
+ * @return Builder
+ */
+ public T addPartitions(Collection<Partition> partitions) {
+ for (Partition partition : partitions) {
+ addPartition(partition);
+ }
+ return self();
+ }
+
+ /**
+ * Add a specified number of partitions with a default naming scheme, namely
+ * resourceId_partitionNumber where partitionNumber starts at 0
+ * @param partitionCount number of partitions to add
+ * @return Builder
+ */
+ public T addPartitions(int partitionCount) {
+ for (int i = 0; i < partitionCount; i++) {
+ addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
+ }
+ return self();
+ }
+
+ /**
+ * Set whether any live participant should be used in rebalancing
+ * @param anyLiveParticipant true if any live participant can be used, false otherwise
+ * @return Builder
+ */
+ public T anyLiveParticipant(boolean anyLiveParticipant) {
+ _anyLiveParticipant = anyLiveParticipant;
+ return self();
+ }
+
+ /**
+ * Set the number of replicas
+ * @param replicaCount number of replicas
+ * @return Builder
+ */
+ public T replicaCount(int replicaCount) {
+ _replicaCount = replicaCount;
+ return self();
+ }
+
+ /**
+ * Set the maximum number of partitions to assign to any participant
+ * @param maxPartitionsPerParticipant the maximum
+ * @return Builder
+ */
+ public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
+ _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
+ return self();
+ }
+
+ /**
+ * Update a PartitionedRebalancerConfig with fields from this builder level
+ * @param config PartitionedRebalancerConfig
+ */
+ protected final void update(PartitionedRebalancerConfig config) {
+ super.update(config);
+ // enforce at least one partition
+ if (_partitionMap.isEmpty()) {
+ addPartitions(1);
+ }
+ config.setRebalanceMode(_rebalanceMode);
+ config.setPartitionMap(_partitionMap);
+ config.setAnyLiveParticipant(_anyLiveParticipant);
+ config.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
+ config.setReplicaCount(_replicaCount);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
new file mode 100644
index 0000000..3f8c9d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
@@ -0,0 +1,95 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.serializer.StringSerializer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Defines the state available to a rebalancer. The most common use case is to use a
+ * {@link PartitionedRebalancerConfig} or a subclass and set up a resource with it. A rebalancer
+ * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
+ * how the configuration should be serialized.
+ */
+public interface RebalancerConfig {
+ /**
+ * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
+ * resource, e.g. a subtask of a task
+ * @return map of (subunit id, subunit) pairs
+ */
+ public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
+
+ /**
+ * Get the subunits of the resource (e.g. partitions)
+ * @return set of subunit ids
+ */
+ public Set<? extends PartitionId> getSubUnitIdSet();
+
+ /**
+ * Get a specific subunit
+ * @param subUnitId the id of the subunit
+ * @return SubUnit
+ */
+ public Partition getSubUnit(PartitionId subUnitId);
+
+ /**
+ * Get the resource to rebalance
+ * @return resource id
+ */
+ public ResourceId getResourceId();
+
+ /**
+ * Get the state model definition that the resource follows
+ * @return state model definition id
+ */
+ public StateModelDefId getStateModelDefId();
+
+ /**
+ * Get the state model factory of this resource
+ * @return state model factory id
+ */
+ public StateModelFactoryId getStateModelFactoryId();
+
+ /**
+ * Get the tag, if any, that participants must have in order to serve this resource
+ * @return participant group tag, or null
+ */
+ public String getParticipantGroupTag();
+
+ /**
+ * Get the serializer for this config
+ * @return StringSerializer class object
+ */
+ public Class<? extends StringSerializer> getSerializerClass();
+
+ /**
+ * Get a reference to the class used to rebalance this resource
+ * @return RebalancerRef
+ */
+ public RebalancerRef getRebalancerRef();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
new file mode 100644
index 0000000..8581732
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
@@ -0,0 +1,185 @@
+package org.apache.helix.controller.rebalancer.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.serializer.StringSerializer;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration for a resource rebalancer. This contains a RebalancerConfig, which contains
+ * information specific to each rebalancer.
+ */
+public final class RebalancerConfigHolder {
+ private enum Fields {
+ SERIALIZER_CLASS,
+ REBALANCER_CONFIG,
+ REBALANCER_CONFIG_CLASS
+ }
+
+ private static final Logger LOG = Logger.getLogger(RebalancerConfigHolder.class);
+ private StringSerializer _serializer;
+ private HelixRebalancer _rebalancer;
+ private final RebalancerConfig _config;
+ private final NamespacedConfig _backingConfig;
+
+ /**
+ * Instantiate a RebalancerConfig
+ * @param config rebalancer config
+ * @param rebalancerRef reference to the rebalancer class that will be used
+ */
+ public RebalancerConfigHolder(RebalancerConfig config) {
+ _backingConfig =
+ new NamespacedConfig(Scope.resource(config.getResourceId()),
+ RebalancerConfigHolder.class.getSimpleName());
+ _backingConfig.setSimpleField(Fields.SERIALIZER_CLASS.toString(), config.getSerializerClass()
+ .getName());
+ _backingConfig.setSimpleField(Fields.REBALANCER_CONFIG_CLASS.toString(), config.getClass()
+ .getName());
+ _config = config;
+ try {
+ _serializer = config.getSerializerClass().newInstance();
+ _backingConfig.setSimpleField(Fields.REBALANCER_CONFIG.toString(),
+ _serializer.serialize(config));
+ } catch (InstantiationException e) {
+ LOG.error("Error initializing the configuration", e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Error initializing the configuration", e);
+ }
+ }
+
+ /**
+ * Instantiate from a physical ResourceConfiguration
+ * @param resourceConfiguration populated ResourceConfiguration
+ */
+ public RebalancerConfigHolder(ResourceConfiguration resourceConfiguration) {
+ _backingConfig =
+ new NamespacedConfig(resourceConfiguration, RebalancerConfigHolder.class.getSimpleName());
+ _serializer = getSerializer();
+ _config = getConfig();
+ }
+
+ /**
+ * Get the class that can serialize and deserialize the rebalancer config
+ * @return StringSerializer
+ */
+ private StringSerializer getSerializer() {
+ String serializerClassName = _backingConfig.getSimpleField(Fields.SERIALIZER_CLASS.toString());
+ if (serializerClassName != null) {
+ try {
+ return (StringSerializer) HelixUtil.loadClass(getClass(), serializerClassName)
+ .newInstance();
+ } catch (InstantiationException e) {
+ LOG.error("Error getting the serializer", e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Error getting the serializer", e);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Error getting the serializer", e);
+ }
+ }
+ return null;
+ }
+
+ private RebalancerConfig getConfig() {
+ String className = _backingConfig.getSimpleField(Fields.REBALANCER_CONFIG_CLASS.toString());
+ if (className != null) {
+ try {
+ Class<? extends RebalancerConfig> configClass =
+ HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerConfig.class);
+ String serialized = _backingConfig.getSimpleField(Fields.REBALANCER_CONFIG.toString());
+ return _serializer.deserialize(configClass, serialized);
+ } catch (ClassNotFoundException e) {
+ LOG.error(className + " is not a valid class");
+ } catch (ClassCastException e) {
+ LOG.error("Could not convert the persisted data into a " + className, e);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get a rebalancer class instance
+ * @return Rebalancer
+ */
+ public HelixRebalancer getRebalancer() {
+ // cache the rebalancer to avoid loading and instantiating it excessively
+ if (_rebalancer == null) {
+ if (_config == null || _config.getRebalancerRef() == null) {
+ return null;
+ }
+ _rebalancer = _config.getRebalancerRef().getRebalancer();
+ }
+ return _rebalancer;
+ }
+
+ /**
+ * Get the instantiated RebalancerConfig
+ * @param configClass specific class of the RebalancerConfig
+ * @return RebalancerConfig subclass instance, or null if conversion is not possible
+ */
+ public <T extends RebalancerConfig> T getRebalancerConfig(Class<T> configClass) {
+ if (_config != null) {
+ try {
+ return configClass.cast(_config);
+ } catch (ClassCastException e) {
+ LOG.warn(configClass + " is incompatible with config class: " + _config.getClass());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get the rebalancer config serialized as a string
+ * @return string representing the config
+ */
+ public String getSerializedConfig() {
+ return _backingConfig.getSimpleField(Fields.REBALANCER_CONFIG.toString());
+ }
+
+ /**
+ * Convert this to a namespaced config
+ * @return NamespacedConfig
+ */
+ public NamespacedConfig toNamespacedConfig() {
+ return _backingConfig;
+ }
+
+ /**
+ * Get a RebalancerConfig from a physical resource config
+ * @param resourceConfiguration physical resource config
+ * @return RebalancerConfig
+ */
+ public static RebalancerConfigHolder from(ResourceConfiguration resourceConfiguration) {
+ return new RebalancerConfigHolder(resourceConfiguration);
+ }
+
+ /**
+ * Get a RebalancerConfigHolder from a RebalancerConfig
+ * @param config instantiated RebalancerConfig
+ * @return RebalancerConfigHolder
+ */
+ public static RebalancerConfigHolder from(RebalancerConfig config) {
+ return new RebalancerConfigHolder(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
new file mode 100644
index 0000000..3118b2a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.rebalancer.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Methods specifying a rebalancer config that allows replicas. For instance, a rebalancer config
+ * with partitions may accept state model definitions that support multiple replicas per partition,
+ * and it's possible that the policy is that each live participant in the system should have a
+ * replica.
+ */
+public interface ReplicatedRebalancerConfig extends RebalancerConfig {
+ /**
+ * Check if this resource should be assigned to any live participant
+ * @return true if any live participant expected, false otherwise
+ */
+ public boolean anyLiveParticipant();
+
+ /**
+ * Get the number of replicas that each resource subunit should have
+ * @return replica count
+ */
+ public int getReplicaCount();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
new file mode 100644
index 0000000..bfc3309
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -0,0 +1,178 @@
+package org.apache.helix.controller.rebalancer.config;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
+import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerConfig for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
+ * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig {
+ @JsonProperty("preferenceLists")
+ private Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+ /**
+ * Instantiate a SemiAutoRebalancerConfig
+ */
+ public SemiAutoRebalancerConfig() {
+ setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+ _preferenceLists = Maps.newHashMap();
+ }
+
+ /**
+ * Get the preference lists of all partitions of the resource
+ * @return map of partition id to list of participant ids
+ */
+ public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
+ return _preferenceLists;
+ }
+
+ /**
+ * Set the preference lists of all partitions of the resource
+ * @param preferenceLists
+ */
+ public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
+ _preferenceLists = preferenceLists;
+ }
+
+ /**
+ * Get the preference list of a partition
+ * @param partitionId the partition to look up
+ * @return list of participant ids
+ */
+ @JsonIgnore
+ public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+ return _preferenceLists.get(partitionId);
+ }
+
+ /**
+ * Generate preference lists based on a default cluster setup
+ * @param stateModelDef the state model definition to follow
+ * @param participantSet the set of participant ids to configure for
+ */
+ @Override
+ @JsonIgnore
+ public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
+ Set<ParticipantId> participantSet) {
+ // compute default upper bounds
+ Map<State, String> upperBounds = Maps.newHashMap();
+ for (State state : stateModelDef.getTypedStatesPriorityList()) {
+ upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
+ }
+
+ // determine the current mapping
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+ for (PartitionId partitionId : getPartitionSet()) {
+ List<ParticipantId> preferenceList = getPreferenceList(partitionId);
+ if (preferenceList != null && !preferenceList.isEmpty()) {
+ Set<ParticipantId> disabledParticipants = Collections.emptySet();
+ Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
+ Map<ParticipantId, State> initialMap =
+ ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
+ stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
+ currentMapping.put(partitionId, initialMap);
+ }
+ }
+
+ // determine the preference
+ LinkedHashMap<State, Integer> stateCounts =
+ ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
+ getReplicaCount());
+ ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
+ List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
+ List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
+ AutoRebalanceStrategy strategy =
+ new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
+ getMaxPartitionsPerParticipant(), placementScheme);
+ Map<String, List<String>> rawPreferenceLists =
+ strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
+ .getListFields();
+ Map<PartitionId, List<ParticipantId>> preferenceLists =
+ Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
+ setPreferenceLists(preferenceLists);
+ }
+
+ /**
+ * Build a SemiAutoRebalancerConfig. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+ public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
+ private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+ /**
+ * Instantiate for a resource
+ * @param resourceId resource id
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+ super.rebalanceMode(RebalanceMode.SEMI_AUTO);
+ _preferenceLists = Maps.newHashMap();
+ }
+
+ /**
+ * Add a preference list for a partition
+ * @param partitionId partition to set
+ * @param preferenceList ordered list of participants who can serve the partition
+ * @return Builder
+ */
+ public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+ _preferenceLists.put(partitionId, preferenceList);
+ return self();
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public SemiAutoRebalancerConfig build() {
+ SemiAutoRebalancerConfig config = new SemiAutoRebalancerConfig();
+ super.update(config);
+ config.setPreferenceLists(_preferenceLists);
+ return config;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
deleted file mode 100644
index ec765d7..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/BasicRebalancerContext.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Abstract RebalancerContext that functions for generic subunits. Use a subclass that more
- * concretely defines the subunits.
- */
-public abstract class BasicRebalancerContext implements RebalancerContext {
- private ResourceId _resourceId;
- private StateModelDefId _stateModelDefId;
- private StateModelFactoryId _stateModelFactoryId;
- private String _participantGroupTag;
- private Class<? extends ContextSerializer> _serializer;
- private RebalancerRef _rebalancerRef;
-
- /**
- * Instantiate a basic rebalancer context
- */
- public BasicRebalancerContext() {
- _serializer = DefaultContextSerializer.class;
- }
-
- @Override
- public ResourceId getResourceId() {
- return _resourceId;
- }
-
- /**
- * Set the resource to rebalance
- * @param resourceId resource id
- */
- public void setResourceId(ResourceId resourceId) {
- _resourceId = resourceId;
- }
-
- @Override
- public StateModelDefId getStateModelDefId() {
- return _stateModelDefId;
- }
-
- /**
- * Set the state model definition that the resource follows
- * @param stateModelDefId state model definition id
- */
- public void setStateModelDefId(StateModelDefId stateModelDefId) {
- _stateModelDefId = stateModelDefId;
- }
-
- @Override
- public StateModelFactoryId getStateModelFactoryId() {
- return _stateModelFactoryId;
- }
-
- /**
- * Set the state model factory that the resource uses
- * @param stateModelFactoryId state model factory id
- */
- public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
- _stateModelFactoryId = stateModelFactoryId;
- }
-
- @Override
- public String getParticipantGroupTag() {
- return _participantGroupTag;
- }
-
- /**
- * Set a tag that participants must have in order to serve this resource
- * @param participantGroupTag string group tag
- */
- public void setParticipantGroupTag(String participantGroupTag) {
- _participantGroupTag = participantGroupTag;
- }
-
- /**
- * Get the serializer. If none is provided, {@link DefaultContextSerializer} is used
- */
- @Override
- public Class<? extends ContextSerializer> getSerializerClass() {
- return _serializer;
- }
-
- /**
- * Set the class that can serialize this context
- * @param serializer serializer class that implements ContextSerializer
- */
- public void setSerializerClass(Class<? extends ContextSerializer> serializer) {
- _serializer = serializer;
- }
-
- @Override
- @JsonIgnore
- public Set<? extends PartitionId> getSubUnitIdSet() {
- return getSubUnitMap().keySet();
- }
-
- @Override
- @JsonIgnore
- public Partition getSubUnit(PartitionId subUnitId) {
- return getSubUnitMap().get(subUnitId);
- }
-
- @Override
- public RebalancerRef getRebalancerRef() {
- return _rebalancerRef;
- }
-
- /**
- * Set the reference to the class used to rebalance this resource
- * @param rebalancerRef RebalancerRef instance
- */
- public void setRebalancerRef(RebalancerRef rebalancerRef) {
- _rebalancerRef = rebalancerRef;
- }
-
- /**
- * Abstract builder for the base rebalancer context
- */
- public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
- private final ResourceId _resourceId;
- private StateModelDefId _stateModelDefId;
- private StateModelFactoryId _stateModelFactoryId;
- private String _participantGroupTag;
- private Class<? extends ContextSerializer> _serializerClass;
- private RebalancerRef _rebalancerRef;
-
- /**
- * Instantiate with a resource id
- * @param resourceId resource id
- */
- public AbstractBuilder(ResourceId resourceId) {
- _resourceId = resourceId;
- _serializerClass = DefaultContextSerializer.class;
- }
-
- /**
- * Set the state model definition that the resource should follow
- * @param stateModelDefId state model definition id
- * @return Builder
- */
- public T stateModelDefId(StateModelDefId stateModelDefId) {
- _stateModelDefId = stateModelDefId;
- return self();
- }
-
- /**
- * Set the state model factory that the resource should use
- * @param stateModelFactoryId state model factory id
- * @return Builder
- */
- public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
- _stateModelFactoryId = stateModelFactoryId;
- return self();
- }
-
- /**
- * Set the tag that all participants require in order to serve this resource
- * @param participantGroupTag the tag
- * @return Builder
- */
- public T participantGroupTag(String participantGroupTag) {
- _participantGroupTag = participantGroupTag;
- return self();
- }
-
- /**
- * Set the serializer class for this rebalancer context
- * @param serializerClass class that implements ContextSerializer
- * @return Builder
- */
- public T serializerClass(Class<? extends ContextSerializer> serializerClass) {
- _serializerClass = serializerClass;
- return self();
- }
-
- /**
- * Specify a custom class to use for rebalancing
- * @param rebalancerRef RebalancerRef instance
- * @return Builder
- */
- public T rebalancerRef(RebalancerRef rebalancerRef) {
- _rebalancerRef = rebalancerRef;
- return self();
- }
-
- /**
- * Update an existing context with base fields
- * @param context derived context
- */
- protected final void update(BasicRebalancerContext context) {
- context.setResourceId(_resourceId);
- context.setStateModelDefId(_stateModelDefId);
- context.setStateModelFactoryId(_stateModelFactoryId);
- context.setParticipantGroupTag(_participantGroupTag);
- context.setSerializerClass(_serializerClass);
- context.setRebalancerRef(_rebalancerRef);
- }
-
- /**
- * Get a typed reference to "this" class. Final derived classes should simply return the this
- * reference.
- * @return this for the most specific type
- */
- protected abstract T self();
-
- /**
- * Get the rebalancer context from the built fields
- * @return RebalancerContext
- */
- public abstract RebalancerContext build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
deleted file mode 100644
index ef12a09..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ContextSerializer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public interface ContextSerializer {
- /**
- * Convert a RebalancerContext object instance to a String
- * @param data instance of the rebalancer context type
- * @return String representing the object
- */
- public <T> String serialize(final T data);
-
- /**
- * Convert raw bytes to a generic object instance
- * @param clazz The class represented by the deserialized string
- * @param string String representing the object
- * @return instance of the generic type or null if the conversion failed
- */
- public <T> T deserialize(final Class<T> clazz, final String string);
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
deleted file mode 100644
index 0d2c1f2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/CustomRebalancerContext.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
- * corresponds to {@link CustomRebalancer}
- */
-public class CustomRebalancerContext extends PartitionedRebalancerContext {
- private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
- /**
- * Instantiate a CustomRebalancerContext
- */
- public CustomRebalancerContext() {
- setRebalanceMode(RebalanceMode.CUSTOMIZED);
- setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
- _preferenceMaps = Maps.newHashMap();
- }
-
- /**
- * Get the preference maps of the partitions and replicas of the resource
- * @return map of partition to participant and state
- */
- public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
- return _preferenceMaps;
- }
-
- /**
- * Set the preference maps of the partitions and replicas of the resource
- * @param preferenceMaps map of partition to participant and state
- */
- public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
- _preferenceMaps = preferenceMaps;
- }
-
- /**
- * Get the preference map of a partition
- * @param partitionId the partition to look up
- * @return map of participant to state
- */
- @JsonIgnore
- public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
- return _preferenceMaps.get(partitionId);
- }
-
- /**
- * Generate preference maps based on a default cluster setup
- * @param stateModelDef the state model definition to follow
- * @param participantSet the set of participant ids to configure for
- */
- @Override
- @JsonIgnore
- public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
- Set<ParticipantId> participantSet) {
- // compute default upper bounds
- Map<State, String> upperBounds = Maps.newHashMap();
- for (State state : stateModelDef.getTypedStatesPriorityList()) {
- upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
- }
-
- // determine the current mapping
- Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
-
- // determine the preference maps
- LinkedHashMap<State, Integer> stateCounts =
- ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
- getReplicaCount());
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
- List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
- AutoRebalanceStrategy strategy =
- new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
- getMaxPartitionsPerParticipant(), placementScheme);
- Map<String, Map<String, String>> rawPreferenceMaps =
- strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
- .getMapFields();
- Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
- Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
- setPreferenceMaps(preferenceMaps);
- }
-
- /**
- * Build a CustomRebalancerContext. By default, it corresponds to {@link CustomRebalancer}
- */
- public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
- private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
- /**
- * Instantiate for a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
- super.rebalanceMode(RebalanceMode.CUSTOMIZED);
- _preferenceMaps = Maps.newHashMap();
- }
-
- /**
- * Add a preference map for a partition
- * @param partitionId partition to set
- * @param preferenceList map of participant id to state indicating where replicas are served
- * @return Builder
- */
- public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
- _preferenceMaps.put(partitionId, preferenceMap);
- return self();
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public CustomRebalancerContext build() {
- CustomRebalancerContext context = new CustomRebalancerContext();
- super.update(context);
- context.setPreferenceMaps(_preferenceMaps);
- return context;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
deleted file mode 100644
index ecc93fb..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/DefaultContextSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.io.ByteArrayInputStream;
-import java.io.StringWriter;
-
-import org.apache.helix.HelixException;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Default serializer implementation for RebalancerContexts. Uses the Jackson JSON library to
- * convert to and from strings
- */
-public class DefaultContextSerializer implements ContextSerializer {
-
- private static Logger logger = Logger.getLogger(DefaultContextSerializer.class);
-
- @Override
- public <T> String serialize(final T data) {
- if (data == null) {
- return null;
- }
-
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
- serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
- serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- StringWriter sw = new StringWriter();
- try {
- mapper.writeValue(sw, data);
- } catch (Exception e) {
- logger.error("Exception during payload data serialization.", e);
- throw new HelixException(e);
- }
- return sw.toString();
- }
-
- @Override
- public <T> T deserialize(final Class<T> clazz, final String string) {
- if (string == null || string.length() == 0) {
- return null;
- }
-
- ObjectMapper mapper = new ObjectMapper();
- ByteArrayInputStream bais = new ByteArrayInputStream(string.getBytes());
-
- DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
- deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
- deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
- deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_CREATORS, true);
- deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
- deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
- try {
- T payload = mapper.readValue(bais, clazz);
- return payload;
- } catch (Exception e) {
- logger.error("Exception during deserialization of payload bytes: " + string, e);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
deleted file mode 100644
index 2400707..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/FullAutoRebalancerContext.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.model.IdealState.RebalanceMode;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for FULL_AUTO rebalancing mode. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
-public class FullAutoRebalancerContext extends PartitionedRebalancerContext {
- public FullAutoRebalancerContext() {
- setRebalanceMode(RebalanceMode.FULL_AUTO);
- setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
- }
-
- /**
- * Builder for a full auto rebalancer context. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
- public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
- /**
- * Instantiate with a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
- super.rebalanceMode(RebalanceMode.FULL_AUTO);
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public FullAutoRebalancerContext build() {
- FullAutoRebalancerContext context = new FullAutoRebalancerContext();
- super.update(context);
- return context;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/015e7dda/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
deleted file mode 100644
index 15fcf9c..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/PartitionedRebalancerContext.java
+++ /dev/null
@@ -1,393 +0,0 @@
-package org.apache.helix.controller.rebalancer.context;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerContext for a resource whose subunits are partitions. In addition, these partitions can
- * be replicated.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PartitionedRebalancerContext extends BasicRebalancerContext implements
- ReplicatedRebalancerContext {
- private Map<PartitionId, Partition> _partitionMap;
- private boolean _anyLiveParticipant;
- private int _replicaCount;
- private int _maxPartitionsPerParticipant;
- private RebalanceMode _rebalanceMode;
-
- /**
- * Instantiate a DataRebalancerContext
- */
- public PartitionedRebalancerContext() {
- _partitionMap = Collections.emptyMap();
- _replicaCount = 1;
- _anyLiveParticipant = false;
- _maxPartitionsPerParticipant = Integer.MAX_VALUE;
- _rebalanceMode = RebalanceMode.USER_DEFINED;
- }
-
- /**
- * Get a map from partition id to partition
- * @return partition map (mutable)
- */
- public Map<PartitionId, Partition> getPartitionMap() {
- return _partitionMap;
- }
-
- /**
- * Set a map of partition id to partition
- * @param partitionMap partition map
- */
- public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
- _partitionMap = Maps.newHashMap(partitionMap);
- }
-
- /**
- * Get the set of partitions for this resource
- * @return set of partition ids
- */
- @JsonIgnore
- public Set<PartitionId> getPartitionSet() {
- return _partitionMap.keySet();
- }
-
- /**
- * Get a partition
- * @param partitionId id of the partition to get
- * @return Partition object, or null if not present
- */
- @JsonIgnore
- public Partition getPartition(PartitionId partitionId) {
- return _partitionMap.get(partitionId);
- }
-
- @Override
- public boolean anyLiveParticipant() {
- return _anyLiveParticipant;
- }
-
- /**
- * Indicate if this resource should be assigned to any live participant
- * @param anyLiveParticipant true if any live participant expected, false otherwise
- */
- public void setAnyLiveParticipant(boolean anyLiveParticipant) {
- _anyLiveParticipant = anyLiveParticipant;
- }
-
- @Override
- public int getReplicaCount() {
- return _replicaCount;
- }
-
- /**
- * Set the number of replicas that each partition should have
- * @param replicaCount
- */
- public void setReplicaCount(int replicaCount) {
- _replicaCount = replicaCount;
- }
-
- /**
- * Get the maximum number of partitions that a participant can serve
- * @return maximum number of partitions per participant
- */
- public int getMaxPartitionsPerParticipant() {
- return _maxPartitionsPerParticipant;
- }
-
- /**
- * Set the maximum number of partitions that a participant can serve
- * @param maxPartitionsPerParticipant maximum number of partitions per participant
- */
- public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
- _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
- }
-
- /**
- * Set the rebalancer mode of the partitioned resource
- * @param rebalanceMode {@link RebalanceMode} enum value
- */
- public void setRebalanceMode(RebalanceMode rebalanceMode) {
- _rebalanceMode = rebalanceMode;
- }
-
- /**
- * Get the rebalancer mode of the resource
- * @return RebalanceMode
- */
- public RebalanceMode getRebalanceMode() {
- return _rebalanceMode;
- }
-
- @Override
- @JsonIgnore
- public Map<PartitionId, Partition> getSubUnitMap() {
- return getPartitionMap();
- }
-
- /**
- * Generate a default configuration given the state model and a participant.
- * @param stateModelDef the state model definition to follow
- * @param participantSet the set of participant ids to configure for
- */
- @JsonIgnore
- public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
- Set<ParticipantId> participantSet) {
- // the base context does not understand enough to know do to anything
- }
-
- /**
- * Convert a physically-stored IdealState into a rebalancer context for a partitioned resource
- * @param idealState populated IdealState
- * @return PartitionedRebalancerContext
- */
- public static PartitionedRebalancerContext from(IdealState idealState) {
- PartitionedRebalancerContext context;
- switch (idealState.getRebalanceMode()) {
- case FULL_AUTO:
- FullAutoRebalancerContext.Builder fullAutoBuilder =
- new FullAutoRebalancerContext.Builder(idealState.getResourceId());
- populateContext(fullAutoBuilder, idealState);
- context = fullAutoBuilder.build();
- break;
- case SEMI_AUTO:
- SemiAutoRebalancerContext.Builder semiAutoBuilder =
- new SemiAutoRebalancerContext.Builder(idealState.getResourceId());
- for (PartitionId partitionId : idealState.getPartitionIdSet()) {
- semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
- }
- populateContext(semiAutoBuilder, idealState);
- context = semiAutoBuilder.build();
- break;
- case CUSTOMIZED:
- CustomRebalancerContext.Builder customBuilder =
- new CustomRebalancerContext.Builder(idealState.getResourceId());
- for (PartitionId partitionId : idealState.getPartitionIdSet()) {
- customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
- }
- populateContext(customBuilder, idealState);
- context = customBuilder.build();
- break;
- default:
- Builder baseBuilder = new Builder(idealState.getResourceId());
- populateContext(baseBuilder, idealState);
- context = baseBuilder.build();
- break;
- }
- return context;
- }
-
- /**
- * Update a builder subclass with all the fields of the ideal state
- * @param builder builder that extends AbstractBuilder
- * @param idealState populated IdealState
- */
- private static <T extends AbstractBuilder<T>> void populateContext(T builder,
- IdealState idealState) {
- String replicas = idealState.getReplicas();
- int replicaCount = 0;
- boolean anyLiveParticipant = false;
- if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
- anyLiveParticipant = true;
- } else {
- replicaCount = Integer.parseInt(replicas);
- }
- if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
- // backwards compatibility: partition sets were based on pref lists/maps previously
- builder.addPartitions(idealState.getNumPartitions());
- } else {
- for (PartitionId partitionId : idealState.getPartitionIdSet()) {
- builder.addPartition(new Partition(partitionId));
- }
- }
- builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
- .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
- .participantGroupTag(idealState.getInstanceGroupTag())
- .stateModelDefId(idealState.getStateModelDefId())
- .stateModelFactoryId(idealState.getStateModelFactoryId());
- RebalancerRef rebalancerRef = idealState.getRebalancerRef();
- if (rebalancerRef != null) {
- builder.rebalancerRef(rebalancerRef);
- }
- }
-
- /**
- * Builder for a basic data rebalancer context
- */
- public static final class Builder extends AbstractBuilder<Builder> {
- /**
- * Instantiate with a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public PartitionedRebalancerContext build() {
- PartitionedRebalancerContext context = new PartitionedRebalancerContext();
- super.update(context);
- return context;
- }
- }
-
- /**
- * Abstract builder for a generic partitioned resource rebalancer context
- */
- public static abstract class AbstractBuilder<T extends BasicRebalancerContext.AbstractBuilder<T>>
- extends BasicRebalancerContext.AbstractBuilder<T> {
- private final ResourceId _resourceId;
- private final Map<PartitionId, Partition> _partitionMap;
- private RebalanceMode _rebalanceMode;
- private boolean _anyLiveParticipant;
- private int _replicaCount;
- private int _maxPartitionsPerParticipant;
-
- /**
- * Instantiate with a resource
- * @param resourceId resource id
- */
- public AbstractBuilder(ResourceId resourceId) {
- super(resourceId);
- _resourceId = resourceId;
- _partitionMap = Maps.newHashMap();
- _rebalanceMode = RebalanceMode.USER_DEFINED;
- _anyLiveParticipant = false;
- _replicaCount = 1;
- _maxPartitionsPerParticipant = Integer.MAX_VALUE;
- }
-
- /**
- * Set the rebalance mode for a partitioned rebalancer context
- * @param rebalanceMode {@link RebalanceMode} enum value
- * @return Builder
- */
- public T rebalanceMode(RebalanceMode rebalanceMode) {
- _rebalanceMode = rebalanceMode;
- return self();
- }
-
- /**
- * Add a partition that the resource serves
- * @param partition fully-qualified partition
- * @return Builder
- */
- public T addPartition(Partition partition) {
- _partitionMap.put(partition.getId(), partition);
- return self();
- }
-
- /**
- * Add a collection of partitions
- * @param partitions any collection of Partition objects
- * @return Builder
- */
- public T addPartitions(Collection<Partition> partitions) {
- for (Partition partition : partitions) {
- addPartition(partition);
- }
- return self();
- }
-
- /**
- * Add a specified number of partitions with a default naming scheme, namely
- * resourceId_partitionNumber where partitionNumber starts at 0
- * @param partitionCount number of partitions to add
- * @return Builder
- */
- public T addPartitions(int partitionCount) {
- for (int i = 0; i < partitionCount; i++) {
- addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
- }
- return self();
- }
-
- /**
- * Set whether any live participant should be used in rebalancing
- * @param anyLiveParticipant true if any live participant can be used, false otherwise
- * @return Builder
- */
- public T anyLiveParticipant(boolean anyLiveParticipant) {
- _anyLiveParticipant = anyLiveParticipant;
- return self();
- }
-
- /**
- * Set the number of replicas
- * @param replicaCount number of replicas
- * @return Builder
- */
- public T replicaCount(int replicaCount) {
- _replicaCount = replicaCount;
- return self();
- }
-
- /**
- * Set the maximum number of partitions to assign to any participant
- * @param maxPartitionsPerParticipant the maximum
- * @return Builder
- */
- public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
- _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
- return self();
- }
-
- /**
- * Update a DataRebalancerContext with fields from this builder level
- * @param context DataRebalancerContext
- */
- protected final void update(PartitionedRebalancerContext context) {
- super.update(context);
- // enforce at least one partition
- if (_partitionMap.isEmpty()) {
- addPartitions(1);
- }
- context.setRebalanceMode(_rebalanceMode);
- context.setPartitionMap(_partitionMap);
- context.setAnyLiveParticipant(_anyLiveParticipant);
- context.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
- context.setReplicaCount(_replicaCount);
- }
- }
-}