You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:26 UTC
[18/53] [abbrv] [HELIX-100] Improve the helix config api
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
new file mode 100644
index 0000000..3aff151
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerContext.java
@@ -0,0 +1,93 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Defines the state available to a rebalancer. The most common use case is to use a
+ * {@link PartitionedRebalancerContext} or a subclass and set up a resource with it. A rebalancer
+ * configuration, at a minimum, is aware of subunits of a resource, the state model to follow, and
+ * how the configuration should be serialized.
+ */
+public interface RebalancerContext {
+ /**
+ * Get a map of resource partition identifiers to partitions. A partition is a subunit of a
+ * resource, e.g. a subtask of a task
+ * @return map of (subunit id, subunit) pairs
+ */
+ public Map<? extends PartitionId, ? extends Partition> getSubUnitMap();
+
+ /**
+ * Get the subunits of the resource (e.g. partitions)
+ * @return set of subunit ids
+ */
+ public Set<? extends PartitionId> getSubUnitIdSet();
+
+ /**
+ * Get a specific subunit
+ * @param subUnitId the id of the subunit
+ * @return SubUnit
+ */
+ public Partition getSubUnit(PartitionId partitionId);
+
+ /**
+ * Get the resource to rebalance
+ * @return resource id
+ */
+ public ResourceId getResourceId();
+
+ /**
+ * Get the state model definition that the resource follows
+ * @return state model definition id
+ */
+ public StateModelDefId getStateModelDefId();
+
+ /**
+ * Get the state model factory of this resource
+ * @return state model factory id
+ */
+ public StateModelFactoryId getStateModelFactoryId();
+
+ /**
+ * Get the tag, if any, that participants must have in order to serve this resource
+ * @return participant group tag, or null
+ */
+ public String getParticipantGroupTag();
+
+ /**
+ * Get the serializer for this context
+ * @return ContextSerializer class object
+ */
+ public Class<? extends ContextSerializer> getSerializerClass();
+
+ /**
+ * Get a reference to the class used to rebalance this resource
+ * @return RebalancerRef
+ */
+ public RebalancerRef getRebalancerRef();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
new file mode 100644
index 0000000..a90b77a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerRef.java
@@ -0,0 +1,94 @@
+package org.apache.helix.controller.rebalancer.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Reference to a class that extends {@link Rebalancer}. It loads the class automatically.
+ */
+public class RebalancerRef {
+ private static final Logger LOG = Logger.getLogger(RebalancerRef.class);
+
+ @JsonProperty("rebalancerClassName")
+ private final String _rebalancerClassName;
+
+ @JsonCreator
+ private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
+ _rebalancerClassName = rebalancerClassName;
+ }
+
+ /**
+ * Get an instantiated Rebalancer
+ * @return Rebalancer or null if instantiation failed
+ */
+ @JsonIgnore
+ public Rebalancer getRebalancer() {
+ try {
+ return (Rebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+ } catch (Exception e) {
+ LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return _rebalancerClassName;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof RebalancerRef) {
+ return this.toString().equals(((RebalancerRef) that).toString());
+ } else if (that instanceof String) {
+ return this.toString().equals(that);
+ }
+ return false;
+ }
+
+ /**
+ * Get a rebalancer class reference
+ * @param rebalancerClassName name of the class
+ * @return RebalancerRef or null if name is null
+ */
+ public static RebalancerRef from(String rebalancerClassName) {
+ if (rebalancerClassName == null) {
+ return null;
+ }
+ return new RebalancerRef(rebalancerClassName);
+ }
+
+ /**
+ * Get a RebalancerRef from a class object
+ * @param rebalancerClass class that implements Rebalancer
+ * @return RebalancerRef
+ */
+ public static RebalancerRef from(Class<? extends Rebalancer> rebalancerClass) {
+ if (rebalancerClass == null) {
+ return null;
+ }
+ return RebalancerRef.from(rebalancerClass.getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
new file mode 100644
index 0000000..525931d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/ReplicatedRebalancerContext.java
@@ -0,0 +1,40 @@
+package org.apache.helix.controller.rebalancer.context;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Methods specifying a rebalancer context that allows replicas. For instance, a rebalancer context
+ * with partitions may accept state model definitions that support multiple replicas per partition,
+ * and it's possible that the policy is that each live participant in the system should have a
+ * replica.
+ */
+public interface ReplicatedRebalancerContext extends RebalancerContext {
+ /**
+ * Check if this resource should be assigned to any live participant
+ * @return true if any live participant expected, false otherwise
+ */
+ public boolean anyLiveParticipant();
+
+ /**
+ * Get the number of replicas that each resource subunit should have
+ * @return replica count
+ */
+ public int getReplicaCount();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
new file mode 100644
index 0000000..6fe3f54
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancer.java
@@ -0,0 +1,78 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Rebalancer for the SEMI_AUTO mode. It expects a RebalancerConfig that understands the preferred
+ * locations of each partition replica
+ */
+public class SemiAutoRebalancer implements Rebalancer {
+ private static final Logger LOG = Logger.getLogger(SemiAutoRebalancer.class);
+
+ @Override
+ public void init(HelixManager helixManager) {
+ // do nothing
+ }
+
+ @Override
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, Cluster cluster,
+ ResourceCurrentState currentState) {
+ SemiAutoRebalancerContext config =
+ rebalancerConfig.getRebalancerContext(SemiAutoRebalancerContext.class);
+ StateModelDefinition stateModelDef =
+ cluster.getStateModelMap().get(config.getStateModelDefId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing resource:" + config.getResourceId());
+ }
+ ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+ for (PartitionId partition : config.getPartitionSet()) {
+ Map<ParticipantId, State> currentStateMap =
+ currentState.getCurrentStateMap(config.getResourceId(), partition);
+ Set<ParticipantId> disabledInstancesForPartition =
+ NewConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+ partition);
+ List<ParticipantId> preferenceList =
+ NewConstraintBasedAssignment.getPreferenceList(cluster, partition,
+ config.getPreferenceList(partition));
+ Map<ParticipantId, State> bestStateForPartition =
+ NewConstraintBasedAssignment.computeAutoBestStateForPartition(cluster.getConfig(),
+ config.getResourceId(), cluster.getLiveParticipantMap(), stateModelDef,
+ preferenceList, currentStateMap, disabledInstancesForPartition);
+ partitionMapping.addReplicaMap(partition, bestStateForPartition);
+ }
+ return partitionMapping;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
new file mode 100644
index 0000000..6cdfbb6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/SemiAutoRebalancerContext.java
@@ -0,0 +1,117 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * RebalancerContext for SEMI_AUTO rebalancer mode. It indicates the preferred locations of each
+ * partition replica. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+public final class SemiAutoRebalancerContext extends PartitionedRebalancerContext {
+ @JsonProperty("preferenceLists")
+ private Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+ /**
+ * Instantiate a SemiAutoRebalancerContext
+ */
+ public SemiAutoRebalancerContext() {
+ super(RebalanceMode.SEMI_AUTO);
+ setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+ _preferenceLists = Maps.newHashMap();
+ }
+
+ /**
+ * Get the preference lists of all partitions of the resource
+ * @return map of partition id to list of participant ids
+ */
+ public Map<PartitionId, List<ParticipantId>> getPreferenceLists() {
+ return _preferenceLists;
+ }
+
+ /**
+ * Set the preference lists of all partitions of the resource
+ * @param preferenceLists
+ */
+ public void setPreferenceLists(Map<PartitionId, List<ParticipantId>> preferenceLists) {
+ _preferenceLists = preferenceLists;
+ }
+
+ /**
+ * Get the preference list of a partition
+ * @param partitionId the partition to look up
+ * @return list of participant ids
+ */
+ @JsonIgnore
+ public List<ParticipantId> getPreferenceList(PartitionId partitionId) {
+ return _preferenceLists.get(partitionId);
+ }
+
+ /**
+ * Build a SemiAutoRebalancerContext. By default, it corresponds to {@link SemiAutoRebalancer}
+ */
+ public static final class Builder extends PartitionedRebalancerContext.AbstractBuilder<Builder> {
+ private final Map<PartitionId, List<ParticipantId>> _preferenceLists;
+
+ /**
+ * Instantiate for a resource
+ * @param resourceId resource id
+ */
+ public Builder(ResourceId resourceId) {
+ super(resourceId);
+ super.rebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
+ _preferenceLists = Maps.newHashMap();
+ }
+
+ /**
+ * Add a preference list for a partition
+ * @param partitionId partition to set
+ * @param preferenceList ordered list of participants who can serve the partition
+ * @return Builder
+ */
+ public Builder preferenceList(PartitionId partitionId, List<ParticipantId> preferenceList) {
+ _preferenceLists.put(partitionId, preferenceList);
+ return self();
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public SemiAutoRebalancerContext build() {
+ SemiAutoRebalancerContext context = new SemiAutoRebalancerContext();
+ super.update(context);
+ context.setPreferenceLists(_preferenceLists);
+ return context;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 2c4d8e1..f50b95c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -42,7 +42,6 @@ import org.apache.log4j.Logger;
* Reads the data from the cluster using data accessor. This output ClusterData which
* provides useful methods to search/lookup properties
*/
-@Deprecated
public class ClusterDataCache {
Map<String, LiveInstance> _liveInstanceMap;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index 021c9b8..6334279 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -22,25 +22,19 @@ package org.apache.helix.controller.stages;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.CustomRebalancerConfig;
-import org.apache.helix.api.FullAutoRebalancerConfig;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.SemiAutoRebalancerConfig;
import org.apache.helix.api.StateModelDefId;
-import org.apache.helix.api.UserDefinedRebalancerConfig;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
-import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
-import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
-import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
+import org.apache.helix.controller.rebalancer.context.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
-import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -115,52 +109,24 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
for (ResourceId resourceId : resourceMap.keySet()) {
- LOG.debug("Processing resource:" + resourceId);
- // Resource may be gone. In that case we need to get the state model name
- // from the current state
- // if (cluster.getResource(resourceId) == null) {
- // // if resource is deleted, then we do not know which rebalancer to use
- // // instead, just mark all partitions of the resource as dropped
- // if (LOG.isInfoEnabled()) {
- // LOG.info("resource:" + resourceId + " does not exist anymore");
- // }
- // StateModelDefinition stateModelDef =
- // stateModelDefs.get(currentStateOutput.getResourceStateModelDef(resourceId));
- // ResourceAssignment droppedAssignment =
- // mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
- // output.setResourceAssignment(resourceId, droppedAssignment);
- // continue;
- // }
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing resource:" + resourceId);
+ }
ResourceConfig resourceConfig = resourceMap.get(resourceId);
RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
ResourceAssignment resourceAssignment = null;
- if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED) {
- UserDefinedRebalancerConfig config = UserDefinedRebalancerConfig.from(rebalancerConfig);
- if (config.getRebalancerRef() != null) {
- NewUserDefinedRebalancer rebalancer = config.getRebalancerRef().getRebalancer();
- resourceAssignment =
- rebalancer.computeResourceMapping(config, cluster, currentStateOutput);
- }
- } else {
- if (rebalancerConfig.getRebalancerMode() == RebalanceMode.FULL_AUTO) {
- FullAutoRebalancerConfig config = FullAutoRebalancerConfig.from(rebalancerConfig);
- resourceAssignment =
- new NewAutoRebalancer().computeResourceMapping(config, cluster, currentStateOutput);
- } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.SEMI_AUTO) {
- SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(rebalancerConfig);
- resourceAssignment =
- new NewSemiAutoRebalancer().computeResourceMapping(config, cluster,
- currentStateOutput);
- } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.CUSTOMIZED) {
- CustomRebalancerConfig config = CustomRebalancerConfig.from(rebalancerConfig);
+ if (rebalancerConfig != null) {
+ Rebalancer rebalancer = rebalancerConfig.getRebalancer();
+ if (rebalancer != null) {
+ HelixManager manager = event.getAttribute("helixmanager");
+ rebalancer.init(manager);
resourceAssignment =
- new NewCustomRebalancer().computeResourceMapping(config, cluster, currentStateOutput);
+ rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
}
}
if (resourceAssignment == null) {
- StateModelDefinition stateModelDef =
- stateModelDefs.get(rebalancerConfig.getStateModelDefId());
+ RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+ StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
resourceAssignment =
mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
index 873419c..1a92919 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -80,7 +80,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
if (!message.getBatchMessageMode()) {
PartitionId partitionId = message.getPartitionId();
- Partition partition = resource.getPartition(partitionId);
+ Partition partition = resource.getSubUnit(partitionId);
if (partition != null) {
currentStateOutput.setPendingState(resourceId, partitionId, participantId,
message.getToState());
@@ -91,7 +91,7 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
List<PartitionId> partitionNames = message.getPartitionIds();
if (!partitionNames.isEmpty()) {
for (PartitionId partitionId : partitionNames) {
- Partition partition = resource.getPartition(partitionId);
+ Partition partition = resource.getSubUnit(partitionId);
if (partition != null) {
currentStateOutput.setPendingState(resourceId, partitionId, participantId,
message.getToState());
@@ -126,11 +126,10 @@ public class NewCurrentStateComputationStage extends AbstractBaseStage {
Map<PartitionId, State> partitionStateMap = curState.getPartitionStateMap();
for (PartitionId partitionId : partitionStateMap.keySet()) {
- Partition partition = resource.getPartition(partitionId);
+ Partition partition = resource.getSubUnit(partitionId);
if (partition != null) {
currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
curState.getState(partitionId));
-
} else {
// log
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
index 95c76e6..477247e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewExternalViewComputeStage.java
@@ -37,7 +37,6 @@ import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.SchedulerTaskConfig;
@@ -45,6 +44,8 @@ import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
@@ -53,7 +54,7 @@ import org.apache.helix.model.StatusUpdate;
import org.apache.log4j.Logger;
public class NewExternalViewComputeStage extends AbstractBaseStage {
- private static Logger LOG = Logger.getLogger(ExternalViewComputeStage.class);
+ private static Logger LOG = Logger.getLogger(NewExternalViewComputeStage.class);
@Override
public void process(ClusterEvent event) throws Exception {
@@ -97,7 +98,7 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
} else {
view.setBucketSize(currentStateOutput.getBucketSize(resourceId));
}
- for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+ for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
Map<ParticipantId, State> currentStateMap =
currentStateOutput.getCurrentStateMap(resourceId, partitionId);
if (currentStateMap != null && currentStateMap.size() > 0) {
@@ -138,8 +139,11 @@ public class NewExternalViewComputeStage extends AbstractBaseStage {
// partitions are finished (COMPLETED or ERROR), update the status update of the original
// scheduler
// message, and then remove the partitions from the ideal state
- if (rebalancerConfig != null
- && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+ RebalancerContext rebalancerContext =
+ (rebalancerConfig != null) ? rebalancerConfig
+ .getRebalancerContext(RebalancerContext.class) : null;
+ if (rebalancerContext != null
+ && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
StateModelDefId.SchedulerTaskQueue)) {
updateScheduledTaskStatus(resourceId, view, manager, schedulerTaskConfig);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index c0bde54..ad0ad95 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -30,7 +30,6 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.MessageId;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.SchedulerTaskConfig;
@@ -40,6 +39,7 @@ import org.apache.helix.api.StateModelDefId;
import org.apache.helix.api.StateModelFactoryId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
@@ -76,13 +76,14 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
ResourceConfig resourceConfig = resourceMap.get(resourceId);
int bucketSize = resourceConfig.getBucketSize();
- StateModelDefinition stateModelDef =
- stateModelDefMap.get(resourceConfig.getRebalancerConfig().getStateModelDefId());
+ RebalancerContext rebalancerCtx =
+ resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+ StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCtx.getStateModelDefId());
ResourceAssignment resourceAssignment =
bestPossibleStateOutput.getResourceAssignment(resourceId);
- for (PartitionId partitionId : resourceConfig.getPartitionMap().keySet()) {
- Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(partitionId);
+ for (PartitionId subUnitId : resourceConfig.getSubUnitMap().keySet()) {
+ Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(subUnitId);
// we should generate message based on the desired-state priority
// so keep generated messages in a temp map keyed by state
@@ -93,7 +94,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
State desiredState = instanceStateMap.get(participantId);
State currentState =
- currentStateOutput.getCurrentState(resourceId, partitionId, participantId);
+ currentStateOutput.getCurrentState(resourceId, subUnitId, participantId);
if (currentState == null) {
currentState = stateModelDef.getInitialState();
}
@@ -103,12 +104,12 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
}
State pendingState =
- currentStateOutput.getPendingState(resourceId, partitionId, participantId);
+ currentStateOutput.getPendingState(resourceId, subUnitId, participantId);
// TODO fix it
State nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
if (nextState == null) {
- LOG.error("Unable to find a next state for partition: " + partitionId
+ LOG.error("Unable to find a next state for partition: " + subUnitId
+ " from stateModelDefinition" + stateModelDef.getClass() + " from:" + currentState
+ " to:" + desiredState);
continue;
@@ -116,13 +117,13 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
if (pendingState != null) {
if (nextState.equals(pendingState)) {
- LOG.debug("Message already exists for " + participantId + " to transit "
- + partitionId + " from " + currentState + " to " + nextState);
+ LOG.debug("Message already exists for " + participantId + " to transit " + subUnitId
+ + " from " + currentState + " to " + nextState);
} else if (currentState.equals(pendingState)) {
LOG.info("Message hasn't been removed for " + participantId + " to transit"
- + partitionId + " to " + pendingState + ", desiredState: " + desiredState);
+ + subUnitId + " to " + pendingState + ", desiredState: " + desiredState);
} else {
- LOG.info("IdealState changed before state transition completes for " + partitionId
+ LOG.info("IdealState changed before state transition completes for " + subUnitId
+ " on " + participantId + ", pendingState: " + pendingState + ", currentState: "
+ currentState + ", nextState: " + nextState);
}
@@ -131,20 +132,21 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
SessionId sessionId =
cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
.getSessionId();
+ RebalancerContext rebalancerContext =
+ resourceConfig.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
Message message =
- createMessage(manager, resourceId, partitionId, participantId, currentState,
+ createMessage(manager, resourceId, subUnitId, participantId, currentState,
nextState, sessionId, StateModelDefId.from(stateModelDef.getId()),
- resourceConfig.getRebalancerConfig().getStateModelFactoryId(), bucketSize);
+ rebalancerContext.getStateModelFactoryId(), bucketSize);
// TODO refactor get/set timeout/inner-message
- RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
- if (rebalancerConfig != null
- && rebalancerConfig.getStateModelDefId().equalsIgnoreCase(
+ if (rebalancerContext != null
+ && rebalancerContext.getStateModelDefId().equalsIgnoreCase(
StateModelDefId.SchedulerTaskQueue)) {
- if (resourceConfig.getPartitionMap().size() > 0) {
+ if (resourceConfig.getSubUnitMap().size() > 0) {
// TODO refactor it -- we need a way to read in scheduler tasks a priori
Message innerMsg =
- resourceConfig.getSchedulerTaskConfig().getInnerMessage(partitionId);
+ resourceConfig.getSchedulerTaskConfig().getInnerMessage(subUnitId);
if (innerMsg != null) {
message.setInnerMessage(innerMsg);
}
@@ -157,12 +159,12 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
Message.Attributes.TIMEOUT.name());
SchedulerTaskConfig schedulerTaskConfig = resourceConfig.getSchedulerTaskConfig();
if (schedulerTaskConfig != null) {
- int timeout = schedulerTaskConfig.getTimeout(stateTransition, partitionId);
+ int timeout = schedulerTaskConfig.getTimeout(stateTransition, subUnitId);
if (timeout > 0) {
message.setExecutionTimeout(timeout);
}
}
- message.getRecord().setSimpleField("ClusterEventName", event.getName());
+ message.setClusterEvent(event);
if (!messageMap.containsKey(desiredState)) {
messageMap.put(desiredState, new ArrayList<Message>());
@@ -176,7 +178,7 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
for (State state : statesPriorityList) {
if (messageMap.containsKey(state)) {
for (Message message : messageMap.get(state)) {
- output.addMessage(resourceId, partitionId, message);
+ output.addMessage(resourceId, subUnitId, message);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
index 8ea2013..04d6af8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -30,14 +30,17 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.ReplicatedRebalancerContext;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -104,7 +107,8 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
for (ResourceId resourceId : resourceMap.keySet()) {
ResourceConfig resource = resourceMap.get(resourceId);
StateModelDefinition stateModelDef =
- stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
+ stateModelDefMap.get(resource.getRebalancerConfig()
+ .getRebalancerContext(RebalancerContext.class).getStateModelDefId());
// TODO have a logical model for transition
Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
@@ -116,7 +120,7 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
configResource == null ? null : configResource.getRebalancerConfig(), cluster);
// TODO fix it
- for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+ for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
List<Message> selectedMessages =
selectMessages(cluster.getLiveParticipantMap(),
@@ -259,22 +263,27 @@ public class NewMessageSelectionStage extends AbstractBaseStage {
*/
private Map<State, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
RebalancerConfig rebalancerConfig, Cluster cluster) {
+ ReplicatedRebalancerContext context =
+ (rebalancerConfig != null) ? rebalancerConfig
+ .getRebalancerContext(ReplicatedRebalancerContext.class) : null;
Map<State, Bounds> stateConstraints = new HashMap<State, Bounds>();
List<State> statePriorityList = stateModelDefinition.getStatesPriorityList();
for (State state : statePriorityList) {
- String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state.toString());
+ String numInstancesPerState =
+ cluster.getStateUpperBoundConstraint(Scope.cluster(cluster.getId()),
+ stateModelDefinition.getStateModelDefId(), state);
int max = -1;
if ("N".equals(numInstancesPerState)) {
max = cluster.getLiveParticipantMap().size();
} else if ("R".equals(numInstancesPerState)) {
// idealState is null when resource has been dropped,
// R can't be evaluated and ignore state constraints
- if (rebalancerConfig != null) {
- if (rebalancerConfig.canAssignAnyLiveParticipant()) {
+ if (context != null) {
+ if (context.anyLiveParticipant()) {
max = cluster.getLiveParticipantMap().size();
} else {
- max = rebalancerConfig.getReplicaCount();
+ max = context.getReplicaCount();
}
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
index cfbd45c..8c3c847 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -147,7 +147,7 @@ public class NewMessageThrottleStage extends AbstractBaseStage {
for (ResourceId resourceId : resourceMap.keySet()) {
ResourceConfig resource = resourceMap.get(resourceId);
// TODO fix it
- for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+ for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
if (constraint != null && messages != null && messages.size() > 0) {
messages = throttle(throttleCounterMap, constraint, messages, true);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index 457b470..38da0ac 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -23,20 +23,18 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.CustomRebalancerConfig;
-import org.apache.helix.api.FullAutoRebalancerConfig;
import org.apache.helix.api.Participant;
import org.apache.helix.api.Partition;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
-import org.apache.helix.api.SemiAutoRebalancerConfig;
import org.apache.helix.api.StateModelFactoryId;
-import org.apache.helix.api.UserDefinedRebalancerConfig;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.CurrentState;
import org.apache.log4j.Logger;
@@ -74,58 +72,8 @@ public class NewResourceComputationStage extends AbstractBaseStage {
resCfgBuilder.bucketSize(resource.getBucketSize());
resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
-
- switch (rebalancerCfg.getRebalancerMode()) {
- case USER_DEFINED: {
- UserDefinedRebalancerConfig.Builder builder =
- new UserDefinedRebalancerConfig.Builder(UserDefinedRebalancerConfig.from(rebalancerCfg));
- if (csResCfgMap.containsKey(resourceId)) {
- builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
- }
- resCfgBuilder.rebalancerConfig(builder.build());
- resCfgMap.put(resourceId, resCfgBuilder.build());
- break;
- }
- case FULL_AUTO: {
- FullAutoRebalancerConfig.Builder builder =
- new FullAutoRebalancerConfig.Builder(FullAutoRebalancerConfig.from(rebalancerCfg));
- if (csResCfgMap.containsKey(resourceId)) {
- builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
- }
- resCfgBuilder.rebalancerConfig(builder.build());
- resCfgMap.put(resourceId, resCfgBuilder.build());
- break;
- }
- case SEMI_AUTO: {
- SemiAutoRebalancerConfig.Builder builder =
- new SemiAutoRebalancerConfig.Builder(SemiAutoRebalancerConfig.from(rebalancerCfg));
- if (csResCfgMap.containsKey(resourceId)) {
- builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
- }
- resCfgBuilder.rebalancerConfig(builder.build());
- resCfgMap.put(resourceId, resCfgBuilder.build());
- break;
- }
- case CUSTOMIZED: {
- CustomRebalancerConfig.Builder builder =
- new CustomRebalancerConfig.Builder(CustomRebalancerConfig.from(rebalancerCfg));
- if (csResCfgMap.containsKey(resourceId)) {
- builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
- }
- resCfgBuilder.rebalancerConfig(builder.build());
- resCfgMap.put(resourceId, resCfgBuilder.build());
- break;
- }
- default:
- RebalancerConfig.SimpleBuilder builder = new RebalancerConfig.SimpleBuilder(rebalancerCfg);
- if (csResCfgMap.containsKey(resourceId)) {
- builder.addPartitions(csResCfgMap.get(resourceId).getPartitionMap().values());
- }
- resCfgBuilder.rebalancerConfig(builder.build());
- resCfgMap.put(resourceId, resCfgBuilder.build());
- break;
- }
-
+ resCfgBuilder.rebalancerContext(rebalancerCfg.getRebalancerContext(RebalancerContext.class));
+ resCfgMap.put(resourceId, resCfgBuilder.build());
}
event.addAttribute(AttributeName.RESOURCES.toString(), resCfgMap);
@@ -137,13 +85,12 @@ public class NewResourceComputationStage extends AbstractBaseStage {
* @return resource config map or empty map if not available
* @throws StageException
*/
- Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster)
- throws StageException {
+ Map<ResourceId, ResourceConfig> getCurStateResourceCfgMap(Cluster cluster) throws StageException {
Map<ResourceId, ResourceConfig.Builder> resCfgBuilderMap =
new HashMap<ResourceId, ResourceConfig.Builder>();
- Map<ResourceId, RebalancerConfig.SimpleBuilder> rebCfgBuilderMap =
- new HashMap<ResourceId, RebalancerConfig.SimpleBuilder>();
+ Map<ResourceId, PartitionedRebalancerContext.Builder> rebCtxBuilderMap =
+ new HashMap<ResourceId, PartitionedRebalancerContext.Builder>();
for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
@@ -158,12 +105,12 @@ public class NewResourceComputationStage extends AbstractBaseStage {
}
if (!resCfgBuilderMap.containsKey(resourceId)) {
- RebalancerConfig.SimpleBuilder rebCfgBuilder =
- new RebalancerConfig.SimpleBuilder(resourceId);
- rebCfgBuilder.stateModelDef(currentState.getStateModelDefId());
- rebCfgBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
+ PartitionedRebalancerContext.Builder rebCtxBuilder =
+ new PartitionedRebalancerContext.Builder(resourceId);
+ rebCtxBuilder.stateModelDefId(currentState.getStateModelDefId());
+ rebCtxBuilder.stateModelFactoryId(StateModelFactoryId.from(currentState
.getStateModelFactoryName()));
- rebCfgBuilderMap.put(resourceId, rebCfgBuilder);
+ rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
resCfgBuilder.bucketSize(currentState.getBucketSize());
@@ -171,9 +118,9 @@ public class NewResourceComputationStage extends AbstractBaseStage {
resCfgBuilderMap.put(resourceId, resCfgBuilder);
}
- RebalancerConfig.SimpleBuilder rebCfgBuilder = rebCfgBuilderMap.get(resourceId);
+ PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
- rebCfgBuilder.addPartition(new Partition(partitionId));
+ rebCtxBuilder.addPartition(new Partition(partitionId));
}
}
}
@@ -181,8 +128,8 @@ public class NewResourceComputationStage extends AbstractBaseStage {
Map<ResourceId, ResourceConfig> resCfgMap = new HashMap<ResourceId, ResourceConfig>();
for (ResourceId resourceId : resCfgBuilderMap.keySet()) {
ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId);
- RebalancerConfig.SimpleBuilder rebCfgBuilder = rebCfgBuilderMap.get(resourceId);
- resCfgBuilder.rebalancerConfig(rebCfgBuilder.build());
+ PartitionedRebalancerContext.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
+ resCfgBuilder.rebalancerContext(rebCtxBuilder.build());
resCfgMap.put(resourceId, resCfgBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 1bbfc15..74b29c7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -66,7 +66,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
List<Message> messagesToSend = new ArrayList<Message>();
for (ResourceId resourceId : resourceMap.keySet()) {
ResourceConfig resource = resourceMap.get(resourceId);
- for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+ for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
messagesToSend.addAll(messages);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 88992c1..ff0923d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -34,12 +34,11 @@ import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerRef;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.api.StateModelFactoryId;
-import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerRef;
import org.apache.log4j.Logger;
import com.google.common.base.Function;
@@ -559,7 +558,9 @@ public class IdealState extends HelixProperty {
* @param name state model factory id
*/
public void setStateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
- setStateModelFactoryName(stateModelFactoryId.stringify());
+ if (stateModelFactoryId != null) {
+ setStateModelFactoryName(stateModelFactoryId.stringify());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 29ac173..b372ac2 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -41,6 +41,7 @@ import org.apache.helix.api.SessionId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import com.google.common.collect.ImmutableList;
@@ -732,6 +733,22 @@ public class Message extends HelixProperty {
_record.setMapField(Attributes.INNER_MESSAGE.name(), message.getRecord().getSimpleFields());
}
+ /**
+ * Set the cluster event generating this message
+ * @param event cluster event
+ */
+ public void setClusterEvent(ClusterEvent event) {
+ _record.setSimpleField("ClusterEventName", event.getName());
+ }
+
+ /**
+ * Get the cluster event name generating this message
+ * @param the cluster event event name
+ */
+ public String getClusterEventName() {
+ return _record.getSimpleField("ClusterEventName");
+ }
+
private boolean isNullOrEmpty(String data) {
return data == null || data.length() == 0 || data.trim().length() == 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 00f8472..8966434 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -1,13 +1,10 @@
package org.apache.helix.model;
-import java.util.ArrayList;
import java.util.List;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
-import org.apache.helix.api.NamespacedConfig;
import org.apache.helix.api.PartitionId;
-import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.ResourceId;
import com.google.common.base.Function;
@@ -91,25 +88,4 @@ public class ResourceConfiguration extends HelixProperty {
return null;
}
- /**
- * Add a rebalancer config to this resource
- * @param config populated rebalancer config
- */
- public void addRebalancerConfig(RebalancerConfig config) {
- addNamespacedConfig(config);
- setPartitionIds(new ArrayList<PartitionId>(config.getPartitionSet()));
- }
-
- /**
- * Create a new ResourceConfiguration from a NamespacedConfig
- * @param namespacedConfig namespaced configuration properties
- * @return ResourceConfiguration
- */
- public static ResourceConfiguration from(NamespacedConfig namespacedConfig) {
- ResourceConfiguration resourceConfiguration =
- new ResourceConfiguration(ResourceId.from(namespacedConfig.getId()));
- resourceConfiguration.addNamespacedConfig(namespacedConfig);
- return resourceConfiguration;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 6e5576a..7e16de0 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -276,7 +276,7 @@ public class ClusterStateVerifier {
PartitionId partitionId = PartitionId.from(partitionName);
ParticipantId participantId = ParticipantId.from(instanceName);
raBuilder.addAssignment(partitionId, participantId,
- new State(HelixDefinedState.ERROR.toString()));
+ State.from(HelixDefinedState.ERROR.toString()));
}
bestPossOutput.setResourceAssignment(resourceId, raBuilder.build());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
index 9f8f494..29370cb 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
@@ -1,13 +1,10 @@
package org.apache.helix.api;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
-import org.apache.helix.model.ResourceConfiguration;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -86,44 +83,4 @@ public class TestNamespacedConfig {
Assert.assertEquals(instanceConfig.getRecord().getListField(prefixedKey), testListValue);
Assert.assertEquals(instanceConfig.getRecord().getMapField(prefixedKey), testMapValue);
}
-
- @Test
- public void testConfiguredResource() {
- // Set up the namespaced configs
- String userKey = "userKey";
- String userValue = "userValue";
- ResourceId resourceId = ResourceId.from("testResource");
- UserConfig userConfig = new UserConfig(Scope.resource(resourceId));
- userConfig.setSimpleField(userKey, userValue);
- PartitionId partitionId = PartitionId.from(resourceId, "0");
- Partition partition = new Partition(partitionId);
- Map<ParticipantId, State> preferenceMap = new HashMap<ParticipantId, State>();
- ParticipantId participantId = ParticipantId.from("participant");
- preferenceMap.put(participantId, State.from("ONLINE"));
- CustomRebalancerConfig rebalancerConfig =
- new CustomRebalancerConfig.Builder(resourceId).replicaCount(1).addPartition(partition)
- .stateModelDef(StateModelDefId.from("OnlineOffline"))
- .preferenceMap(partitionId, preferenceMap).build();
-
- // copy in the configs
- ResourceConfiguration config = new ResourceConfiguration(resourceId);
- config.addNamespacedConfig(userConfig);
- config.addRebalancerConfig(rebalancerConfig);
-
- // recreate the configs and check the fields
- UserConfig retrievedUserConfig = UserConfig.from(config);
- Assert.assertEquals(retrievedUserConfig.getSimpleField(userKey), userValue);
- Map<PartitionId, UserConfig> partitionConfigs = Collections.emptyMap();
- RebalancerConfig retrievedRebalancerConfig = RebalancerConfig.from(config, partitionConfigs);
- Assert.assertEquals(retrievedRebalancerConfig.getReplicaCount(),
- rebalancerConfig.getReplicaCount());
- Assert.assertEquals(retrievedRebalancerConfig.getStateModelDefId(),
- rebalancerConfig.getStateModelDefId());
- Assert.assertTrue(retrievedRebalancerConfig.getPartitionMap().containsKey(partitionId));
- Assert.assertEquals(retrievedRebalancerConfig.getPartitionSet().size(), rebalancerConfig
- .getPartitionSet().size());
- CustomRebalancerConfig customConfig = CustomRebalancerConfig.from(retrievedRebalancerConfig);
- Assert.assertEquals(customConfig.getPreferenceMap(partitionId).get(participantId),
- State.from("ONLINE"));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index 7fc4c83..5775f9e 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -27,7 +27,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.NewBestPossibleStateCalcStage;
@@ -38,7 +38,6 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.controller.ClusterController;
import org.apache.helix.mock.participant.MockParticipant;
import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -94,8 +93,8 @@ public class TestNewStages extends ZkUnitTestBase {
ResourceId resourceId = ResourceId.from("TestDB0");
Assert.assertTrue(resourceMap.containsKey(resourceId));
Resource resource = resourceMap.get(resourceId);
- Assert
- .assertEquals(resource.getRebalancerConfig().getRebalancerMode(), RebalanceMode.SEMI_AUTO);
+ Assert.assertNotNull(resource.getRebalancerConfig().getRebalancerContext(
+ SemiAutoRebalancerContext.class));
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}
@@ -157,11 +156,9 @@ public class TestNewStages extends ZkUnitTestBase {
ResourceId resourceId = ResourceId.from("TestDB0");
Resource resource = cluster.getResource(resourceId);
ResourceCurrentState currentStateOutput = new ResourceCurrentState();
- SemiAutoRebalancerConfig semiAutoConfig =
- SemiAutoRebalancerConfig.from(resource.getRebalancerConfig());
ResourceAssignment semiAutoResult =
- new NewSemiAutoRebalancer().computeResourceMapping(semiAutoConfig, cluster,
- currentStateOutput);
+ resource.getRebalancerConfig().getRebalancer()
+ .computeResourceMapping(resource.getRebalancerConfig(), cluster, currentStateOutput);
verifySemiAutoRebalance(resource, semiAutoResult);
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
@@ -173,10 +170,11 @@ public class TestNewStages extends ZkUnitTestBase {
* @param assignment the assignment to verify
*/
private void verifySemiAutoRebalance(Resource resource, ResourceAssignment assignment) {
- Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getPartitionSet().size());
- SemiAutoRebalancerConfig config = SemiAutoRebalancerConfig.from(resource.getRebalancerConfig());
+ Assert.assertEquals(assignment.getMappedPartitions().size(), resource.getSubUnitSet().size());
+ SemiAutoRebalancerContext context =
+ resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
for (PartitionId partitionId : assignment.getMappedPartitions()) {
- List<ParticipantId> preferenceList = config.getPreferenceList(partitionId);
+ List<ParticipantId> preferenceList = context.getPreferenceList(partitionId);
Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partitionId);
Assert.assertEquals(replicaMap.size(), preferenceList.size());
Assert.assertEquals(replicaMap.size(), r);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
new file mode 100644
index 0000000..ebdcaff
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/context/TestSerializeRebalancerContext.java
@@ -0,0 +1,103 @@
+package org.apache.helix.controller.rebalancer.context;
+
+import java.util.Map;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.model.ResourceConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Ensure that a RebalancerContext of a specified type is able to be serialized and deserialized.
+ */
+public class TestSerializeRebalancerContext {
+ @Test
+ public void basicTest() {
+ // populate a context
+ CustomRebalancerContext context = new CustomRebalancerContext();
+ context.setAnyLiveParticipant(false);
+ context.setMaxPartitionsPerParticipant(Integer.MAX_VALUE);
+ Map<PartitionId, Partition> partitionMap = Maps.newHashMap();
+ ResourceId resourceId = ResourceId.from("testResource");
+ PartitionId partitionId = PartitionId.from(resourceId, "0");
+ partitionMap.put(partitionId, new Partition(partitionId));
+ context.setPartitionMap(partitionMap);
+ Map<PartitionId, Map<ParticipantId, State>> preferenceMaps = Maps.newHashMap();
+ ParticipantId participant1 = ParticipantId.from("participant1");
+ ParticipantId participant2 = ParticipantId.from("participant2");
+ Map<ParticipantId, State> preferenceMap =
+ ImmutableMap.of(participant1, State.from("MASTER"), participant2, State.from("SLAVE"));
+ preferenceMaps.put(partitionId, preferenceMap);
+ context.setPreferenceMaps(preferenceMaps);
+ context.setReplicaCount(3);
+ context.setStateModelDefId(StateModelDefId.from("MasterSlave"));
+ context.setResourceId(resourceId);
+
+ // serialize and deserialize by wrapping in a config
+ RebalancerConfig config = new RebalancerConfig(context);
+ CustomRebalancerContext deserialized =
+ config.getRebalancerContext(CustomRebalancerContext.class);
+
+ // check to make sure that the two objects contain the same data
+ Assert.assertNotNull(deserialized);
+ Assert.assertEquals(deserialized.anyLiveParticipant(), context.anyLiveParticipant());
+ Assert.assertEquals(deserialized.getPreferenceMap(partitionId).get(participant1), context
+ .getPreferenceMap(partitionId).get(participant1));
+ Assert.assertEquals(deserialized.getPreferenceMap(partitionId).get(participant2), context
+ .getPreferenceMap(partitionId).get(participant2));
+ Assert.assertEquals(deserialized.getReplicaCount(), context.getReplicaCount());
+ Assert.assertEquals(deserialized.getStateModelDefId(), context.getStateModelDefId());
+ Assert.assertEquals(deserialized.getResourceId(), context.getResourceId());
+
+ // wrap in a physical config and then unwrap it
+ ResourceConfiguration physicalConfig = new ResourceConfiguration(resourceId);
+ physicalConfig.addNamespacedConfig(config.toNamespacedConfig());
+ RebalancerConfig extractedConfig = new RebalancerConfig(physicalConfig);
+ CustomRebalancerContext extractedContext =
+ extractedConfig.getRebalancerContext(CustomRebalancerContext.class);
+
+ // make sure the unwrapped data hasn't changed
+ Assert.assertNotNull(extractedContext);
+ Assert.assertEquals(extractedContext.anyLiveParticipant(), context.anyLiveParticipant());
+ Assert.assertEquals(extractedContext.getPreferenceMap(partitionId).get(participant1), context
+ .getPreferenceMap(partitionId).get(participant1));
+ Assert.assertEquals(extractedContext.getPreferenceMap(partitionId).get(participant2), context
+ .getPreferenceMap(partitionId).get(participant2));
+ Assert.assertEquals(extractedContext.getReplicaCount(), context.getReplicaCount());
+ Assert.assertEquals(extractedContext.getStateModelDefId(), context.getStateModelDefId());
+ Assert.assertEquals(extractedContext.getResourceId(), context.getResourceId());
+
+ // make sure that it's legal to use a base rebalancer context
+ RebalancerContext rebalancerContext =
+ extractedConfig.getRebalancerContext(RebalancerContext.class);
+ Assert.assertNotNull(rebalancerContext);
+ Assert.assertEquals(rebalancerContext.getResourceId(), context.getResourceId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index f7cdcba..262f779 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -33,8 +32,6 @@ import org.apache.helix.Mocks;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.ParticipantId;
-import org.apache.helix.api.Partition;
-import org.apache.helix.api.PartitionId;
import org.apache.helix.api.Resource;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
@@ -43,6 +40,8 @@ import org.apache.helix.api.StateModelDefId;
import org.apache.helix.api.UserConfig;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
@@ -169,14 +168,11 @@ public class BaseStageTest {
Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
for (IdealState idealState : idealStates) {
ResourceId resourceId = idealState.getResourceId();
- Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
- for (PartitionId partitionId : idealState.getPartitionSet()) {
- partitionMap.put(partitionId, new Partition(partitionId));
- }
- Map<PartitionId, UserConfig> partitionConfigMap = Collections.emptyMap();
+ RebalancerContext context = PartitionedRebalancerContext.from(idealState);
Resource resource =
- new Resource(resourceId, idealState, null, null, new UserConfig(
- Scope.resource(resourceId)), partitionConfigMap);
+ new Resource(resourceId, idealState, null, null, context, new UserConfig(
+ Scope.resource(resourceId)), idealState.getBucketSize(),
+ idealState.getBatchMessageMode());
resourceMap.put(resourceId, resource.getConfig());
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index 45507a1..ec6c525 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -33,6 +33,7 @@ import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
@@ -78,9 +79,10 @@ public class TestResourceComputationStage extends BaseStageTest {
AssertJUnit.assertEquals(resource.values().iterator().next().getId(),
ResourceId.from(resourceName));
AssertJUnit.assertEquals(resource.values().iterator().next().getRebalancerConfig()
- .getStateModelDefId(), idealState.getStateModelDefId());
- AssertJUnit.assertEquals(resource.values().iterator().next().getPartitionSet().size(),
- partitions);
+ .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
+ idealState.getStateModelDefId());
+ AssertJUnit
+ .assertEquals(resource.values().iterator().next().getSubUnitSet().size(), partitions);
}
@Test
@@ -105,8 +107,9 @@ public class TestResourceComputationStage extends BaseStageTest {
AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
- .getStateModelDefId(), idealState.getStateModelDefId());
- AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
+ .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
+ idealState.getStateModelDefId());
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
idealState.getNumPartitions());
}
}
@@ -179,8 +182,9 @@ public class TestResourceComputationStage extends BaseStageTest {
AssertJUnit.assertTrue(resourceMap.containsKey(resourceId));
AssertJUnit.assertEquals(resourceMap.get(resourceId).getId(), resourceId);
AssertJUnit.assertEquals(resourceMap.get(resourceId).getRebalancerConfig()
- .getStateModelDefId(), idealState.getStateModelDefId());
- AssertJUnit.assertEquals(resourceMap.get(resourceId).getPartitionSet().size(),
+ .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
+ idealState.getStateModelDefId());
+ AssertJUnit.assertEquals(resourceMap.get(resourceId).getSubUnitSet().size(),
idealState.getNumPartitions());
}
// Test the data derived from CurrentState
@@ -188,14 +192,15 @@ public class TestResourceComputationStage extends BaseStageTest {
AssertJUnit.assertTrue(resourceMap.containsKey(oldResourceId));
AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getId(), oldResourceId);
AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getRebalancerConfig()
- .getStateModelDefId(), currentState.getStateModelDefId());
- AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getPartitionSet().size(), currentState
+ .getRebalancerContext(RebalancerContext.class).getStateModelDefId(),
+ currentState.getStateModelDefId());
+ AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getSubUnitSet().size(), currentState
.getPartitionStateMap().size());
- AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+ AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit(
PartitionId.from("testResourceOld_0")));
- AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+ AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit(
PartitionId.from("testResourceOld_1")));
- AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getPartition(
+ AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit(
PartitionId.from("testResourceOld_2")));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 4f7b814..90c53d6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -25,14 +25,16 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.State;
-import org.apache.helix.api.UserDefinedRebalancerConfig;
-import org.apache.helix.controller.rebalancer.NewUserDefinedRebalancer;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.Rebalancer;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -52,24 +54,27 @@ import org.testng.annotations.Test;
public class TestCustomizedIdealStateRebalancer extends
ZkStandAloneCMTestBaseWithPropertyServerCheck {
String db2 = TEST_DB + "2";
+ static boolean testRebalancerCreated = false;
static boolean testRebalancerInvoked = false;
- public static class TestRebalancer implements NewUserDefinedRebalancer {
+ public static class TestRebalancer implements Rebalancer {
/**
* Very basic mapping that evenly assigns one replica of each partition to live nodes, each of
* which is in the highest-priority state.
*/
@Override
- public ResourceAssignment computeResourceMapping(UserDefinedRebalancerConfig config,
- Cluster cluster, ResourceCurrentState currentState) {
+ public ResourceAssignment computeResourceMapping(RebalancerConfig config, Cluster cluster,
+ ResourceCurrentState currentState) {
+ PartitionedRebalancerContext context =
+ config.getRebalancerContext(PartitionedRebalancerContext.class);
StateModelDefinition stateModelDef =
- cluster.getStateModelMap().get(config.getStateModelDefId());
+ cluster.getStateModelMap().get(context.getStateModelDefId());
List<ParticipantId> liveParticipants =
new ArrayList<ParticipantId>(cluster.getLiveParticipantMap().keySet());
- ResourceAssignment resourceMapping = new ResourceAssignment(config.getResourceId());
+ ResourceAssignment resourceMapping = new ResourceAssignment(context.getResourceId());
int i = 0;
- for (PartitionId partitionId : config.getPartitionSet()) {
+ for (PartitionId partitionId : context.getPartitionSet()) {
int nodeIndex = i % liveParticipants.size();
Map<ParticipantId, State> replicaMap = new HashMap<ParticipantId, State>();
replicaMap.put(liveParticipants.get(nodeIndex), stateModelDef.getStatesPriorityList()
@@ -80,6 +85,11 @@ public class TestCustomizedIdealStateRebalancer extends
testRebalancerInvoked = true;
return resourceMapping;
}
+
+ @Override
+ public void init(HelixManager helixManager) {
+ testRebalancerCreated = true;
+ }
}
@Test
@@ -111,6 +121,7 @@ public class TestCustomizedIdealStateRebalancer extends
Assert.assertEquals(is.getPreferenceList(partition).size(), 0);
Assert.assertEquals(is.getParticipantStateMap(partition).size(), 0);
}
+ Assert.assertTrue(testRebalancerCreated);
Assert.assertTrue(testRebalancerInvoked);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/917af3eb/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
index 7be5ab1..ed66ae2 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -9,7 +9,6 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.api.ClusterAccessor;
import org.apache.helix.api.ClusterConfig;
import org.apache.helix.api.ClusterId;
-import org.apache.helix.api.FullAutoRebalancerConfig;
import org.apache.helix.api.ParticipantConfig;
import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.Partition;
@@ -20,6 +19,7 @@ import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
import org.apache.helix.api.UserConfig;
+import org.apache.helix.controller.rebalancer.context.FullAutoRebalancerContext;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -118,21 +118,18 @@ public class NewModelExample {
// identify the resource
ResourceId resourceId = ResourceId.from("exampleResource");
- // create a partition with no user-defined configuration
+ // create a partition
Partition partition1 = new Partition(PartitionId.from("partition1"));
- // create a partition with (optional) user-defined configuration
- PartitionId partition2Id = PartitionId.from("partition2");
- UserConfig partition2Config = new UserConfig(Scope.partition(partition2Id));
- partition2Config.setSimpleField("sampleString", "partition config");
- Partition partition2 = new Partition(partition2Id, partition2Config);
+ // create a second partition
+ Partition partition2 = new Partition(PartitionId.from("partition2"));
// specify the rebalancer configuration
// this resource will be rebalanced in FULL_AUTO mode, so use the FullAutoRebalancerConfig
// builder
- FullAutoRebalancerConfig.Builder rebalanceConfigBuilder =
- new FullAutoRebalancerConfig.Builder(resourceId).replicaCount(3).addPartition(partition1)
- .addPartition(partition2).stateModelDef(stateModelDef.getStateModelDefId());
+ FullAutoRebalancerContext.Builder rebalanceContextBuilder =
+ new FullAutoRebalancerContext.Builder(resourceId).replicaCount(1).addPartition(partition1)
+ .addPartition(partition2).stateModelDefId(stateModelDef.getStateModelDefId());
// create (optional) user-defined configuration properties for the resource
UserConfig userConfig = new UserConfig(Scope.resource(resourceId));
@@ -140,7 +137,7 @@ public class NewModelExample {
// create the configuration for a new resource
ResourceConfig.Builder resourceBuilder =
- new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalanceConfigBuilder.build())
+ new ResourceConfig.Builder(resourceId).rebalancerContext(rebalanceContextBuilder.build())
.userConfig(userConfig);
return resourceBuilder.build();
}