You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:17 UTC
[09/53] [abbrv] git commit: [HELIX-98] clean up setting constraint
api,
[HELIX-246] Refactor scheduler task config to comply with new rebalancer
config and fix related scheduler task tests
[HELIX-98] clean up setting constraint api, [HELIX-246] Refactor scheduler task config to comply with new rebalancer config and fix related scheduler task tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/243f2adf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/243f2adf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/243f2adf
Branch: refs/heads/master
Commit: 243f2adfa716bc704c3ad31cb4c66256f0b8fd5e
Parents: fb96a13
Author: zzhang <zz...@apache.org>
Authored: Wed Sep 18 13:10:51 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:34 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/helix/api/Cluster.java | 40 ++-
.../org/apache/helix/api/ClusterAccessor.java | 2 +-
.../org/apache/helix/api/ClusterConfig.java | 245 ++++++++++++++++++-
.../java/org/apache/helix/api/ConstraintId.java | 74 ++++++
.../helix/api/CustomRebalancerConfig.java | 10 +
.../helix/api/FullAutoRebalancerConfig.java | 8 +
.../apache/helix/api/ParticipantAccessor.java | 5 +-
.../java/org/apache/helix/api/PartitionId.java | 8 +
.../org/apache/helix/api/RebalancerConfig.java | 24 ++
.../main/java/org/apache/helix/api/Scope.java | 31 +++
.../helix/api/SemiAutoRebalancerConfig.java | 10 +
.../helix/api/UserDefinedRebalancerConfig.java | 9 +
.../controller/rebalancer/AutoRebalancer.java | 1 +
.../controller/rebalancer/CustomRebalancer.java | 4 +-
.../rebalancer/NewAutoRebalancer.java | 25 +-
.../rebalancer/NewSemiAutoRebalancer.java | 6 +-
.../helix/controller/rebalancer/Rebalancer.java | 1 +
.../rebalancer/SemiAutoRebalancer.java | 1 +
.../util/ConstraintBasedAssignment.java | 1 +
.../util/NewConstraintBasedAssignment.java | 29 ++-
.../stages/BestPossibleStateCalcStage.java | 4 +-
.../stages/NewBestPossibleStateCalcStage.java | 3 +-
.../stages/NewResourceComputationStage.java | 160 ++++++++----
.../stages/RebalanceIdealStateStage.java | 4 +-
.../stages/StatsAggregationStage.java | 2 +-
.../strategy/EspressoRelayStrategy.java | 3 +-
.../DefaultSchedulerMessageHandlerFactory.java | 6 +-
.../apache/helix/manager/zk/ZKHelixAdmin.java | 39 +--
.../apache/helix/model/ClusterConstraints.java | 38 ++-
.../org/apache/helix/model/ExternalView.java | 3 +
.../java/org/apache/helix/model/IdealState.java | 29 +--
.../builder/ClusterConstraintsBuilder.java | 11 +-
.../helix/model/builder/IdealStateBuilder.java | 6 +-
.../builder/MessageConstraintItemBuilder.java | 107 ++++++++
.../builder/StateConstraintItemBuilder.java | 92 +++++++
.../monitoring/mbeans/ResourceMonitor.java | 18 +-
.../participant/HelixCustomCodeRunner.java | 10 +-
.../org/apache/helix/tools/ClusterSetup.java | 10 +-
.../org/apache/helix/util/RebalanceUtil.java | 25 +-
.../java/org/apache/helix/TestZKCallback.java | 2 +-
.../java/org/apache/helix/ZkUnitTestBase.java | 25 +-
.../helix/controller/stages/BaseStageTest.java | 2 +-
.../TestBestPossibleCalcStageCompatibility.java | 9 +-
.../stages/TestCompatibilityCheckStage.java | 3 +-
.../stages/TestResourceComputationStage.java | 7 +-
.../strategy/TestNewAutoRebalanceStrategy.java | 17 +-
.../TestAddStateModelFactoryAfterConnect.java | 3 +-
.../helix/integration/TestAutoRebalance.java | 15 +-
.../TestAutoRebalancePartitionLimit.java | 13 +-
.../TestCustomizedIdealStateRebalancer.java | 16 +-
.../integration/TestInvalidAutoIdealState.java | 3 +-
.../helix/integration/TestRenamePartition.java | 3 +-
.../helix/integration/TestSchedulerMessage.java | 9 +-
.../integration/TestStateTransitionTimeout.java | 17 +-
.../helix/integration/TestZkReconnect.java | 3 +-
.../helix/manager/zk/TestZNRecordSizeLimit.java | 17 +-
.../helix/manager/zk/TestZkHelixAdmin.java | 7 +-
.../org/apache/helix/model/TestIdealState.java | 25 +-
.../apache/helix/tools/TestHelixAdminCli.java | 32 +--
59 files changed, 1068 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 005ae0d..56a84e9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Transition;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
@@ -99,8 +100,10 @@ public class Cluster {
}
});
_config =
- new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap,
- stateModelMap, userConfig, isPaused);
+ new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
+ .addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
+ .addStateModelDefinitions(stateModelMap.values()).setPausedStatus(isPaused)
+ .userConfig(userConfig).build();
_resourceMap = ImmutableMap.copyOf(resourceMap);
@@ -222,10 +225,43 @@ public class Cluster {
}
/**
+ * Get the maximum number of participants that can be in a state
+ * @param scope the scope for the bound
+ * @param stateModelDefId the state model of the state
+ * @param state the constrained state
+ * @return The upper bound, which can be "-1" if unspecified, a numerical upper bound, "R" for
+ * number of replicas, or "N" for number of participants
+ */
+ public String getStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ State state) {
+ return _config.getStateUpperBoundConstraint(scope, stateModelDefId, state);
+ }
+
+ /**
+ * Get the limit of simultaneous execution of a transition
+ * @param scope the scope under which the transition is constrained
+ * @param stateModelDefId the state model of which the transition is a part
+ * @param transition the constrained transition
+ * @return the limit, or Integer.MAX_VALUE if there is no limit
+ */
+ public int getTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ Transition transition) {
+ return _config.getTransitionConstraint(scope, stateModelDefId, transition);
+ }
+
+ /**
* Check the pasued status of the cluster
* @return true if paused, false otherwise
*/
public boolean isPaused() {
return _config.isPaused();
}
+
+ /**
+ * Get the ClusterConfig specifying this cluster
+ * @return ClusterConfig
+ */
+ public ClusterConfig getConfig() {
+ return _config;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index a46edad..e30825c 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -66,7 +66,7 @@ public class ClusterAccessor {
public boolean createCluster(ClusterConfig cluster) {
boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
if (!created) {
- LOG.warn("Cluster already created. Aborting.");
+ // LOG.warn("Cluster already created. Aborting.");
// return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index 27da37d..ba1e78e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -3,12 +3,21 @@ package org.apache.helix.api;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Transition;
+import org.apache.helix.model.builder.ConstraintItemBuilder;
+import org.apache.log4j.Logger;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -33,6 +42,8 @@ import com.google.common.collect.ImmutableMap;
* Configuration properties of a cluster
*/
public class ClusterConfig {
+ private static final Logger LOG = Logger.getLogger(ClusterConfig.class);
+
private final ClusterId _id;
private final Map<ResourceId, ResourceConfig> _resourceMap;
private final Map<ParticipantId, ParticipantConfig> _participantMap;
@@ -51,7 +62,7 @@ public class ClusterConfig {
* @param userConfig user-defined cluster properties
* @param isPaused true if paused, false if active
*/
- public ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
+ private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
Map<ParticipantId, ParticipantConfig> participantMap,
Map<ConstraintType, ClusterConstraints> constraintMap,
Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
@@ -90,6 +101,105 @@ public class ClusterConfig {
}
/**
+ * Get the maximum number of participants that can be in a state
+ * @param scope the scope for the bound
+ * @param stateModelDefId the state model of the state
+ * @param state the constrained state
+ * @return The upper bound, which can be "-1" if unspecified, a numerical upper bound, "R" for
+ * number of replicas, or "N" for number of participants
+ */
+ public String getStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ State state) {
+ // set up attributes to match based on the scope
+ ClusterConstraints stateConstraints = getConstraintMap().get(ConstraintType.STATE_CONSTRAINT);
+ Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
+ matchAttributes.put(ConstraintAttribute.STATE, state.toString());
+ matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
+ switch (scope.getType()) {
+ case CLUSTER:
+ // cluster is implicit
+ break;
+ case RESOURCE:
+ matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
+ break;
+ default:
+ LOG.error("Unsupported scope for state constraint: " + scope);
+ return "-1";
+ }
+ Set<ConstraintItem> matches = stateConstraints.match(matchAttributes);
+ int value = -1;
+ for (ConstraintItem item : matches) {
+ // match: if an R or N is found, always choose that one
+ // otherwise, take the minimum of the counts specified in the constraints
+ String constraintValue = item.getConstraintValue();
+ if (constraintValue != null) {
+ if (constraintValue.equals(ConstraintValue.N.toString())
+ || constraintValue.equals(ConstraintValue.R.toString())) {
+ return constraintValue;
+ } else {
+ try {
+ int current = Integer.parseInt(constraintValue);
+ if (value == -1 || current < value) {
+ value = current;
+ }
+ } catch (NumberFormatException e) {
+ LOG.error("Invalid state upper bound: " + constraintValue);
+ }
+ }
+ }
+ }
+ return Integer.toString(value);
+ }
+
+ /**
+ * Get the limit of simultaneous execution of a transition
+ * @param scope the scope under which the transition is constrained
+ * @param stateModelDefId the state model of which the transition is a part
+ * @param transition the constrained transition
+ * @return the limit, or Integer.MAX_VALUE if there is no limit
+ */
+ public int getTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ Transition transition) {
+ // set up attributes to match based on the scope
+ ClusterConstraints transitionConstraints =
+ getConstraintMap().get(ConstraintType.MESSAGE_CONSTRAINT);
+ Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
+ matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
+ matchAttributes.put(ConstraintAttribute.MESSAGE_TYPE, MessageType.STATE_TRANSITION.toString());
+ matchAttributes.put(ConstraintAttribute.TRANSITION, transition.toString());
+ switch (scope.getType()) {
+ case CLUSTER:
+ // cluster is implicit
+ break;
+ case RESOURCE:
+ matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
+ break;
+ case PARTICIPANT:
+ matchAttributes.put(ConstraintAttribute.INSTANCE, scope.getScopedId().stringify());
+ break;
+ default:
+ LOG.error("Unsupported scope for transition constraints: " + scope);
+ return Integer.MAX_VALUE;
+ }
+ Set<ConstraintItem> matches = transitionConstraints.match(matchAttributes);
+ int value = Integer.MAX_VALUE;
+ for (ConstraintItem item : matches) {
+ String constraintValue = item.getConstraintValue();
+ if (constraintValue != null) {
+ try {
+ int current = Integer.parseInt(constraintValue);
+ if (current < value) {
+ value = current;
+ }
+ } catch (NumberFormatException e) {
+ LOG.error("Invalid in-flight transition cap: " + constraintValue);
+ }
+ }
+ }
+ return value;
+ }
+
+ /**
* Get participants of the cluster
* @return a map of participant id to participant, or empty map if none
*/
@@ -197,7 +307,11 @@ public class ClusterConfig {
* @return Builder
*/
public Builder addConstraint(ClusterConstraints constraint) {
- _constraintMap.put(constraint.getType(), constraint);
+ ClusterConstraints existConstraints = getConstraintsInstance(constraint.getType());
+ for (ConstraintId constraintId : constraint.getConstraintItems().keySet()) {
+ existConstraints
+ .addConstraintItem(constraintId, constraint.getConstraintItem(constraintId));
+ }
return this;
}
@@ -214,12 +328,121 @@ public class ClusterConfig {
}
/**
- * Add a constraint to the cluster
- * @param constraint cluster constraint of a specific type
+ * Add a constraint on the maximum number of in-flight transitions of a certain type
+ * @param scope scope of the constraint
+ * @param constraintId unique constraint identifier
+ * @param stateModelDefId identifies the state model containing the transition
+ * @param transition the transition to constrain
+ * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
+ * @return Builder
+ */
+ public Builder addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ Transition transition, int maxInFlightTransitions) {
+ Map<String, String> attributes = Maps.newHashMap();
+ attributes.put(ConstraintAttribute.MESSAGE_TYPE.toString(),
+ MessageType.STATE_TRANSITION.toString());
+ attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(),
+ Integer.toString(maxInFlightTransitions));
+ attributes.put(ConstraintAttribute.TRANSITION.toString(), transition.toString());
+ attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
+ switch (scope.getType()) {
+ case CLUSTER:
+ // cluster is implicit
+ break;
+ case RESOURCE:
+ attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
+ break;
+ case PARTICIPANT:
+ attributes.put(ConstraintAttribute.INSTANCE.toString(), scope.getScopedId().stringify());
+ break;
+ default:
+ LOG.error("Unsupported scope for adding a transition constraint: " + scope);
+ return this;
+ }
+ ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
+ ClusterConstraints constraints = getConstraintsInstance(ConstraintType.MESSAGE_CONSTRAINT);
+ constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, transition), item);
+ return this;
+ }
+
+ /**
+ * Add a state upper bound constraint
+ * @param scope scope under which the constraint is valid
+ * @param stateModelDefId identifier of the state model that owns the state
+ * @param state the state to constrain
+ * @param upperBound maximum number of replicas per partition in the state
+ * @return Builder
+ */
+ public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ State state, int upperBound) {
+ return addStateUpperBoundConstraint(scope, stateModelDefId, state,
+ Integer.toString(upperBound));
+ }
+
+ /**
+ * Add a state upper bound constraint
+ * @param scope scope under which the constraint is valid
+ * @param stateModelDefId identifier of the state model that owns the state
+ * @param state the state to constrain
+ * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
+ * number, or the currently supported special bound values:<br />
+ * "R" - Refers to the number of replicas specified during resource
+ * creation. This allows having different replication factor for each
+ * resource without having to create a different state machine. <br />
+ * "N" - Refers to all nodes in the cluster. Useful for resources that need
+ * to exist on all nodes. This way one can add/remove nodes without having
+ * to change the bounds.
+ * @return Builder
+ */
+ public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ State state, String dynamicUpperBound) {
+ Map<String, String> attributes = Maps.newHashMap();
+ attributes.put(ConstraintAttribute.STATE.toString(), state.toString());
+ attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
+ attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(), dynamicUpperBound);
+ switch (scope.getType()) {
+ case CLUSTER:
+ // cluster is implicit
+ break;
+ case RESOURCE:
+ attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
+ break;
+ default:
+ LOG.error("Unsupported scope for adding a state constraint: " + scope);
+ return this;
+ }
+ ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
+ ClusterConstraints constraints = getConstraintsInstance(ConstraintType.STATE_CONSTRAINT);
+ constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, state), item);
+ return this;
+ }
+
+ /**
+ * Add a state model definition to the cluster
+ * @param stateModelDef state model definition of the cluster
* @return Builder
*/
public Builder addStateModelDefinition(StateModelDefinition stateModelDef) {
_stateModelMap.put(stateModelDef.getStateModelDefId(), stateModelDef);
+ // add state constraints from the state model definition
+ for (State state : stateModelDef.getStatesPriorityList()) {
+ if (!stateModelDef.getNumParticipantsPerState(state).equals("-1")) {
+ addStateUpperBoundConstraint(Scope.cluster(_id), stateModelDef.getStateModelDefId(),
+ state, stateModelDef.getNumParticipantsPerState(state));
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Add multiple state model definitions
+ * @param stateModelDefs collection of state model definitions for the cluster
+ * @return Builder
+ */
+ public Builder addStateModelDefinitions(Collection<StateModelDefinition> stateModelDefs) {
+ for (StateModelDefinition stateModelDef : stateModelDefs) {
+ addStateModelDefinition(stateModelDef);
+ }
return this;
}
@@ -251,5 +474,19 @@ public class ClusterConfig {
return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
_userConfig, _isPaused);
}
+
+ /**
+ * Get a valid instance of ClusterConstraints for a type
+ * @param type the type
+ * @return ClusterConstraints
+ */
+ private ClusterConstraints getConstraintsInstance(ConstraintType type) {
+ ClusterConstraints constraints = _constraintMap.get(type);
+ if (constraints == null) {
+ constraints = new ClusterConstraints(type);
+ _constraintMap.put(type, constraints);
+ }
+ return constraints;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java b/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java
new file mode 100644
index 0000000..85c9430
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/ConstraintId.java
@@ -0,0 +1,74 @@
+package org.apache.helix.api;
+
+import org.apache.helix.model.Transition;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Identifies a constraint item on the cluster
+ */
+public class ConstraintId extends Id {
+ private final String _constraintId;
+
+ /**
+ * Create a constraint id
+ * @param constraintId string representing the constraint id
+ */
+ private ConstraintId(String constraintId) {
+ _constraintId = constraintId;
+ }
+
+ @Override
+ public String stringify() {
+ return _constraintId;
+ }
+
+ /**
+ * Get a constraint id from a string
+ * @param constraintId string representing the constraint id
+ * @return ConstraintId
+ */
+ public static ConstraintId from(String constraintId) {
+ return new ConstraintId(constraintId);
+ }
+
+ /**
+ * Get a state constraint id based on the state model definition and state
+ * @param scope the scope of the constraint
+ * @param stateModelDefId the state model
+ * @param state the constrained state
+ * @return ConstraintId
+ */
+ public static ConstraintId from(Scope<?> scope, StateModelDefId stateModelDefId, State state) {
+ return new ConstraintId(scope + "|" + stateModelDefId + "|" + state);
+ }
+
+ /**
+ * Get a transition constraint id based on the state model definition and transition
+ * @param scope the scope of the constraint
+ * @param stateModelDefId the state model
+ * @param transition the constrained transition
+ * @return ConstraintId
+ */
+ public static ConstraintId from(Scope<?> scope, StateModelDefId stateModelDefId,
+ Transition transition) {
+ return new ConstraintId(scope + "|" + stateModelDefId + "|" + transition);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
index 97d4a45..a5f2bbe 100644
--- a/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/CustomRebalancerConfig.java
@@ -112,6 +112,16 @@ public final class CustomRebalancerConfig extends RebalancerConfig {
_preferenceMaps = new HashMap<PartitionId, Map<ParticipantId, State>>();
}
+ /**
+ * Construct builder using an existing custom rebalancer config
+ * @param config
+ */
+ public Builder(CustomRebalancerConfig config) {
+ super(config);
+ _preferenceMaps = new HashMap<PartitionId, Map<ParticipantId, State>>();
+ _preferenceMaps.putAll(config.getPreferenceMaps());
+ }
+
/**
* Add a preference map of a partition
* @param partitionId the partition to set
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
index 3482d16..599d20d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/FullAutoRebalancerConfig.java
@@ -67,6 +67,14 @@ public final class FullAutoRebalancerConfig extends RebalancerConfig {
super(resourceId);
}
+ /**
+ * Construct a builder using an existing full-auto rebalancer config
+ * @param config
+ */
+ public Builder(FullAutoRebalancerConfig config) {
+ super(config);
+ }
+
@Override
public FullAutoRebalancerConfig build() {
if (_partitionMap.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index 6c1be9e..46895d3 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -170,11 +170,10 @@ public class ParticipantAccessor {
} else {
// check partitions exist. warn if not
for (PartitionId partitionId : partitionIdSet) {
- String partitionName = partitionId.stringify();
if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
- .getPreferenceList(partitionName) == null)
+ .getPreferenceList(partitionId) == null)
|| (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
- .getInstanceStateMap(partitionName) == null)) {
+ .getParticipantStateMap(partitionId) == null)) {
LOG.warn("Cluster: " + _clusterId + ", resource: " + resourceId + ", partition: "
+ partitionId + ", partition does NOT exist in ideal state");
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/PartitionId.java b/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
index e72f1b3..757406d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
+++ b/helix-core/src/main/java/org/apache/helix/api/PartitionId.java
@@ -28,6 +28,14 @@ public class PartitionId extends Id {
_partitionName = partitionName;
}
+ /**
+ * Get the id of the resource containing this partition
+ * @return ResourceId
+ */
+ public ResourceId getResourceId() {
+ return _resourceId;
+ }
+
@Override
public String stringify() {
// check in case the partition name is instantiated incorrectly
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 2f531e0..d377af8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -376,6 +376,22 @@ public class RebalancerConfig extends NamespacedConfig {
}
/**
+ * Construct a builder from an existing rebalancer config
+ * @param config
+ */
+ public Builder(RebalancerConfig config) {
+ _resourceId = config.getResourceId();
+ _partitionMap = new TreeMap<PartitionId, Partition>();
+ _partitionMap.putAll(config.getPartitionMap());
+ _stateModelDefId = config.getStateModelDefId();
+ _stateModelFactoryId = config.getStateModelFactoryId();
+ _anyLiveParticipant = config.canAssignAnyLiveParticipant();
+ _replicaCount = config.getReplicaCount();
+ _maxPartitionsPerParticipant = config.getMaxPartitionsPerParticipant();
+ _participantGroupTag = config.getParticipantGroupTag();
+ }
+
+ /**
* Set the state model definition
* @param stateModelDefId state model identifier
* @return Builder
@@ -513,6 +529,14 @@ public class RebalancerConfig extends NamespacedConfig {
}
/**
+ * Construct with an existing rebalancer config
+ * @param config
+ */
+ public SimpleBuilder(RebalancerConfig config) {
+ super(config);
+ }
+
+ /**
* Build a rebalancer config
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/Scope.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Scope.java b/helix-core/src/main/java/org/apache/helix/api/Scope.java
index e887f98..4bff194 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Scope.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Scope.java
@@ -24,6 +24,13 @@ package org.apache.helix.api;
* of cluster, participant, partition, or resource, but not more than one of these at any time.
*/
public class Scope<T extends Id> {
+ public enum ScopeType {
+ CLUSTER,
+ PARTICIPANT,
+ PARTITION,
+ RESOURCE
+ }
+
private final T _id;
/**
@@ -42,6 +49,30 @@ public class Scope<T extends Id> {
return _id;
}
+ @Override
+ public String toString() {
+ return getType() + "{" + getScopedId() + "}";
+ }
+
+ /**
+ * Get the Helix entity type that this scope covers
+ * @return scope type
+ */
+ public ScopeType getType() {
+ Class<?> idClass = _id.getClass();
+ if (idClass == ClusterId.class) {
+ return ScopeType.CLUSTER;
+ } else if (idClass == ParticipantId.class) {
+ return ScopeType.PARTICIPANT;
+ } else if (idClass == PartitionId.class) {
+ return ScopeType.PARTITION;
+ } else if (idClass == ResourceId.class) {
+ return ScopeType.RESOURCE;
+ } else {
+ return null;
+ }
+ }
+
/**
* Get a cluster scope
* @param clusterId the id of the cluster that is scoped
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
index a5284a8..409769e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/SemiAutoRebalancerConfig.java
@@ -116,6 +116,16 @@ public final class SemiAutoRebalancerConfig extends RebalancerConfig {
}
/**
+ * Construct a builder from an existing semi-auto rebalancer config
+ * @param config
+ */
+ public Builder(SemiAutoRebalancerConfig config) {
+ super(config);
+ _preferenceLists = new HashMap<PartitionId, List<ParticipantId>>();
+ _preferenceLists.putAll(config.getPreferenceLists());
+ }
+
+ /**
* Add a preference list of a partition
* @param partitionId the partition to set
* @param preferenceList list of participant ids, most preferred first
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
index 345c80b..d9517a9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/UserDefinedRebalancerConfig.java
@@ -88,6 +88,15 @@ public final class UserDefinedRebalancerConfig extends RebalancerConfig {
super(resourceId);
}
+ /**
+ * Construct a builder using an existing user-defined rebalancer config
+ * @param config the existing user-defined rebalancer config whose rebalancer ref is copied
+ */
+ public Builder(UserDefinedRebalancerConfig config) {
+ super(config);
+ _rebalancerRef = config.getRebalancerRef();
+ }
+
public Builder rebalancerRef(RebalancerRef rebalancerRef) {
_rebalancerRef = rebalancerRef;
return this;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 32b4e92..1c69dfe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -56,6 +56,7 @@ import org.apache.log4j.Logger;
* The output is a preference list and a mapping based on that preference list, i.e. partition p
* has a replica on node k with state s.
*/
+@Deprecated
public class AutoRebalancer implements Rebalancer {
// These should be final, but are initialized in init rather than a constructor
private HelixManager _manager;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 689561b..709b61e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -46,6 +46,7 @@ import org.apache.log4j.Logger;
* The output is a verified mapping based on that preference list, i.e. partition p has a replica
* on node k with state s, where s may be a dropped or error state if necessary.
*/
+@Deprecated
public class CustomRebalancer implements Rebalancer {
private static final Logger LOG = Logger.getLogger(CustomRebalancer.class);
@@ -70,7 +71,8 @@ public class CustomRebalancer implements Rebalancer {
Set<String> disabledInstancesForPartition =
clusterData.getDisabledInstancesForPartition(partition.toString());
Map<String, String> idealStateMap =
- currentIdealState.getInstanceStateMap(partition.getPartitionName());
+ IdealState.stringMapFromParticipantStateMap(currentIdealState
+ .getParticipantStateMap(PartitionId.from(partition.getPartitionName())));
Map<String, String> bestStateForPartition =
computeCustomizedBestStateForPartition(clusterData, stateModelDef, idealStateMap,
currentStateMap, disabledInstancesForPartition);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
index 8212b7f..1972f03 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewAutoRebalancer.java
@@ -35,7 +35,6 @@ import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.State;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
@@ -82,13 +81,23 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
replicas = config.getReplicaCount();
}
- LinkedHashMap<String, Integer> stateCountMap =
- ConstraintBasedAssignment.stateCount(stateModelDef, liveParticipants.size(), replicas);
+ // count how many replicas should be in each state
+ LinkedHashMap<State, Integer> stateCountMap =
+ NewConstraintBasedAssignment.stateCount(cluster.getConfig(), config.getResourceId(),
+ stateModelDef, liveParticipants.size(), replicas);
+ LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
+ for (State state : stateCountMap.keySet()) {
+ rawStateCountMap.put(state.toString(), stateCountMap.get(state));
+ }
+
+ // get the participant lists
List<ParticipantId> liveParticipantList =
new ArrayList<ParticipantId>(liveParticipants.keySet());
List<ParticipantId> allParticipantList =
new ArrayList<ParticipantId>(cluster.getParticipantMap().keySet());
List<String> liveNodes = Lists.transform(liveParticipantList, Functions.toStringFunction());
+
+ // compute the current mapping from the current state
Map<PartitionId, Map<ParticipantId, State>> currentMapping =
currentMapping(config, currentState, stateCountMap);
@@ -109,9 +118,9 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
liveNodes = new ArrayList<String>(taggedNodes);
}
+ // determine which nodes the replicas should live on
List<String> allNodes = Lists.transform(allParticipantList, Functions.toStringFunction());
int maxPartition = config.getMaxPartitionsPerParticipant();
-
if (LOG.isInfoEnabled()) {
LOG.info("currentMapping: " + currentMapping);
LOG.info("stateCountMap: " + stateCountMap);
@@ -122,7 +131,7 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
_algorithm =
new AutoRebalanceStrategy(config.getResourceId().stringify(), partitionNames,
- stateCountMap, maxPartition, placementScheme);
+ rawStateCountMap, maxPartition, placementScheme);
ZNRecord newMapping =
_algorithm.computePartitionAssignment(liveNodes,
ResourceAssignment.stringMapsFromReplicaMaps(currentMapping), allNodes);
@@ -153,8 +162,8 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
preferenceList =
NewConstraintBasedAssignment.getPreferenceList(cluster, partition, preferenceList);
Map<ParticipantId, State> bestStateForPartition =
- NewConstraintBasedAssignment.computeAutoBestStateForPartition(liveParticipants,
- stateModelDef, preferenceList,
+ NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
+ config.getResourceId(), liveParticipants, stateModelDef, preferenceList,
currentState.getCurrentStateMap(config.getResourceId(), partition),
disabledParticipantsForPartition);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
@@ -164,7 +173,7 @@ public class NewAutoRebalancer implements NewRebalancer<FullAutoRebalancerConfig
private Map<PartitionId, Map<ParticipantId, State>> currentMapping(
FullAutoRebalancerConfig config, ResourceCurrentState currentStateOutput,
- Map<String, Integer> stateCountMap) {
+ Map<State, Integer> stateCountMap) {
Map<PartitionId, Map<ParticipantId, State>> map =
new HashMap<PartitionId, Map<ParticipantId, State>>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
index cb061ab..3153623 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/NewSemiAutoRebalancer.java
@@ -66,9 +66,9 @@ public class NewSemiAutoRebalancer implements NewRebalancer<SemiAutoRebalancerCo
NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
config.getPreferenceList(partition));
Map<ParticipantId, State> bestStateForPartition =
- NewConstraintBasedAssignment.computeAutoBestStateForPartition(
- cluster.getLiveParticipantMap(), stateModelDef, preferenceList, currentStateMap,
- disabledInstancesForPartition);
+ NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
+ config.getResourceId(), cluster.getLiveParticipantMap(), stateModelDef,
+ preferenceList, currentStateMap, disabledInstancesForPartition);
partitionMapping.addReplicaMap(partition, bestStateForPartition);
}
return partitionMapping;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index e4f165b..5dd399f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.ResourceAssignment;
* This will be invoked on all changes that happen in the cluster.<br/>
* Simply return the newIdealState for a resource in this method.<br/>
*/
+@Deprecated
public interface Rebalancer {
/**
* Initialize the rebalancer with a HelixManager if necessary
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index d525ea3..433b4a7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -45,6 +45,7 @@ import org.apache.log4j.Logger;
* The output is a mapping based on that preference list, i.e. partition p has a replica on node k
* with state s.
*/
+@Deprecated
public class SemiAutoRebalancer implements Rebalancer {
private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index ee7524f..dce0a07 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -41,6 +41,7 @@ import org.apache.log4j.Logger;
* Collection of functions that will compute the best possible states given the live instances and
* an ideal state.
*/
+@Deprecated
public class ConstraintBasedAssignment {
private static Logger logger = Logger.getLogger(ConstraintBasedAssignment.class);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 69d9fcf..603edd0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -31,9 +31,12 @@ import java.util.Set;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ClusterConfig;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -96,10 +99,10 @@ public class NewConstraintBasedAssignment {
* @param disabledParticipantsForPartition
* @return
*/
- public static Map<ParticipantId, State> computeAutoBestStateForPartition(
- Map<ParticipantId, Participant> liveParticipantMap, StateModelDefinition stateModelDef,
- List<ParticipantId> participantPreferenceList, Map<ParticipantId, State> currentStateMap,
- Set<ParticipantId> disabledParticipantsForPartition) {
+ public static Map<ParticipantId, State> computeAutoBestStateForPartition(ClusterConfig cluster,
+ ResourceId resourceId, Map<ParticipantId, Participant> liveParticipantMap,
+ StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
+ Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
// if the resource is deleted, instancePreferenceList will be empty and
@@ -128,7 +131,9 @@ public class NewConstraintBasedAssignment {
boolean assigned[] = new boolean[participantPreferenceList.size()];
for (State state : statesPriorityList) {
- String num = stateModelDef.getNumParticipantsPerState(state);
+ String num =
+ cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+ stateModelDef.getStateModelDefId(), state);
int stateCount = -1;
if ("N".equals(num)) {
Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantMap.keySet());
@@ -171,19 +176,23 @@ public class NewConstraintBasedAssignment {
/**
* Get the number of replicas that should be in each state for a partition
+ * @param cluster cluster configuration
+ * @param resourceId the resource for which to get the state count
* @param stateModelDef StateModelDefinition object
* @param liveNodesNb number of live nodes
* @param total number of replicas
* @return state count map: state->count
*/
- public static LinkedHashMap<State, Integer> stateCount(StateModelDefinition stateModelDef,
- int liveNodesNb, int totalReplicas) {
+ public static LinkedHashMap<State, Integer> stateCount(ClusterConfig cluster,
+ ResourceId resourceId, StateModelDefinition stateModelDef, int liveNodesNb, int totalReplicas) {
LinkedHashMap<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
List<State> statesPriorityList = stateModelDef.getStatesPriorityList();
int replicas = totalReplicas;
for (State state : statesPriorityList) {
- String num = stateModelDef.getNumParticipantsPerState(state);
+ String num =
+ cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+ stateModelDef.getStateModelDefId(), state);
if ("N".equals(num)) {
stateCountMap.put(state, liveNodesNb);
} else if ("R".equals(num)) {
@@ -207,7 +216,9 @@ public class NewConstraintBasedAssignment {
// get state count for R
for (State state : statesPriorityList) {
- String num = stateModelDef.getNumParticipantsPerState(state);
+ String num =
+ cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
+ stateModelDef.getStateModelDefId(), state);
if ("R".equals(num)) {
stateCountMap.put(state, replicas);
// should have at most one state using R
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index c7473ff..dbbcb1e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -97,8 +97,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
Rebalancer rebalancer = null;
if (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
- && idealState.getRebalancerClassName() != null) {
- String rebalancerClassName = idealState.getRebalancerClassName();
+ && idealState.getRebalancerRef() != null) {
+ String rebalancerClassName = idealState.getRebalancerRef().toString();
logger
.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
try {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index a89707b..021c9b8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -101,7 +101,8 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
partitionId);
partitionMapping.addReplicaMap(partitionId, NewConstraintBasedAssignment
- .computeAutoBestStateForPartition(cluster.getLiveParticipantMap(), stateModelDef, null,
+ .computeAutoBestStateForPartition(cluster.getConfig(), resourceId,
+ cluster.getLiveParticipantMap(), stateModelDef, null,
currentStateOutput.getCurrentStateMap(resourceId, partitionId),
disabledParticipantsForPartition));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index 0cc1ea1..457b470 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -20,10 +20,11 @@ package org.apache.helix.controller.stages;
*/
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.CustomRebalancerConfig;
+import org.apache.helix.api.FullAutoRebalancerConfig;
import org.apache.helix.api.Participant;
import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
@@ -31,7 +32,9 @@ import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SemiAutoRebalancerConfig;
import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.api.UserDefinedRebalancerConfig;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.CurrentState;
@@ -47,29 +50,99 @@ public class NewResourceComputationStage extends AbstractBaseStage {
private static Logger LOG = Logger.getLogger(NewResourceComputationStage.class);
@Override
- public void process(ClusterEvent event) throws Exception {
+ public void process(ClusterEvent event) throws StageException {
Cluster cluster = event.getAttribute("ClusterDataCache");
if (cluster == null) {
- throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
+ throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
+ }
+
+ Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+ Map<ResourceId, ResourceConfig> csResCfgMap = getCurStateResourceCfgMap(cluster);
+
+ // ideal-state may be removed; add every resource config found in current-state but not in ideal-state
+ for (ResourceId resourceId : csResCfgMap.keySet()) {
+ if (!cluster.getResourceMap().keySet().contains(resourceId)) {
+ resCfgMap.put(resourceId, csResCfgMap.get(resourceId));
+ }
}
- Map<ResourceId, ResourceConfig.Builder> resourceBuilderMap =
- new LinkedHashMap<ResourceId, ResourceConfig.Builder>();
- // include all resources in ideal-state
for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
Resource resource = cluster.getResource(resourceId);
- RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
-
- ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
- resourceBuilder.rebalancerConfig(rebalancerConfig);
- resourceBuilder.bucketSize(resource.getBucketSize());
- resourceBuilder.batchMessageMode(resource.getBatchMessageMode());
- resourceBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
- resourceBuilderMap.put(resourceId, resourceBuilder);
+ RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
+
+ ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+ resCfgBuilder.bucketSize(resource.getBucketSize());
+ resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
+ resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
+
+ switch (rebalancerCfg.getRebalancerMode()) {
+ case USER_DEFINED: {
+ UserDefinedRebalancerConfig.Builder builder =
+ new UserDefinedRebalancerConfig.Builder(UserDefinedRebalancerConfig.from(rebalancerCfg));
+ if (csResCfgMap.containsKey(resourceId)) {
+ builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+ }
+ resCfgBuilder.rebalancerConfig(builder.build());
+ resCfgMap.put(resourceId, resCfgBuilder.build());
+ break;
+ }
+ case FULL_AUTO: {
+ FullAutoRebalancerConfig.Builder builder =
+ new FullAutoRebalancerConfig.Builder(FullAutoRebalancerConfig.from(rebalancerCfg));
+ if (csResCfgMap.containsKey(resourceId)) {
+ builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+ }
+ resCfgBuilder.rebalancerConfig(builder.build());
+ resCfgMap.put(resourceId, resCfgBuilder.build());
+ break;
+ }
+ case SEMI_AUTO: {
+ SemiAutoRebalancerConfig.Builder builder =
+ new SemiAutoRebalancerConfig.Builder(SemiAutoRebalancerConfig.from(rebalancerCfg));
+ if (csResCfgMap.containsKey(resourceId)) {
+ builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+ }
+ resCfgBuilder.rebalancerConfig(builder.build());
+ resCfgMap.put(resourceId, resCfgBuilder.build());
+ break;
+ }
+ case CUSTOMIZED: {
+ CustomRebalancerConfig.Builder builder =
+ new CustomRebalancerConfig.Builder(CustomRebalancerConfig.from(rebalancerCfg));
+ if (csResCfgMap.containsKey(resourceId)) {
+ builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+ }
+ resCfgBuilder.rebalancerConfig(builder.build());
+ resCfgMap.put(resourceId, resCfgBuilder.build());
+ break;
+ }
+ default:
+ RebalancerConfig.SimpleBuilder builder = new RebalancerConfig.SimpleBuilder(rebalancerCfg);
+ if (csResCfgMap.containsKey(resourceId)) {
+ builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
+ }
+ resCfgBuilder.rebalancerConfig(builder.build());
+ resCfgMap.put(resourceId, resCfgBuilder.build());
+ break;
+ }
+
}
- // include all partitions from CurrentState as well since idealState might be removed
- Map<ResourceId, RebalancerConfig.SimpleBuilder> rebalancerConfigBuilderMap =
+ event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
+ }
+
+ /**
+ * Get resource configs from current-state
+ * @param cluster
+ * @return resource config map or empty map if not available
+ * @throws StageException
+ */
+ Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster)
+ throws StageException {
+ Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
+ new HashMap<ResourceId, ResourceConfig.Builder>();
+
+ Map<ResourceId, RebalancerConfig.SimpleBuilder> rebCfgBuilderMap =
new HashMap<ResourceId, RebalancerConfig.SimpleBuilder>();
for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
@@ -84,46 +157,35 @@ public class NewResourceComputationStage extends AbstractBaseStage {
+ currentState.getResourceId());
}
- // don't overwrite ideal state configs
- if (!resourceBuilderMap.containsKey(resourceId)) {
- if (!rebalancerConfigBuilderMap.containsKey(resourceId)) {
- RebalancerConfig.SimpleBuilder rebalancerConfigBuilder =
- new RebalancerConfig.SimpleBuilder(resourceId);
- rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
- rebalancerConfigBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
- .getStateModelFactoryName()));
- rebalancerConfigBuilderMap.put(resourceId, rebalancerConfigBuilder);
- }
- ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
- resourceBuilder.bucketSize(currentState.getBucketSize());
- resourceBuilder.batchMessageMode(currentState.getBatchMessageMode());
- resourceBuilderMap.put(resourceId, resourceBuilder);
+ if (!resCfgBuilderMap.containsKey(resourceId)) {
+ RebalancerConfig.SimpleBuilder rebCfgBuilder =
+ new RebalancerConfig.SimpleBuilder(resourceId);
+ rebCfgBuilder.stateModelDef(currentState.getStateModelDefId());
+ rebCfgBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
+ .getStateModelFactoryName()));
+ rebCfgBuilderMap.put(resourceId, rebCfgBuilder);
+
+ ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
+ resCfgBuilder.bucketSize(currentState.getBucketSize());
+ resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
+ resCfgBuilderMap.put(resourceId, resCfgBuilder);
}
- // add all partitions in current-state
- if (rebalancerConfigBuilderMap.containsKey(resourceId)) {
- RebalancerConfig.SimpleBuilder rebalancerConfigBuilder =
- rebalancerConfigBuilderMap.get(resourceId);
- for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
- rebalancerConfigBuilder.addPartition(new Partition(partitionId));
- }
+ RebalancerConfig.SimpleBuilder rebCfgBuilder = rebCfgBuilderMap.get(resourceId);
+ for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
+ rebCfgBuilder.addPartition(new Partition(partitionId));
}
}
-
}
- // convert builder-map to resource-map
- Map<ResourceId, ResourceConfig> resourceMap = new LinkedHashMap<ResourceId, ResourceConfig>();
- for (ResourceId resourceId : resourceBuilderMap.keySet()) {
- ResourceConfig.Builder resourceConfigBuilder = resourceBuilderMap.get(resourceId);
- if (rebalancerConfigBuilderMap.containsKey(resourceId)) {
- RebalancerConfig.SimpleBuilder rebalancerConfigBuilder =
- rebalancerConfigBuilderMap.get(resourceId);
- resourceConfigBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
- }
- resourceMap.put(resourceId, resourceConfigBuilder.build());
+ Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
+ for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
+ ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
+ RebalancerConfig.SimpleBuilder rebCfgBuilder = rebCfgBuilderMap.get(resourceId);
+ resCfgBuilder.rebalancerConfig(rebCfgBuilder.build());
+ resCfgMap.put(resourceId, resCfgBuilder.build());
}
- event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+ return resCfgMap;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
index e6b7398..ffc14d6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/RebalanceIdealStateStage.java
@@ -54,8 +54,8 @@ public class RebalanceIdealStateStage extends AbstractBaseStage {
for (String resourceName : idealStateMap.keySet()) {
IdealState currentIdealState = idealStateMap.get(resourceName);
if (currentIdealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
- && currentIdealState.getRebalancerClassName() != null) {
- String rebalancerClassName = currentIdealState.getRebalancerClassName();
+ && currentIdealState.getRebalancerRef() != null) {
+ String rebalancerClassName = currentIdealState.getRebalancerRef().toString();
LOG.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
try {
Rebalancer balancer =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
index 00612dd..c48f156 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
@@ -320,7 +320,7 @@ public class StatsAggregationStage extends AbstractBaseStage {
Builder kb = accessor.keyBuilder();
List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
for (IdealState idealState : idealStates) {
- String resourceName = idealState.getResourceName();
+ String resourceName = idealState.getResourceId().stringify();
if (actualStatName.contains("=" + resourceName + ".")
|| actualStatName.contains("=" + resourceName + ";")) {
return resourceName;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
index 4e88499..6a7bb59 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/EspressoRelayStrategy.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.HelixException;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.model.IdealState;
public class EspressoRelayStrategy {
@@ -42,7 +43,7 @@ public class EspressoRelayStrategy {
IdealState result = new IdealState(resultRecordName);
result.setNumPartitions(partitions.size());
result.setReplicas("" + replica);
- result.setStateModelDefRef(stateModelName);
+ result.setStateModelDefId(StateModelDefId.from(stateModelName));
int groups = instances.size() / replica;
int remainder = instances.size() % replica;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 0a223fb..9d61914 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -40,6 +40,7 @@ import org.apache.helix.api.MessageId;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
@@ -180,7 +181,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
}
IdealState newAddedScheduledTasks = new IdealState(taskQueueName);
newAddedScheduledTasks.setBucketSize(TASKQUEUE_BUCKET_NUM);
- newAddedScheduledTasks.setStateModelDefRef(SCHEDULER_TASK_QUEUE);
+ newAddedScheduledTasks.setStateModelDefId(StateModelDefId.from(SCHEDULER_TASK_QUEUE));
synchronized (_manager) {
int existingTopPartitionId = 0;
@@ -233,8 +234,9 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
private int findTopPartitionId(IdealState currentTaskQueue) {
int topId = 0;
- for (String partitionName : currentTaskQueue.getPartitionStringSet()) {
+ for (PartitionId partitionId : currentTaskQueue.getPartitionSet()) {
try {
+ String partitionName = partitionId.stringify();
String partitionNumStr = partitionName.substring(partitionName.lastIndexOf('_') + 1);
int num = Integer.parseInt(partitionNumStr);
if (topId < num) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 61b1cd1..14c63f4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -53,12 +53,14 @@ import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.alerts.AlertsHolder;
import org.apache.helix.alerts.StatsHolder;
+import org.apache.helix.api.ConstraintId;
import org.apache.helix.api.MessageId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.SessionId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
@@ -240,9 +242,9 @@ public class ZKHelixAdmin implements HelixAdmin {
IdealState idealState = new IdealState(idealStateRecord);
for (String partitionName : partitionNames) {
if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
- .getPreferenceList(partitionName) == null)
+ .getPreferenceList(PartitionId.from(partitionName)) == null)
|| (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
- .getInstanceStateMap(partitionName) == null)) {
+ .getParticipantStateMap(PartitionId.from(partitionName)) == null)) {
logger.warn("Cluster: " + clusterName + ", resource: " + resourceName + ", partition: "
+ partitionName + ", partition does not exist in ideal state");
}
@@ -390,7 +392,7 @@ public class ZKHelixAdmin implements HelixAdmin {
message.setStateModelDef(stateModelDef);
message.setFromState(State.from(HelixDefinedState.ERROR.toString()));
message.setToState(stateModel.getInitialState());
- message.setStateModelFactoryName(idealState.getStateModelFactoryName());
+ message.setStateModelFactoryId(idealState.getStateModelFactoryId());
resetMessages.add(message);
messageKeys.add(keyBuilder.message(instanceName, message.getId()));
@@ -590,7 +592,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void addResource(String clusterName, String resourceName, IdealState idealstate) {
- String stateModelRef = idealstate.getStateModelDefRef();
+ String stateModelRef = idealstate.getStateModelDefId().stringify();
String stateModelDefPath =
PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, clusterName, stateModelRef);
if (!_zkClient.exists(stateModelDefPath)) {
@@ -631,10 +633,11 @@ public class ZKHelixAdmin implements HelixAdmin {
}
IdealState idealState = new IdealState(resourceName);
idealState.setNumPartitions(partitions);
- idealState.setStateModelDefRef(stateModelRef);
+ idealState.setStateModelDefId(StateModelDefId.from(stateModelRef));
idealState.setRebalanceMode(mode);
idealState.setReplicas("" + 0);
- idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+ idealState.setStateModelFactoryId(StateModelFactoryId
+ .from(HelixConstants.DEFAULT_STATE_MODEL_FACTORY));
if (maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE) {
idealState.setMaxPartitionsPerInstance(maxPartitionsPerInstance);
}
@@ -900,7 +903,7 @@ public class ZKHelixAdmin implements HelixAdmin {
IdealState idealState = new IdealState(clusterName);
idealState.setNumPartitions(1);
- idealState.setStateModelDefRef("LeaderStandby");
+ idealState.setStateModelDefId(StateModelDefId.from("LeaderStandby"));
List<String> controllers = getInstancesInCluster(grandCluster);
if (controllers.size() == 0) {
@@ -918,7 +921,7 @@ public class ZKHelixAdmin implements HelixAdmin {
new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.idealState(idealState.getResourceName()), idealState);
+ accessor.setProperty(keyBuilder.idealState(idealState.getResourceId().stringify()), idealState);
}
@Override
@@ -986,7 +989,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
idealState.setReplicas(Integer.toString(replica));
int partitions = idealState.getNumPartitions();
- String stateModelName = idealState.getStateModelDefRef();
+ String stateModelName = idealState.getStateModelDefId().stringify();
StateModelDefinition stateModDef = getStateModelDef(clusterName, stateModelName);
if (stateModDef == null) {
@@ -1103,7 +1106,7 @@ public class ZKHelixAdmin implements HelixAdmin {
currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(
currentData);
- constraints.addConstraintItem(constraintId, constraintItem);
+ constraints.addConstraintItem(ConstraintId.from(constraintId), constraintItem);
return constraints.getRecord();
}
}, AccessOption.PERSISTENT);
@@ -1123,7 +1126,7 @@ public class ZKHelixAdmin implements HelixAdmin {
if (currentData != null) {
ClusterConstraints constraints = new ClusterConstraints(currentData);
- constraints.removeConstraintItem(constraintId);
+ constraints.removeConstraintItem(ConstraintId.from(constraintId));
return constraints.getRecord();
}
return null;
@@ -1152,8 +1155,9 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void rebalance(String clusterName, IdealState currentIdealState, List<String> instanceNames) {
Set<String> activeInstances = new HashSet<String>();
- for (String partition : currentIdealState.getPartitionStringSet()) {
- activeInstances.addAll(currentIdealState.getRecord().getListField(partition));
+ for (PartitionId partition : currentIdealState.getPartitionSet()) {
+ activeInstances.addAll(IdealState.stringListFromPreferenceList(currentIdealState
+ .getPreferenceList(partition)));
}
instanceNames.removeAll(activeInstances);
Map<String, Object> previousIdealState =
@@ -1162,17 +1166,16 @@ public class ZKHelixAdmin implements HelixAdmin {
Map<String, Object> balancedRecord =
DefaultTwoStateStrategy.calculateNextIdealState(instanceNames, previousIdealState);
StateModelDefinition stateModDef =
- this.getStateModelDef(clusterName, currentIdealState.getStateModelDefRef());
+ this.getStateModelDef(clusterName, currentIdealState.getStateModelDefId().stringify());
if (stateModDef == null) {
- throw new HelixException("cannot find state model: "
- + currentIdealState.getStateModelDefRef());
+ throw new HelixException("cannot find state model: " + currentIdealState.getStateModelDefId());
}
String[] states = RebalanceUtil.parseStates(clusterName, stateModDef);
ZNRecord newIdealStateRecord =
- DefaultTwoStateStrategy.convertToZNRecord(balancedRecord,
- currentIdealState.getResourceName(), states[0], states[1]);
+ DefaultTwoStateStrategy.convertToZNRecord(balancedRecord, currentIdealState.getResourceId()
+ .stringify(), states[0], states[1]);
Set<String> partitionSet = new HashSet<String>();
partitionSet.addAll(newIdealStateRecord.getMapFields().keySet());
partitionSet.addAll(newIdealStateRecord.getListFields().keySet());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
index ef47a12..2b014ae 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConstraints.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.ConstraintId;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.log4j.Logger;
@@ -42,9 +43,11 @@ public class ClusterConstraints extends HelixProperty {
*/
public enum ConstraintAttribute {
STATE,
+ STATE_MODEL,
MESSAGE_TYPE,
TRANSITION,
RESOURCE,
+ PARTITION,
INSTANCE,
CONSTRAINT_VALUE
}
@@ -53,7 +56,9 @@ public class ClusterConstraints extends HelixProperty {
* Possible special values that constraint attributes can take
*/
public enum ConstraintValue {
- ANY
+ ANY,
+ N,
+ R
}
/**
@@ -65,7 +70,8 @@ public class ClusterConstraints extends HelixProperty {
}
// constraint-id -> constraint-item
- private final Map<String, ConstraintItem> _constraints = new HashMap<String, ConstraintItem>();
+ private final Map<ConstraintId, ConstraintItem> _constraints =
+ new HashMap<ConstraintId, ConstraintItem>();
/**
* Instantiate constraints as a given type
@@ -96,7 +102,7 @@ public class ClusterConstraints extends HelixProperty {
builder.addConstraintAttributes(_record.getMapField(constraintId)).build();
// ignore item with empty attributes or no constraint-value
if (item.getAttributes().size() > 0 && item.getConstraintValue() != null) {
- addConstraintItem(constraintId, item);
+ addConstraintItem(ConstraintId.from(constraintId), item);
} else {
LOG.error("Skip invalid constraint. key: " + constraintId + ", value: "
+ _record.getMapField(constraintId));
@@ -109,13 +115,13 @@ public class ClusterConstraints extends HelixProperty {
* @param constraintId unique constraint identifier
* @param item the constraint as a {@link ConstraintItem}
*/
- public void addConstraintItem(String constraintId, ConstraintItem item) {
+ public void addConstraintItem(ConstraintId constraintId, ConstraintItem item) {
Map<String, String> map = new TreeMap<String, String>();
for (ConstraintAttribute attr : item.getAttributes().keySet()) {
map.put(attr.toString(), item.getAttributeValue(attr));
}
map.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(), item.getConstraintValue());
- _record.setMapField(constraintId, map);
+ _record.setMapField(constraintId.stringify(), map);
_constraints.put(constraintId, item);
}
@@ -123,8 +129,8 @@ public class ClusterConstraints extends HelixProperty {
* Add multiple constraint items.
* @param items (constraint identifier, {@link ConstraintItem}) pairs
*/
- public void addConstraintItems(Map<String, ConstraintItem> items) {
- for (String constraintId : items.keySet()) {
+ public void addConstraintItems(Map<ConstraintId, ConstraintItem> items) {
+ for (ConstraintId constraintId : items.keySet()) {
addConstraintItem(constraintId, items.get(constraintId));
}
}
@@ -133,9 +139,9 @@ public class ClusterConstraints extends HelixProperty {
* remove a constraint-item
* @param constraintId unique constraint identifier
*/
- public void removeConstraintItem(String constraintId) {
+ public void removeConstraintItem(ConstraintId constraintId) {
_constraints.remove(constraintId);
- _record.getMapFields().remove(constraintId);
+ _record.getMapFields().remove(constraintId.stringify());
}
/**
@@ -143,7 +149,7 @@ public class ClusterConstraints extends HelixProperty {
* @param constraintId unique constraint identifier
* @return {@link ConstraintItem} or null if not present
*/
- public ConstraintItem getConstraintItem(String constraintId) {
+ public ConstraintItem getConstraintItem(ConstraintId constraintId) {
return _constraints.get(constraintId);
}
@@ -163,6 +169,14 @@ public class ClusterConstraints extends HelixProperty {
}
/**
+ * Get all constraint items in this collection of constraints
+ * @return map of constraint id to constraint item
+ */
+ public Map<ConstraintId, ConstraintItem> getConstraintItems() {
+ return _constraints;
+ }
+
+ /**
* convert a message to constraint attribute pairs
* @param msg a {@link Message} containing constraint attributes
* @return constraint attribute scope-value pairs
@@ -182,13 +196,15 @@ public class ClusterConstraints extends HelixProperty {
if (msg.getTgtName() != null) {
attributes.put(ConstraintAttribute.INSTANCE, msg.getTgtName());
}
+ if (msg.getStateModelDefId() != null) {
+ attributes.put(ConstraintAttribute.STATE_MODEL, msg.getStateModelDefId().stringify());
+ }
}
return attributes;
}
@Override
public boolean isValid() {
- // TODO Auto-generated method stub
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/243f2adf/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
index b8f4ee5..f4304f4 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
@@ -146,6 +146,9 @@ public class ExternalView extends HelixProperty {
*/
public Map<ParticipantId, State> getStateMap(PartitionId partitionId) {
Map<String, String> rawStateMap = getStateMap(partitionId.stringify());
+ if (rawStateMap == null) {
+ return null;
+ }
ImmutableMap.Builder<ParticipantId, State> builder =
new ImmutableMap.Builder<ParticipantId, State>();
for (String participantName : rawStateMap.keySet()) {