You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/31 01:06:42 UTC
[2/2] git commit: [HELIX-374] Rebalancer config should be fully user-specified
[HELIX-374] Rebalancer config should be fully user-specified
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ee8ef6a7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ee8ef6a7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ee8ef6a7
Branch: refs/heads/master
Commit: ee8ef6a702de2b1490e6c09bff914adf6dbb7603
Parents: 08371b5
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jul 30 14:37:02 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jul 30 16:05:15 2014 -0700
----------------------------------------------------------------------
.../helix/api/accessor/ClusterAccessor.java | 78 +--
.../config/BasicRebalancerConfig.java | 256 ---------
.../config/CustomRebalancerConfig.java | 169 ------
.../config/FullAutoRebalancerConfig.java | 69 ---
.../config/PartitionedRebalancerConfig.java | 523 -------------------
.../rebalancer/config/RebalancerConfig.java | 57 +-
.../config/RebalancerConfigHolder.java | 15 -
.../config/ReplicatedRebalancerConfig.java | 40 --
.../config/SemiAutoRebalancerConfig.java | 183 -------
.../stages/BestPossibleStateCalcStage.java | 70 +--
.../stages/ExternalViewComputeStage.java | 8 +-
.../stages/MessageGenerationStage.java | 17 +-
.../stages/MessageSelectionStage.java | 21 +-
.../stages/ResourceComputationStage.java | 51 +-
.../java/org/apache/helix/model/IdealState.java | 83 +--
.../helix/model/builder/IdealStateBuilder.java | 10 +-
.../monitoring/mbeans/ClusterStatusMonitor.java | 4 +-
.../org/apache/helix/api/TestNewStages.java | 253 ---------
.../context/TestSerializeRebalancerContext.java | 105 ----
.../helix/controller/stages/BaseStageTest.java | 5 +-
.../stages/TestResourceComputationStage.java | 14 +-
.../stages/TestStagesWithLogicalAccessors.java | 255 +++++++++
.../TestCustomizedIdealStateRebalancer.java | 7 +-
.../helix/integration/TestHelixConnection.java | 20 +-
.../integration/TestLocalContainerProvider.java | 20 +-
.../TestUserDefRebalancerCompatibility.java | 3 +-
.../mbeans/TestClusterStatusMonitor.java | 20 +-
.../helix/examples/LogicalModelExample.java | 66 +--
.../tools/UpdateProvisionerConfig.java | 1 +
.../provisioning/yarn/AppMasterLauncher.java | 22 +-
30 files changed, 432 insertions(+), 2013 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 6b92275..ddf809a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -53,8 +53,6 @@ import org.apache.helix.controller.provisioner.ContainerId;
import org.apache.helix.controller.provisioner.ContainerSpec;
import org.apache.helix.controller.provisioner.ContainerState;
import org.apache.helix.controller.provisioner.ProvisionerConfig;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
import org.apache.helix.controller.stages.ClusterDataCache;
@@ -64,7 +62,6 @@ import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
@@ -292,34 +289,8 @@ public class ClusterAccessor {
resourceConfigMap = _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
}
- // check if external view and resource assignment reads are required
- boolean extraReadsRequired = false;
- for (String resourceName : idealStateMap.keySet()) {
- if (extraReadsRequired) {
- break;
- }
- // a rebalancer can be user defined if it has that mode set, or has a different rebalancer
- // class
- IdealState idealState = idealStateMap.get(resourceName);
- extraReadsRequired =
- extraReadsRequired || (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED);
- RebalancerRef ref = idealState.getRebalancerRef();
- if (ref != null) {
- extraReadsRequired =
- extraReadsRequired
- || !PartitionedRebalancerConfig.isBuiltinRebalancer(ref.getRebalancerClass());
- }
- }
- for (String resourceName : resourceConfigMap.keySet()) {
- if (extraReadsRequired) {
- break;
- }
- extraReadsRequired =
- extraReadsRequired || resourceConfigMap.get(resourceName).hasRebalancerConfig();
- }
-
// now read external view and resource assignments if needed
- if (!useCache || extraReadsRequired) {
+ if (!useCache) {
externalViewMap = _accessor.getChildValuesMap(_keyBuilder.externalViews());
resourceAssignmentMap = _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
_cache.setAssignmentWritePolicy(true);
@@ -439,8 +410,8 @@ public class ClusterAccessor {
* @return true if resource added, false if there was an error
*/
public boolean addResource(ResourceConfig resource) {
- if (resource == null || resource.getRebalancerConfig() == null) {
- LOG.error("Resource not fully defined with a rebalancer config");
+ if (resource == null || resource.getIdealState() == null) {
+ LOG.error("Resource not fully defined with an ideal state");
return false;
}
@@ -448,8 +419,8 @@ public class ClusterAccessor {
LOG.error("Cluster: " + _clusterId + " structure is not valid");
return false;
}
- RebalancerConfig config = resource.getRebalancerConfig();
- StateModelDefId stateModelDefId = config.getStateModelDefId();
+ IdealState idealState = resource.getIdealState();
+ StateModelDefId stateModelDefId = idealState.getStateModelDefId();
if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
return false;
@@ -467,13 +438,8 @@ public class ClusterAccessor {
return false;
}
- // Create an IdealState from a RebalancerConfig (if the resource supports it)
- IdealState idealState =
- PartitionedRebalancerConfig.rebalancerConfigToIdealState(resource.getRebalancerConfig(), 0,
- false);
- if (idealState != null) {
- _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
- }
+ // Persist the ideal state
+ _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
// Add resource user config
boolean persistConfig = false;
@@ -482,11 +448,10 @@ public class ClusterAccessor {
configuration.addNamespacedConfig(resource.getUserConfig());
persistConfig = true;
}
- PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
- if (idealState == null
- && (partitionedConfig == null || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED)) {
+ RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+ if (rebalancerConfig != null) {
// only persist if this is not easily convertible to an ideal state
- configuration.addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
+ configuration.addNamespacedConfig(new RebalancerConfigHolder(rebalancerConfig)
.toNamespacedConfig());
persistConfig = true;
}
@@ -796,29 +761,10 @@ public class ClusterAccessor {
RebalancerConfig rebalancerConfig = null;
if (resourceConfiguration != null) {
userConfig = resourceConfiguration.getUserConfig();
- } else {
- userConfig = new UserConfig(Scope.resource(resourceId));
- }
- if (idealState != null) {
- if (resourceConfiguration != null
- && idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
- // prefer rebalancer config for user_defined data rebalancing
- rebalancerConfig =
- resourceConfiguration.getRebalancerConfig(PartitionedRebalancerConfig.class);
- }
- if (rebalancerConfig == null) {
- // prefer ideal state for non-user_defined data rebalancing
- rebalancerConfig = PartitionedRebalancerConfig.from(idealState);
- }
- idealState.updateUserConfig(userConfig);
- } else if (resourceConfiguration != null) {
rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class);
- }
- if (rebalancerConfig == null) {
- rebalancerConfig = new PartitionedRebalancerConfig();
- }
- if (resourceConfiguration != null) {
provisionerConfig = resourceConfiguration.getProvisionerConfig(ProvisionerConfig.class);
+ } else {
+ userConfig = new UserConfig(Scope.resource(resourceId));
}
ResourceConfig resourceConfig =
new ResourceConfig.Builder(resourceId).idealState(idealState)
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
deleted file mode 100644
index d6aa10e..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/BasicRebalancerConfig.java
+++ /dev/null
@@ -1,256 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.serializer.DefaultStringSerializer;
-import org.apache.helix.controller.serializer.StringSerializer;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Abstract RebalancerConfig that functions for generic subunits. Use a subclass that more
- * concretely defines the subunits.
- */
-public abstract class BasicRebalancerConfig implements RebalancerConfig {
- private ResourceId _resourceId;
- private StateModelDefId _stateModelDefId;
- private StateModelFactoryId _stateModelFactoryId;
- private String _participantGroupTag;
- private Class<? extends StringSerializer> _serializer;
- private RebalancerRef _rebalancerRef;
-
- /**
- * Instantiate a basic rebalancer config
- */
- public BasicRebalancerConfig() {
- _serializer = DefaultStringSerializer.class;
- }
-
- @Override
- public ResourceId getResourceId() {
- return _resourceId;
- }
-
- /**
- * Set the resource to rebalance
- * @param resourceId resource id
- */
- public void setResourceId(ResourceId resourceId) {
- _resourceId = resourceId;
- }
-
- @Override
- public StateModelDefId getStateModelDefId() {
- return _stateModelDefId;
- }
-
- /**
- * Set the state model definition that the resource follows
- * @param stateModelDefId state model definition id
- */
- public void setStateModelDefId(StateModelDefId stateModelDefId) {
- _stateModelDefId = stateModelDefId;
- }
-
- @Override
- public StateModelFactoryId getStateModelFactoryId() {
- return _stateModelFactoryId;
- }
-
- /**
- * Set the state model factory that the resource uses
- * @param stateModelFactoryId state model factory id
- */
- public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
- _stateModelFactoryId = stateModelFactoryId;
- }
-
- @Override
- public String getParticipantGroupTag() {
- return _participantGroupTag;
- }
-
- /**
- * Set a tag that participants must have in order to serve this resource
- * @param participantGroupTag string group tag
- */
- public void setParticipantGroupTag(String participantGroupTag) {
- _participantGroupTag = participantGroupTag;
- }
-
- /**
- * Get the serializer. If none is provided, {@link DefaultStringSerializer} is used
- */
- @Override
- public Class<? extends StringSerializer> getSerializerClass() {
- return _serializer;
- }
-
- /**
- * Set the class that can serialize this config
- * @param serializer serializer class that implements StringSerializer
- */
- public void setSerializerClass(Class<? extends StringSerializer> serializer) {
- _serializer = serializer;
- }
-
- @Override
- @JsonIgnore
- public Set<? extends PartitionId> getSubUnitIdSet() {
- return getSubUnitMap().keySet();
- }
-
- @Override
- @JsonIgnore
- public Partition getSubUnit(PartitionId subUnitId) {
- return getSubUnitMap().get(subUnitId);
- }
-
- @Override
- public RebalancerRef getRebalancerRef() {
- return _rebalancerRef;
- }
-
- /**
- * Set the reference to the class used to rebalance this resource
- * @param rebalancerRef RebalancerRef instance
- */
- public void setRebalancerRef(RebalancerRef rebalancerRef) {
- _rebalancerRef = rebalancerRef;
- }
-
- /**
- * Safely cast a RebalancerConfig into a subtype
- * @param config RebalancerConfig object
- * @param clazz the target class
- * @return An instance of clazz, or null if the conversion is not possible
- */
- public static <T extends RebalancerConfig> T convert(RebalancerConfig config, Class<T> clazz) {
- try {
- return clazz.cast(config);
- } catch (ClassCastException e) {
- return null;
- }
- }
-
- /**
- * Abstract builder for the base rebalancer config
- */
- public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
- private final ResourceId _resourceId;
- private StateModelDefId _stateModelDefId;
- private StateModelFactoryId _stateModelFactoryId;
- private String _participantGroupTag;
- private Class<? extends StringSerializer> _serializerClass;
- private RebalancerRef _rebalancerRef;
-
- /**
- * Instantiate with a resource id
- * @param resourceId resource id
- */
- public AbstractBuilder(ResourceId resourceId) {
- _resourceId = resourceId;
- _serializerClass = DefaultStringSerializer.class;
- }
-
- /**
- * Set the state model definition that the resource should follow
- * @param stateModelDefId state model definition id
- * @return Builder
- */
- public T stateModelDefId(StateModelDefId stateModelDefId) {
- _stateModelDefId = stateModelDefId;
- return self();
- }
-
- /**
- * Set the state model factory that the resource should use
- * @param stateModelFactoryId state model factory id
- * @return Builder
- */
- public T stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
- _stateModelFactoryId = stateModelFactoryId;
- return self();
- }
-
- /**
- * Set the tag that all participants require in order to serve this resource
- * @param participantGroupTag the tag
- * @return Builder
- */
- public T participantGroupTag(String participantGroupTag) {
- _participantGroupTag = participantGroupTag;
- return self();
- }
-
- /**
- * Set the serializer class for this rebalancer config
- * @param serializerClass class that implements StringSerializer
- * @return Builder
- */
- public T serializerClass(Class<? extends StringSerializer> serializerClass) {
- _serializerClass = serializerClass;
- return self();
- }
-
- /**
- * Specify a custom class to use for rebalancing
- * @param rebalancerRef RebalancerRef instance
- * @return Builder
- */
- public T rebalancerRef(RebalancerRef rebalancerRef) {
- _rebalancerRef = rebalancerRef;
- return self();
- }
-
- /**
- * Update an existing context with base fields
- * @param config derived config
- */
- protected final void update(BasicRebalancerConfig config) {
- config.setResourceId(_resourceId);
- config.setStateModelDefId(_stateModelDefId);
- config.setStateModelFactoryId(_stateModelFactoryId);
- config.setParticipantGroupTag(_participantGroupTag);
- config.setSerializerClass(_serializerClass);
- config.setRebalancerRef(_rebalancerRef);
- }
-
- /**
- * Get a typed reference to "this" class. Final derived classes should simply return the this
- * reference.
- * @return this for the most specific type
- */
- protected abstract T self();
-
- /**
- * Get the rebalancer config from the built fields
- * @return RebalancerConfig
- */
- public abstract RebalancerConfig build();
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
deleted file mode 100644
index a44b230..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerConfig for a resource that should be rebalanced in CUSTOMIZED mode. By default, it
- * corresponds to {@link CustomRebalancer}
- */
-public class CustomRebalancerConfig extends PartitionedRebalancerConfig {
- private Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
- /**
- * Instantiate a CustomRebalancerConfig
- */
- public CustomRebalancerConfig() {
- if (getClass().equals(CustomRebalancerConfig.class)) {
- // only mark this as customized mode if this specifc config is used
- setRebalanceMode(RebalanceMode.CUSTOMIZED);
- } else {
- setRebalanceMode(RebalanceMode.USER_DEFINED);
- }
- setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
- _preferenceMaps = Maps.newHashMap();
- }
-
- /**
- * Get the preference maps of the partitions and replicas of the resource
- * @return map of partition to participant and state
- */
- public Map<PartitionId, Map<ParticipantId, State>> getPreferenceMaps() {
- return _preferenceMaps;
- }
-
- /**
- * Set the preference maps of the partitions and replicas of the resource
- * @param preferenceMaps map of partition to participant and state
- */
- public void setPreferenceMaps(Map<PartitionId, Map<ParticipantId, State>> preferenceMaps) {
- _preferenceMaps = preferenceMaps;
- }
-
- /**
- * Get the preference map of a partition
- * @param partitionId the partition to look up
- * @return map of participant to state
- */
- @JsonIgnore
- public Map<ParticipantId, State> getPreferenceMap(PartitionId partitionId) {
- return _preferenceMaps.get(partitionId);
- }
-
- /**
- * Generate preference maps based on a default cluster setup
- * @param stateModelDef the state model definition to follow
- * @param participantSet the set of participant ids to configure for
- */
- @Override
- @JsonIgnore
- public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
- Set<ParticipantId> participantSet) {
- // compute default upper bounds
- Map<State, String> upperBounds = Maps.newHashMap();
- for (State state : stateModelDef.getTypedStatesPriorityList()) {
- upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
- }
-
- // determine the current mapping
- Map<PartitionId, Map<ParticipantId, State>> currentMapping = getPreferenceMaps();
-
- // determine the preference maps
- LinkedHashMap<State, Integer> stateCounts =
- ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
- getReplicaCount());
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
- List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
- AutoRebalanceStrategy strategy =
- new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
- getMaxPartitionsPerParticipant(), placementScheme);
- Map<String, Map<String, String>> rawPreferenceMaps =
- strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
- .getMapFields();
- Map<PartitionId, Map<ParticipantId, State>> preferenceMaps =
- Maps.newHashMap(ResourceAssignment.replicaMapsFromStringMaps(rawPreferenceMaps));
- setPreferenceMaps(preferenceMaps);
- }
-
- /**
- * Build a CustomRebalancerConfig. By default, it corresponds to {@link CustomRebalancer}
- */
- public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
- private final Map<PartitionId, Map<ParticipantId, State>> _preferenceMaps;
-
- /**
- * Instantiate for a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- super.rebalancerRef(RebalancerRef.from(CustomRebalancer.class));
- super.rebalanceMode(RebalanceMode.CUSTOMIZED);
- _preferenceMaps = Maps.newHashMap();
- }
-
- /**
- * Add a preference map for a partition
- * @param partitionId partition to set
- * @param preferenceList map of participant id to state indicating where replicas are served
- * @return Builder
- */
- public Builder preferenceMap(PartitionId partitionId, Map<ParticipantId, State> preferenceMap) {
- _preferenceMaps.put(partitionId, preferenceMap);
- return self();
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public CustomRebalancerConfig build() {
- CustomRebalancerConfig config = new CustomRebalancerConfig();
- super.update(config);
- config.setPreferenceMaps(_preferenceMaps);
- return config;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
deleted file mode 100644
index 16bb4cb..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.model.IdealState.RebalanceMode;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerConfig for FULL_AUTO rebalancing mode. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
-public class FullAutoRebalancerConfig extends PartitionedRebalancerConfig {
- public FullAutoRebalancerConfig() {
- if (getClass().equals(FullAutoRebalancerConfig.class)) {
- // only mark this as full auto mode if this specifc config is used
- setRebalanceMode(RebalanceMode.FULL_AUTO);
- } else {
- setRebalanceMode(RebalanceMode.USER_DEFINED);
- }
- setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
- }
-
- /**
- * Builder for a full auto rebalancer config. By default, it corresponds to
- * {@link FullAutoRebalancer}
- */
- public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
- /**
- * Instantiate with a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- super.rebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
- super.rebalanceMode(RebalanceMode.FULL_AUTO);
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public FullAutoRebalancerConfig build() {
- FullAutoRebalancerConfig config = new FullAutoRebalancerConfig();
- super.update(config);
- return config;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
deleted file mode 100644
index 934a9c2..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
+++ /dev/null
@@ -1,523 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixConstants.StateModelToken;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.CustomRebalancer;
-import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
-import org.apache.helix.controller.rebalancer.HelixRebalancer;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.task.TaskRebalancer;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerConfig for a resource whose subunits are partitions. In addition, these partitions can
- * be replicated.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PartitionedRebalancerConfig extends BasicRebalancerConfig implements
- ReplicatedRebalancerConfig {
- private Map<PartitionId, Partition> _partitionMap;
- private boolean _anyLiveParticipant;
- private int _replicaCount;
- private int _maxPartitionsPerParticipant;
- private RebalanceMode _rebalanceMode;
-
- @JsonIgnore
- private static final Set<Class<? extends RebalancerConfig>> BUILTIN_CONFIG_CLASSES = Sets
- .newHashSet();
- @JsonIgnore
- private static final Set<Class<? extends HelixRebalancer>> BUILTIN_REBALANCER_CLASSES = Sets
- .newHashSet();
- static {
- BUILTIN_CONFIG_CLASSES.add(PartitionedRebalancerConfig.class);
- BUILTIN_CONFIG_CLASSES.add(FullAutoRebalancerConfig.class);
- BUILTIN_CONFIG_CLASSES.add(SemiAutoRebalancerConfig.class);
- BUILTIN_CONFIG_CLASSES.add(CustomRebalancerConfig.class);
- BUILTIN_REBALANCER_CLASSES.add(FullAutoRebalancer.class);
- BUILTIN_REBALANCER_CLASSES.add(SemiAutoRebalancer.class);
- BUILTIN_REBALANCER_CLASSES.add(CustomRebalancer.class);
- BUILTIN_REBALANCER_CLASSES.add(TaskRebalancer.class);
- }
-
- /**
- * Instantiate a PartitionedRebalancerConfig
- */
- public PartitionedRebalancerConfig() {
- _partitionMap = Collections.emptyMap();
- _replicaCount = 1;
- _anyLiveParticipant = false;
- _maxPartitionsPerParticipant = Integer.MAX_VALUE;
- _rebalanceMode = RebalanceMode.USER_DEFINED;
- }
-
- /**
- * Get a map from partition id to partition
- * @return partition map (mutable)
- */
- public Map<PartitionId, Partition> getPartitionMap() {
- return _partitionMap;
- }
-
- /**
- * Set a map of partition id to partition
- * @param partitionMap partition map
- */
- public void setPartitionMap(Map<PartitionId, Partition> partitionMap) {
- _partitionMap = Maps.newHashMap(partitionMap);
- }
-
- /**
- * Get the set of partitions for this resource
- * @return set of partition ids
- */
- @JsonIgnore
- public Set<PartitionId> getPartitionSet() {
- return _partitionMap.keySet();
- }
-
- /**
- * Get a partition
- * @param partitionId id of the partition to get
- * @return Partition object, or null if not present
- */
- @JsonIgnore
- public Partition getPartition(PartitionId partitionId) {
- return _partitionMap.get(partitionId);
- }
-
- @Override
- public boolean anyLiveParticipant() {
- return _anyLiveParticipant;
- }
-
- /**
- * Indicate if this resource should be assigned to any live participant
- * @param anyLiveParticipant true if any live participant expected, false otherwise
- */
- public void setAnyLiveParticipant(boolean anyLiveParticipant) {
- _anyLiveParticipant = anyLiveParticipant;
- }
-
- @Override
- public int getReplicaCount() {
- return _replicaCount;
- }
-
- /**
- * Set the number of replicas that each partition should have
- * @param replicaCount
- */
- public void setReplicaCount(int replicaCount) {
- _replicaCount = replicaCount;
- }
-
- /**
- * Get the maximum number of partitions that a participant can serve
- * @return maximum number of partitions per participant
- */
- public int getMaxPartitionsPerParticipant() {
- return _maxPartitionsPerParticipant;
- }
-
- /**
- * Set the maximum number of partitions that a participant can serve
- * @param maxPartitionsPerParticipant maximum number of partitions per participant
- */
- public void setMaxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
- _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
- }
-
- /**
- * Set the rebalancer mode of the partitioned resource
- * @param rebalanceMode {@link RebalanceMode} enum value
- */
- public void setRebalanceMode(RebalanceMode rebalanceMode) {
- _rebalanceMode = rebalanceMode;
- }
-
- /**
- * Get the rebalancer mode of the resource
- * @return RebalanceMode
- */
- public RebalanceMode getRebalanceMode() {
- return _rebalanceMode;
- }
-
- @Override
- @JsonIgnore
- public Map<PartitionId, Partition> getSubUnitMap() {
- return getPartitionMap();
- }
-
- /**
- * Generate a default configuration given the state model and a set of participants.
- * @param stateModelDef the state model definition to follow
- * @param participantSet the set of participant ids to configure for
- */
- @JsonIgnore
- public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
- Set<ParticipantId> participantSet) {
- // the base config does not understand enough to know what to do
- }
-
- /**
- * Safely get a {@link PartitionedRebalancerConfig} from a {@link RebalancerConfig}
- * @param config the base config
- * @return a {@link PartitionedRebalancerConfig}, or null if the conversion is not possible
- */
- public static PartitionedRebalancerConfig from(RebalancerConfig config) {
- try {
- return PartitionedRebalancerConfig.class.cast(config);
- } catch (ClassCastException e) {
- return null;
- }
- }
-
- /**
- * Check if the given class is compatible with an {@link IdealState}
- * @param clazz the PartitionedRebalancerConfig subclass
- * @return true if IdealState can be used to describe this config, false otherwise
- */
- public static boolean isBuiltinConfig(Class<? extends RebalancerConfig> clazz) {
- return BUILTIN_CONFIG_CLASSES.contains(clazz);
- }
-
- /**
- * Check if the given class is a built-in rebalancer class
- * @param clazz the HelixRebalancer subclass
- * @return true if the rebalancer class is built in, false otherwise
- */
- public static boolean isBuiltinRebalancer(Class<? extends HelixRebalancer> clazz) {
- return BUILTIN_REBALANCER_CLASSES.contains(clazz);
- }
-
- /**
- * Convert a physically-stored IdealState into a rebalancer config for a partitioned resource
- * @param idealState populated IdealState
- * @return PartitionedRebalancerConfig
- */
- public static PartitionedRebalancerConfig from(IdealState idealState) {
- PartitionedRebalancerConfig config;
- RebalanceMode mode = idealState.getRebalanceMode();
- if (mode == RebalanceMode.USER_DEFINED) {
- Class<? extends RebalancerConfig> configClass = idealState.getRebalancerConfigClass();
- if (configClass.equals(FullAutoRebalancerConfig.class)) {
- mode = RebalanceMode.FULL_AUTO;
- } else if (configClass.equals(SemiAutoRebalancerConfig.class)) {
- mode = RebalanceMode.SEMI_AUTO;
- } else if (configClass.equals(CustomRebalancerConfig.class)) {
- mode = RebalanceMode.CUSTOMIZED;
- }
- }
- switch (mode) {
- case FULL_AUTO:
- FullAutoRebalancerConfig.Builder fullAutoBuilder =
- new FullAutoRebalancerConfig.Builder(idealState.getResourceId());
- populateConfig(fullAutoBuilder, idealState);
- config = fullAutoBuilder.build();
- break;
- case SEMI_AUTO:
- SemiAutoRebalancerConfig.Builder semiAutoBuilder =
- new SemiAutoRebalancerConfig.Builder(idealState.getResourceId());
- for (PartitionId partitionId : idealState.getPartitionIdSet()) {
- semiAutoBuilder.preferenceList(partitionId, idealState.getPreferenceList(partitionId));
- }
- populateConfig(semiAutoBuilder, idealState);
- config = semiAutoBuilder.build();
- break;
- case CUSTOMIZED:
- CustomRebalancerConfig.Builder customBuilder =
- new CustomRebalancerConfig.Builder(idealState.getResourceId());
- for (PartitionId partitionId : idealState.getPartitionIdSet()) {
- customBuilder.preferenceMap(partitionId, idealState.getParticipantStateMap(partitionId));
- }
- populateConfig(customBuilder, idealState);
- config = customBuilder.build();
- break;
- default:
- Builder baseBuilder = new Builder(idealState.getResourceId());
- populateConfig(baseBuilder, idealState);
- config = baseBuilder.build();
- break;
- }
- return config;
- }
-
- /**
- * Update a builder subclass with all the fields of the ideal state
- * @param builder builder that extends AbstractBuilder
- * @param idealState populated IdealState
- */
- private static <T extends AbstractBuilder<T>> void populateConfig(T builder, IdealState idealState) {
- String replicas = idealState.getReplicas();
- int replicaCount = 0;
- boolean anyLiveParticipant = false;
- if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
- anyLiveParticipant = true;
- } else {
- replicaCount = Integer.parseInt(replicas);
- }
- if (idealState.getNumPartitions() > 0 && idealState.getPartitionIdSet().size() == 0) {
- // backwards compatibility: partition sets were based on pref lists/maps previously
- builder.addPartitions(idealState.getNumPartitions());
- } else {
- for (PartitionId partitionId : idealState.getPartitionIdSet()) {
- builder.addPartition(new Partition(partitionId));
- }
- }
- builder.anyLiveParticipant(anyLiveParticipant).replicaCount(replicaCount)
- .maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
- .participantGroupTag(idealState.getInstanceGroupTag())
- .stateModelDefId(idealState.getStateModelDefId())
- .stateModelFactoryId(idealState.getStateModelFactoryId())
- .rebalanceMode(idealState.getRebalanceMode());
- RebalancerRef rebalancerRef = idealState.getRebalancerRef();
- if (rebalancerRef != null) {
- builder.rebalancerRef(rebalancerRef);
- }
- }
-
- /**
- * Get an ideal state from a rebalancer config if the resource is partitioned
- * @param config RebalancerConfig instance
- * @param bucketSize bucket size to use
- * @param batchMessageMode true if batch messaging allowed, false otherwise
- * @return IdealState, or null
- */
- public static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
- boolean batchMessageMode) {
- PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
- if (partitionedConfig != null) {
- if (!PartitionedRebalancerConfig.isBuiltinConfig(partitionedConfig.getClass())) {
- // don't proceed if this resource cannot be described by an ideal state
- return null;
- }
- IdealState idealState = new IdealState(partitionedConfig.getResourceId());
- idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
-
- RebalancerRef ref = partitionedConfig.getRebalancerRef();
- if (ref != null) {
- idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
- }
- String replicas = null;
- if (partitionedConfig.anyLiveParticipant()) {
- replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
- } else {
- replicas = Integer.toString(partitionedConfig.getReplicaCount());
- }
- idealState.setReplicas(replicas);
- idealState.setNumPartitions(partitionedConfig.getPartitionSet().size());
- idealState.setInstanceGroupTag(partitionedConfig.getParticipantGroupTag());
- idealState.setMaxPartitionsPerInstance(partitionedConfig.getMaxPartitionsPerParticipant());
- idealState.setStateModelDefId(partitionedConfig.getStateModelDefId());
- idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
- idealState.setBucketSize(bucketSize);
- idealState.setBatchMessageMode(batchMessageMode);
- idealState.setRebalancerConfigClass(config.getClass());
- if (SemiAutoRebalancerConfig.class.equals(config.getClass())) {
- SemiAutoRebalancerConfig semiAutoConfig =
- BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
- for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
- idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
- }
- } else if (CustomRebalancerConfig.class.equals(config.getClass())) {
- CustomRebalancerConfig customConfig =
- BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
- for (PartitionId partitionId : customConfig.getPartitionSet()) {
- idealState
- .setParticipantStateMap(partitionId, customConfig.getPreferenceMap(partitionId));
- }
- } else {
- for (PartitionId partitionId : partitionedConfig.getPartitionSet()) {
- List<ParticipantId> preferenceList = Collections.emptyList();
- idealState.setPreferenceList(partitionId, preferenceList);
- Map<ParticipantId, State> participantStateMap = Collections.emptyMap();
- idealState.setParticipantStateMap(partitionId, participantStateMap);
- }
- }
- return idealState;
- }
- return null;
- }
-
- /**
- * Builder for a basic data rebalancer config
- */
- public static final class Builder extends AbstractBuilder<Builder> {
- /**
- * Instantiate with a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public PartitionedRebalancerConfig build() {
- PartitionedRebalancerConfig config = new PartitionedRebalancerConfig();
- super.update(config);
- return config;
- }
- }
-
- /**
- * Abstract builder for a generic partitioned resource rebalancer config
- */
- public static abstract class AbstractBuilder<T extends BasicRebalancerConfig.AbstractBuilder<T>>
- extends BasicRebalancerConfig.AbstractBuilder<T> {
- private final ResourceId _resourceId;
- private final Map<PartitionId, Partition> _partitionMap;
- private RebalanceMode _rebalanceMode;
- private boolean _anyLiveParticipant;
- private int _replicaCount;
- private int _maxPartitionsPerParticipant;
-
- /**
- * Instantiate with a resource
- * @param resourceId resource id
- */
- public AbstractBuilder(ResourceId resourceId) {
- super(resourceId);
- _resourceId = resourceId;
- _partitionMap = Maps.newHashMap();
- _rebalanceMode = RebalanceMode.USER_DEFINED;
- _anyLiveParticipant = false;
- _replicaCount = 1;
- _maxPartitionsPerParticipant = Integer.MAX_VALUE;
- }
-
- /**
- * Set the rebalance mode for a partitioned rebalancer config
- * @param rebalanceMode {@link RebalanceMode} enum value
- * @return Builder
- */
- public T rebalanceMode(RebalanceMode rebalanceMode) {
- _rebalanceMode = rebalanceMode;
- return self();
- }
-
- /**
- * Add a partition that the resource serves
- * @param partition fully-qualified partition
- * @return Builder
- */
- public T addPartition(Partition partition) {
- _partitionMap.put(partition.getId(), partition);
- return self();
- }
-
- /**
- * Add a collection of partitions
- * @param partitions any collection of Partition objects
- * @return Builder
- */
- public T addPartitions(Collection<Partition> partitions) {
- for (Partition partition : partitions) {
- addPartition(partition);
- }
- return self();
- }
-
- /**
- * Add a specified number of partitions with a default naming scheme, namely
- * resourceId_partitionNumber where partitionNumber starts at 0
- * @param partitionCount number of partitions to add
- * @return Builder
- */
- public T addPartitions(int partitionCount) {
- for (int i = 0; i < partitionCount; i++) {
- addPartition(new Partition(PartitionId.from(_resourceId, Integer.toString(i))));
- }
- return self();
- }
-
- /**
- * Set whether any live participant should be used in rebalancing
- * @param anyLiveParticipant true if any live participant can be used, false otherwise
- * @return Builder
- */
- public T anyLiveParticipant(boolean anyLiveParticipant) {
- _anyLiveParticipant = anyLiveParticipant;
- return self();
- }
-
- /**
- * Set the number of replicas
- * @param replicaCount number of replicas
- * @return Builder
- */
- public T replicaCount(int replicaCount) {
- _replicaCount = replicaCount;
- return self();
- }
-
- /**
- * Set the maximum number of partitions to assign to any participant
- * @param maxPartitionsPerParticipant the maximum
- * @return Builder
- */
- public T maxPartitionsPerParticipant(int maxPartitionsPerParticipant) {
- _maxPartitionsPerParticipant = maxPartitionsPerParticipant;
- return self();
- }
-
- /**
- * Update a PartitionedRebalancerConfig with fields from this builder level
- * @param config PartitionedRebalancerConfig
- */
- protected final void update(PartitionedRebalancerConfig config) {
- super.update(config);
- // enforce at least one partition
- if (_partitionMap.isEmpty()) {
- addPartitions(1);
- }
- config.setRebalanceMode(_rebalanceMode);
- config.setPartitionMap(_partitionMap);
- config.setAnyLiveParticipant(_anyLiveParticipant);
- config.setMaxPartitionsPerParticipant(_maxPartitionsPerParticipant);
- config.setReplicaCount(_replicaCount);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
index 3f8c9d1..b725c9e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfig.java
@@ -1,14 +1,6 @@
package org.apache.helix.controller.rebalancer.config;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.api.id.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.helix.controller.serializer.StringSerializer;
/*
@@ -31,31 +23,10 @@ import org.apache.helix.controller.serializer.StringSerializer;
*/
/**
- * Defines the state available to a rebalancer. The most common use case is to use a
- * {@link PartitionedRebalancerConfig} or a subclass and set up a resource with it. A rebalancer
- * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
- * how the configuration should be serialized.
+ * Defines the state available to a rebalancer. This is fully optional, but it allows persistence of
+ * arbitrary fields that would be useful for a user-defined rebalancer
*/
public interface RebalancerConfig {
- /**
- * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
- * resource, e.g. a subtask of a task
- * @return map of (subunit id, subunit) pairs
- */
- public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
-
- /**
- * Get the subunits of the resource (e.g. partitions)
- * @return set of subunit ids
- */
- public Set<? extends PartitionId> getSubUnitIdSet();
-
- /**
- * Get a specific subunit
- * @param subUnitId the id of the subunit
- * @return SubUnit
- */
- public Partition getSubUnit(PartitionId subUnitId);
/**
* Get the resource to rebalance
@@ -64,32 +35,8 @@ public interface RebalancerConfig {
public ResourceId getResourceId();
/**
- * Get the state model definition that the resource follows
- * @return state model definition id
- */
- public StateModelDefId getStateModelDefId();
-
- /**
- * Get the state model factory of this resource
- * @return state model factory id
- */
- public StateModelFactoryId getStateModelFactoryId();
-
- /**
- * Get the tag, if any, that participants must have in order to serve this resource
- * @return participant group tag, or null
- */
- public String getParticipantGroupTag();
-
- /**
* Get the serializer for this config
* @return StringSerializer class object
*/
public Class<? extends StringSerializer> getSerializerClass();
-
- /**
- * Get a reference to the class used to rebalance this resource
- * @return RebalancerRef
- */
- public RebalancerRef getRebalancerRef();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
index d6ddb50..19970c4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
@@ -119,21 +119,6 @@ public final class RebalancerConfigHolder {
}
/**
- * Get a rebalancer class instance
- * @return Rebalancer
- */
- public HelixRebalancer getRebalancer() {
- // cache the rebalancer to avoid loading and instantiating it excessively
- if (_rebalancer == null) {
- if (_config == null || _config.getRebalancerRef() == null) {
- return null;
- }
- _rebalancer = _config.getRebalancerRef().getRebalancer();
- }
- return _rebalancer;
- }
-
- /**
* Get the instantiated RebalancerConfig
* @param configClass specific class of the RebalancerConfig
* @return RebalancerConfig subclass instance, or null if conversion is not possible
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
deleted file mode 100644
index 3118b2a..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/ReplicatedRebalancerConfig.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Methods specifying a rebalancer config that allows replicas. For instance, a rebalancer config
- * with partitions may accept state model definitions that support multiple replicas per partition,
- * and it's possible that the policy is that each live participant in the system should have a
- * replica.
- */
-public interface ReplicatedRebalancerConfig extends RebalancerConfig {
- /**
- * Check if this resource should be assigned to any live participant
- * @return true if any live participant expected, false otherwise
- */
- public boolean anyLiveParticipant();
-
- /**
- * Get the number of replicas that each resource subunit should have
- * @return replica count
- */
- public int getReplicaCount();
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
deleted file mode 100644
index 60e30f4..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ /dev/null
@@ -1,183 +0,0 @@
-package org.apache.helix.controller.rebalancer.config;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.StateModelDefinition;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-import com.google.common.collect.Maps;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * RebalancerConfig for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
- * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
- */
-public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig {
- @JsonProperty("preferenceLists")
- private Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
- /**
- * Instantiate a SemiAutoRebalancerConfig
- */
- public SemiAutoRebalancerConfig() {
- if (getClass().equals(SemiAutoRebalancerConfig.class)) {
- // only mark this as semi auto mode if this specific config is used
- setRebalanceMode(RebalanceMode.SEMI_AUTO);
- } else {
- setRebalanceMode(RebalanceMode.USER_DEFINED);
- }
- setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
- _preferenceLists = Maps.newHashMap();
- }
-
- /**
- * Get the preference lists of all partitions of the resource
- * @return map of partition id to list of participant ids
- */
- public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
- return _preferenceLists;
- }
-
- /**
- * Set the preference lists of all partitions of the resource
- * @param preferenceLists
- */
- public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
- _preferenceLists = preferenceLists;
- }
-
- /**
- * Get the preference list of a partition
- * @param partitionId the partition to look up
- * @return list of participant ids
- */
- @JsonIgnore
- public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
- return _preferenceLists.get(partitionId);
- }
-
- /**
- * Generate preference lists based on a default cluster setup
- * @param stateModelDef the state model definition to follow
- * @param participantSet the set of participant ids to configure for
- */
- @Override
- @JsonIgnore
- public void generateDefaultConfiguration(StateModelDefinition stateModelDef,
- Set<ParticipantId> participantSet) {
- // compute default upper bounds
- Map<State, String> upperBounds = Maps.newHashMap();
- for (State state : stateModelDef.getTypedStatesPriorityList()) {
- upperBounds.put(state, stateModelDef.getNumParticipantsPerState(state));
- }
-
- // determine the current mapping
- Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
- for (PartitionId partitionId : getPartitionSet()) {
- List<ParticipantId> preferenceList = getPreferenceList(partitionId);
- if (preferenceList != null && !preferenceList.isEmpty()) {
- Set<ParticipantId> disabledParticipants = Collections.emptySet();
- Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
- Map<ParticipantId, State> initialMap =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
- stateModelDef, preferenceList, emptyCurrentState, disabledParticipants, true);
- currentMapping.put(partitionId, initialMap);
- }
- }
-
- // determine the preference
- LinkedHashMap<State, Integer> stateCounts =
- ConstraintBasedAssignment.stateCount(upperBounds, stateModelDef, participantSet.size(),
- getReplicaCount());
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- List<ParticipantId> participantList = new ArrayList<ParticipantId>(participantSet);
- List<PartitionId> partitionList = new ArrayList<PartitionId>(getPartitionSet());
- AutoRebalanceStrategy strategy =
- new AutoRebalanceStrategy(ResourceId.from(""), partitionList, stateCounts,
- getMaxPartitionsPerParticipant(), placementScheme);
- Map<String, List<String>> rawPreferenceLists =
- strategy.typedComputePartitionAssignment(participantList, currentMapping, participantList)
- .getListFields();
- Map<PartitionId, List<ParticipantId>> preferenceLists =
- Maps.newHashMap(IdealState.preferenceListsFromStringLists(rawPreferenceLists));
- setPreferenceLists(preferenceLists);
- }
-
- /**
- * Build a SemiAutoRebalancerConfig. By default, it corresponds to {@link SemiAutoRebalancer}
- */
- public static final class Builder extends PartitionedRebalancerConfig.AbstractBuilder<Builder> {
- private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
-
- /**
- * Instantiate for a resource
- * @param resourceId resource id
- */
- public Builder(ResourceId resourceId) {
- super(resourceId);
- super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
- super.rebalanceMode(RebalanceMode.SEMI_AUTO);
- _preferenceLists = Maps.newHashMap();
- }
-
- /**
- * Add a preference list for a partition
- * @param partitionId partition to set
- * @param preferenceList ordered list of participants who can serve the partition
- * @return Builder
- */
- public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
- _preferenceLists.put(partitionId, preferenceList);
- return self();
- }
-
- @Override
- protected Builder self() {
- return this;
- }
-
- @Override
- public SemiAutoRebalancerConfig build() {
- SemiAutoRebalancerConfig config = new SemiAutoRebalancerConfig();
- super.update(config);
- config.setPreferenceLists(_preferenceLists);
- return config;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 6f34953..f6f5d61 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -25,7 +25,6 @@ import java.util.Set;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Resource;
import org.apache.helix.api.State;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ParticipantId;
@@ -193,51 +192,40 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
}
ResourceConfig resourceConfig = resourceMap.get(resourceId);
RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
- StateModelDefinition stateModelDef =
- stateModelDefs.get(rebalancerConfig.getStateModelDefId());
+ IdealState idealState = resourceConfig.getIdealState();
+ StateModelDefinition stateModelDef = stateModelDefs.get(idealState.getStateModelDefId());
ResourceAssignment resourceAssignment = null;
- if (rebalancerConfig != null) {
- // use a cached rebalancer if possible
- RebalancerRef ref = rebalancerConfig.getRebalancerRef();
- HelixRebalancer rebalancer = null;
- if (_rebalancerMap.containsKey(resourceId)) {
- HelixRebalancer candidateRebalancer = _rebalancerMap.get(resourceId);
- if (ref != null && candidateRebalancer.getClass().equals(ref.toString())) {
- rebalancer = candidateRebalancer;
- }
+ // use a cached rebalancer if possible
+ RebalancerRef ref = idealState.getRebalancerRef();
+ HelixRebalancer rebalancer = null;
+ if (_rebalancerMap.containsKey(resourceId)) {
+ HelixRebalancer candidateRebalancer = _rebalancerMap.get(resourceId);
+ if (ref != null && candidateRebalancer.getClass().equals(ref.toString())) {
+ rebalancer = candidateRebalancer;
}
+ }
- // otherwise instantiate a new one
- if (rebalancer == null) {
- if (ref != null) {
- rebalancer = ref.getRebalancer();
- }
- HelixManager manager = event.getAttribute("helixmanager");
- ControllerContextProvider provider =
- event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
- if (rebalancer == null) {
- rebalancer = new FallbackRebalancer();
- }
- rebalancer.init(manager, provider);
- _rebalancerMap.put(resourceId, rebalancer);
+ // otherwise instantiate a new one
+ if (rebalancer == null) {
+ if (ref != null) {
+ rebalancer = ref.getRebalancer();
}
- ResourceAssignment currentAssignment = null;
- IdealState idealState;
- Resource resourceSnapshot = cluster.getResource(resourceId);
- if (resourceSnapshot != null) {
- currentAssignment = resourceSnapshot.getResourceAssignment();
- idealState = resourceSnapshot.getIdealState();
- } else {
- idealState = new IdealState(resourceId);
- }
- try {
-
- resourceAssignment =
- rebalancer.computeResourceMapping(idealState, rebalancerConfig, currentAssignment,
- cluster, currentStateOutput);
- } catch (Exception e) {
- LOG.error("Rebalancer for resource " + resourceId + " failed.", e);
+ HelixManager manager = event.getAttribute("helixmanager");
+ ControllerContextProvider provider =
+ event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+ if (rebalancer == null) {
+ rebalancer = new FallbackRebalancer();
}
+ rebalancer.init(manager, provider);
+ _rebalancerMap.put(resourceId, rebalancer);
+ }
+ ResourceAssignment currentAssignment = null;
+ try {
+ resourceAssignment =
+ rebalancer.computeResourceMapping(idealState, rebalancerConfig, currentAssignment,
+ cluster, currentStateOutput);
+ } catch (Exception e) {
+ LOG.error("Rebalancer for resource " + resourceId + " failed.", e);
}
if (resourceAssignment == null) {
resourceAssignment =
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index deabb56..810a675 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -44,7 +44,6 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -93,7 +92,6 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
// if resource ideal state has bucket size, set it
// otherwise resource has been dropped, use bucket size from current state instead
ResourceConfig resource = resourceMap.get(resourceId);
- RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
if (resource.getIdealState().getBucketSize() > 0) {
@@ -146,10 +144,8 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
// partitions are finished (COMPLETED or ERROR), update the status update of the original
// scheduler
// message, and then remove the partitions from the ideal state
- if (rebalancerConfig != null
- && rebalancerConfig.getStateModelDefId() != null
- && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
- StateModelDefId.SchedulerTaskQueue)) {
+ if (idealState != null && idealState.getStateModelDefId() != null
+ && idealState.getStateModelDefId().equalsIgnoreCase(StateModelDefId.SchedulerTaskQueue)) {
updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index 61da673..4ba249f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -39,7 +39,7 @@ import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
@@ -75,12 +75,10 @@ public class MessageGenerationStage extends AbstractBaseStage {
for (ResourceId resourceId : resourceMap.keySet()) {
ResourceConfig resourceConfig = resourceMap.get(resourceId);
int bucketSize = 0;
- if (resourceConfig.getIdealState() != null) {
- bucketSize = resourceConfig.getIdealState().getBucketSize();
- }
+ bucketSize = resourceConfig.getIdealState().getBucketSize();
- RebalancerConfig rebalancerCfg = resourceConfig.getRebalancerConfig();
- StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCfg.getStateModelDefId());
+ IdealState idealState = resourceConfig.getIdealState();
+ StateModelDefinition stateModelDef = stateModelDefMap.get(idealState.getStateModelDefId());
ResourceAssignment resourceAssignment =
bestPossibleStateOutput.getResourceAssignment(resourceId);
@@ -134,15 +132,14 @@ public class MessageGenerationStage extends AbstractBaseStage {
SessionId sessionId =
SessionId.from(cluster.getLiveParticipantMap().get(participantId).getLiveInstance()
.getSessionId());
- RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
Message message =
createMessage(manager, resourceId, subUnitId, participantId, currentState,
nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
- rebalancerConfig.getStateModelFactoryId(), bucketSize);
+ idealState.getStateModelFactoryId(), bucketSize);
// TODO refactor get/set timeout/inner-message
- if (rebalancerConfig != null
- && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+ if (idealState != null
+ && idealState.getStateModelDefId().equalsIgnoreCase(
StateModelDefId.SchedulerTaskQueue)) {
if (resourceConfig.getSubUnitSet().size() > 0) {
// TODO refactor it -- we need a way to read in scheduler tasks a priori
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 2408e29..0e4a694 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.Resource;
@@ -37,9 +38,7 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.ReplicatedRebalancerConfig;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -108,7 +107,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
for (ResourceId resourceId : resourceMap.keySet()) {
ResourceConfig resource = resourceMap.get(resourceId);
StateModelDefinition stateModelDef =
- stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
+ stateModelDefMap.get(resource.getIdealState().getStateModelDefId());
if (stateModelDef == null) {
LOG.info("resource: "
@@ -124,7 +123,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
// if configResource == null, the resource has been dropped
Map<State, Bounds> stateConstraints =
computeStateConstraints(stateModelDef,
- configResource == null ? null : configResource.getRebalancerConfig(), cluster);
+ configResource == null ? null : configResource.getIdealState(), cluster);
// TODO fix it
for (PartitionId partitionId : bestPossibleStateOutput.getResourceAssignment(resourceId)
@@ -270,10 +269,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
* @return
*/
private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
- RebalancerConfig rebalancerConfig, Cluster cluster) {
- ReplicatedRebalancerConfig config =
- (rebalancerConfig != null) ? BasicRebalancerConfig.convert(rebalancerConfig,
- ReplicatedRebalancerConfig.class) : null;
+ IdealState idealState, Cluster cluster) {
Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
@@ -285,11 +281,12 @@ public class MessageSelectionStage extends AbstractBaseStage {
} else if ("R".equals(numInstancesPerState)) {
// idealState is null when resource has been dropped,
// R can't be evaluated and ignore state constraints
- if (config != null) {
- if (config.anyLiveParticipant()) {
+ if (idealState != null) {
+ String replicas = idealState.getReplicas();
+ if (replicas.equals(StateModelToken.ANY_LIVEINSTANCE.toString())) {
max = cluster.getLiveParticipantMap().size();
} else {
- max = config.getReplicaCount();
+ max = Integer.parseInt(replicas);
}
}
} else {
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 1036b35..ccd8ae6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -19,22 +19,23 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
-import org.apache.helix.api.Partition;
import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
import org.apache.log4j.Logger;
/**
@@ -85,14 +86,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
* @throws StageException
*/
Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
- Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
- new HashMap<ResourceId, ResourceConfig.Builder>();
-
- Map<ResourceId, PartitionedRebalancerConfig.Builder> rebCtxBuilderMap =
- new HashMap<ResourceId, PartitionedRebalancerConfig.Builder>();
-
- Map<ResourceId, Integer> bucketSizeMap = new HashMap<ResourceId, Integer>();
- Map<ResourceId, Boolean> batchModeMap = new HashMap<ResourceId, Boolean>();
+ Map<ResourceId, IdealState> idealStateMap = new HashMap<ResourceId, IdealState>();
for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
@@ -111,35 +105,28 @@ public class ResourceComputationStage extends AbstractBaseStage {
+ currentState.getResourceId());
}
- if (!resCfgBuilderMap.containsKey(resourceId)) {
- PartitionedRebalancerConfig.Builder rebCtxBuilder =
- new PartitionedRebalancerConfig.Builder(resourceId);
- rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
- rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
- .getStateModelFactoryName()));
- rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
-
- ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
- resCfgBuilderMap.put(resourceId, resCfgBuilder);
- bucketSizeMap.put(resourceId, currentState.getBucketSize());
- batchModeMap.put(resourceId, currentState.getBatchMessageMode());
+ if (!idealStateMap.containsKey(resourceId)) {
+ IdealState idealState = new IdealState(resourceId);
+ idealState.setStateModelDefId(currentState.getStateModelDefId());
+ idealState.setStateModelFactoryName(currentState.getStateModelFactoryName());
+ idealState.setBucketSize(currentState.getBucketSize());
+ idealState.setBatchMessageMode(currentState.getBatchMessageMode());
+ idealStateMap.put(resourceId, idealState);
}
- PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+ IdealState idealState = idealStateMap.get(resourceId);
for (PartitionId partitionId : currentState.getTypedPartitionStateMap().keySet()) {
- rebCtxBuilder.addPartition(new Partition(partitionId));
+ idealState.setParticipantStateMap(partitionId, new HashMap<ParticipantId, State>());
+ idealState.setPreferenceList(partitionId, new ArrayList<ParticipantId>());
}
}
}
Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
- for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
- ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
- PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
- RebalancerConfig rebalancerConfig = rebCtxBuilder.build();
- resCfgBuilder.rebalancerConfig(rebalancerConfig);
- resCfgBuilder.idealState(PartitionedRebalancerConfig.rebalancerConfigToIdealState(
- rebalancerConfig, bucketSizeMap.get(resourceId), batchModeMap.get(resourceId)));
+ for (ResourceId resourceId : idealStateMap.keySet()) {
+ ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+ IdealState idealState = idealStateMap.get(resourceId);
+ resCfgBuilder.idealState(idealState);
resCfgMap.put(resourceId, resCfgBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ee8ef6a7/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 88ec610..b0347ef 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -40,14 +40,11 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
import org.apache.helix.controller.rebalancer.RebalancerRef;
-import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
-import org.apache.helix.util.HelixUtil;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.log4j.Logger;
import com.google.common.base.Enums;
@@ -217,47 +214,26 @@ public class IdealState extends HelixProperty {
* @return RebalancerRef
*/
public RebalancerRef getRebalancerRef() {
+ RebalancerRef ref = null;
String className = getRebalancerClassName();
if (className != null) {
- return RebalancerRef.from(getRebalancerClassName());
- }
- return null;
- }
-
- /**
- * Set the RebalancerConfig implementation class for this resource
- * @param clazz the class object
- */
- public void setRebalancerConfigClass(Class<? extends RebalancerConfig> clazz) {
- String className = clazz.getName();
- _record.setSimpleField(IdealStateProperty.REBALANCER_CONFIG_NAME.toString(), className);
- }
-
- /**
- * Get the class representing the rebalancer config of this resource
- * @return The rebalancer config class
- */
- public Class<? extends RebalancerConfig> getRebalancerConfigClass() {
- // try to extract the class from the persisted data
- String className = _record.getSimpleField(IdealStateProperty.REBALANCER_CONFIG_NAME.toString());
- if (className != null) {
- try {
- return HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerConfig.class);
- } catch (ClassNotFoundException e) {
- logger.error(className + " is not a valid class");
+ ref = RebalancerRef.from(getRebalancerClassName());
+ } else {
+ switch (getRebalanceMode()) {
+ case FULL_AUTO:
+ ref = RebalancerRef.from(FullAutoRebalancer.class);
+ break;
+ case SEMI_AUTO:
+ ref = RebalancerRef.from(SemiAutoRebalancer.class);
+ break;
+ case CUSTOMIZED:
+ ref = RebalancerRef.from(CustomRebalancer.class);
+ break;
+ default:
+ break;
}
}
- // the fallback is to use the mode
- switch (getRebalanceMode()) {
- case FULL_AUTO:
- return FullAutoRebalancerConfig.class;
- case SEMI_AUTO:
- return SemiAutoRebalancerConfig.class;
- case CUSTOMIZED:
- return CustomRebalancerConfig.class;
- default:
- return PartitionedRebalancerConfig.class;
- }
+ return ref;
}
/**
@@ -333,13 +309,7 @@ public class IdealState extends HelixProperty {
case CUSTOMIZED:
return _record.getMapFields().keySet();
case USER_DEFINED:
- Class<? extends RebalancerConfig> configClass = getRebalancerConfigClass();
- if (configClass.equals(SemiAutoRebalancerConfig.class)
- || configClass.equals(FullAutoRebalancerConfig.class)) {
- return _record.getListFields().keySet();
- } else {
- return _record.getMapFields().keySet();
- }
+ return _record.getMapFields().keySet();
default:
logger.error("Invalid ideal state mode:" + getResourceName());
return Collections.emptySet();
@@ -414,18 +384,8 @@ public class IdealState extends HelixProperty {
* @return set of instance names
*/
public Set<String> getInstanceSet(String partitionName) {
- boolean useListFields = false;
RebalanceMode rebalanceMode = getRebalanceMode();
- if (rebalanceMode == RebalanceMode.USER_DEFINED) {
- Class<? extends RebalancerConfig> configClass = getRebalancerConfigClass();
- if (configClass.equals(SemiAutoRebalancerConfig.class)
- || configClass.equals(FullAutoRebalancerConfig.class)) {
- // override: if the user defined rebalancer expects auto-type inputs, use the list fields
- useListFields = true;
- }
- }
- if (useListFields || rebalanceMode == RebalanceMode.SEMI_AUTO
- || rebalanceMode == RebalanceMode.FULL_AUTO) {
+ if (rebalanceMode == RebalanceMode.SEMI_AUTO || rebalanceMode == RebalanceMode.FULL_AUTO) {
// get instances from list fields
List<String> prefList = _record.getListField(partitionName);
if (prefList != null) {
@@ -825,7 +785,6 @@ public class IdealState extends HelixProperty {
}
/**
- * <<<<<<< HEAD
* Get the non-Helix simple fields from this property and add them to a UserConfig
* @param userConfig the user config to update
*/