You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/12 17:09:43 UTC
[4/5] helix git commit: [HELIX-634] Refactor AutoRebalancer to allow
configuable placement strategy.
[HELIX-634] Refactor AutoRebalancer to allow configuable placement strategy.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ea0fbbbc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ea0fbbbc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ea0fbbbc
Branch: refs/heads/helix-0.6.x
Commit: ea0fbbbce302974b88a2b8253bf06616fd91aa5b
Parents: bc0aa76
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Jun 7 14:42:43 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700
----------------------------------------------------------------------
.../controller/rebalancer/AutoRebalancer.java | 40 +++++++---
.../strategy/AutoRebalanceStrategy.java | 40 +++++-----
.../controller/strategy/RebalanceStrategy.java | 52 +++++++++++++
.../java/org/apache/helix/model/IdealState.java | 23 +++++-
.../helix/model/builder/IdealStateBuilder.java | 80 ++++++++++++++++++++
.../task/GenericTaskAssignmentCalculator.java | 5 +-
.../strategy/TestAutoRebalanceStrategy.java | 25 +++---
7 files changed, 217 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index e47297f..6682426 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
@@ -35,8 +36,7 @@ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.controller.strategy.RebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.LiveInstance;
@@ -44,6 +44,7 @@ import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
/**
@@ -59,14 +60,14 @@ import org.apache.log4j.Logger;
public class AutoRebalancer implements Rebalancer, MappingCalculator {
// These should be final, but are initialized in init rather than a constructor
private HelixManager _manager;
- private AutoRebalanceStrategy _algorithm;
+ private RebalanceStrategy _rebalanceStrategy;
private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
@Override
public void init(HelixManager manager) {
this._manager = manager;
- this._algorithm = null;
+ this._rebalanceStrategy = null;
}
@Override
@@ -127,13 +128,32 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- placementScheme.init(_manager);
- _algorithm =
- new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition,
- placementScheme);
+ String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
+ if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) {
+ _rebalanceStrategy =
+ new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
+ } else {
+ try {
+ _rebalanceStrategy = RebalanceStrategy.class
+ .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance());
+ _rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition);
+ } catch (ClassNotFoundException ex) {
+ throw new HelixException(
+ "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+ ex);
+ } catch (InstantiationException ex) {
+ throw new HelixException(
+ "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+ ex);
+ } catch (IllegalAccessException ex) {
+ throw new HelixException(
+ "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+ ex);
+ }
+ }
+
ZNRecord newMapping =
- _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes);
if (LOG.isDebugEnabled()) {
LOG.debug("currentMapping: " + currentMapping);
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 11b5b0d..959609f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -36,16 +36,15 @@ import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.log4j.Logger;
-public class AutoRebalanceStrategy {
-
+public class AutoRebalanceStrategy implements RebalanceStrategy {
private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
-
- private final String _resourceName;
- private final List<String> _partitions;
- private final LinkedHashMap<String, Integer> _states;
- private final int _maximumPerNode;
private final ReplicaPlacementScheme _placementScheme;
+ private String _resourceName;
+ private List<String> _partitions;
+ private LinkedHashMap<String, Integer> _states;
+ private int _maximumPerNode;
+
private Map<String, Node> _nodeMap;
private List<Node> _liveNodesList;
private Map<Integer, String> _stateMap;
@@ -56,24 +55,26 @@ public class AutoRebalanceStrategy {
private Set<Replica> _orphaned;
public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
- final LinkedHashMap<String, Integer> states, int maximumPerNode,
- ReplicaPlacementScheme placementScheme) {
- _resourceName = resourceName;
- _partitions = partitions;
- _states = states;
- _maximumPerNode = maximumPerNode;
- if (placementScheme != null) {
- _placementScheme = placementScheme;
- } else {
- _placementScheme = new DefaultPlacementScheme();
- }
+ final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+ init(resourceName, partitions, states, maximumPerNode);
+ _placementScheme = new DefaultPlacementScheme();
}
public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
final LinkedHashMap<String, Integer> states) {
- this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme());
+ this(resourceName, partitions, states, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void init(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+ _resourceName = resourceName;
+ _partitions = partitions;
+ _states = states;
+ _maximumPerNode = maximumPerNode;
}
+ @Override
public ZNRecord computePartitionAssignment(final List<String> liveNodes,
final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
int numReplicas = countStateReplicas();
@@ -546,7 +547,6 @@ public class AutoRebalanceStrategy {
/**
* Counts the total number of replicas given a state-count mapping
- * @param states
* @return
*/
private int countStateReplicas() {
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
new file mode 100644
index 0000000..4daae82
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
@@ -0,0 +1,52 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Assignment strategy interface that computes the assignment of partition->instance.
+ */
+public interface RebalanceStrategy {
+ /**
+ * Perform the necessary initialization for the rebalance strategy object.
+ * @param resourceName
+ * @param partitions
+ * @param states
+ * @param maximumPerNode
+ */
+ void init(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode);
+
+ /**
+ * Compute the preference lists and (optional partition-state mapping) for the given resource.
+ *
+ * @param liveNodes
+ * @param currentMapping
+ * @param allNodes
+ * @return
+ */
+ ZNRecord computePartitionAssignment(final List<String> liveNodes,
+ final Map<String, Map<String, String>> currentMapping, final List<String> allNodes);
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 44f4219..7c4cf54 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -53,10 +53,11 @@ public class IdealState extends HelixProperty {
@Deprecated
IDEAL_STATE_MODE,
REBALANCE_MODE,
+ REBALANCER_CLASS_NAME,
REBALANCE_TIMER_PERIOD,
+ REBALANCE_STRATEGY,
MAX_PARTITIONS_PER_INSTANCE,
INSTANCE_GROUP_TAG,
- REBALANCER_CLASS_NAME,
HELIX_ENABLED,
RESOURCE_GROUP_NAME,
GROUP_ROUTING_ENABLED,
@@ -165,6 +166,26 @@ public class IdealState extends HelixProperty {
}
/**
+ * Specify the strategy for Helix to use to compute the partition-instance assignment,
+ * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy}
+ *
+ * @param rebalanceStrategy
+ * @return
+ */
+ public void setRebalanceStrategy(String rebalanceStrategy) {
+ _record.setSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name(), rebalanceStrategy);
+ }
+
+ /**
+ * Get the rebalance strategy for this resource.
+ *
+ * @return rebalance strategy, or null if not specified.
+ */
+ public String getRebalanceStrategy() {
+ return _record.getSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name());
+ }
+
+ /**
* Set the resource group name
* @param resourceGroupName
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
index d3bc3f2..9ad3023 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
@@ -52,6 +52,17 @@ public abstract class IdealStateBuilder {
* Helix rebalancer strategies. AUTO, SEMI_AUTO, CUSTOMIZED
*/
protected IdealState.RebalanceMode rebalancerMode;
+
+ /**
+ * Customized rebalancer class.
+ */
+ private String rebalancerClassName;
+
+ /**
+ * Custom rebalance strategy
+ */
+ private String rebalanceStrategy;
+
/**
* A constraint that limits the maximum number of partitions per Node.
*/
@@ -68,6 +79,16 @@ public abstract class IdealStateBuilder {
*/
private Boolean disableExternalView = null;
+ /**
+ * Resource group name.
+ */
+ private String resourceGroupName;
+
+ /**
+ * Whether the resource group routing should be enabled in routingProvider.
+ */
+ private Boolean enableGroupRouting;
+
protected ZNRecord _record;
/**
@@ -144,6 +165,44 @@ public abstract class IdealStateBuilder {
}
/**
+ * Set custom rebalancer class name.
+ * @return IdealStateBuilder
+ */
+ public IdealStateBuilder setRebalancerClass(String rebalancerClassName) {
+ this.rebalancerClassName = rebalancerClassName;
+ return this;
+ }
+
+ /**
+ * Set custom rebalance strategy name.
+ * @param rebalanceStrategy
+ * @return
+ */
+ public IdealStateBuilder setRebalanceStrategy(String rebalanceStrategy) {
+ this.rebalanceStrategy = rebalanceStrategy;
+ return this;
+ }
+
+ /**
+ *
+ * @param resourceGroupName
+ * @return
+ */
+ public IdealStateBuilder setResourceGroupName(String resourceGroupName) {
+ this.resourceGroupName = resourceGroupName;
+ return this;
+ }
+
+ /**
+ * Enable Group Routing for this resource.
+ * @return
+ */
+ public IdealStateBuilder enableGroupRouting() {
+ this.enableGroupRouting = true;
+ return this;
+ }
+
+ /**
* @return
*/
public IdealState build() {
@@ -154,10 +213,31 @@ public abstract class IdealStateBuilder {
idealstate.setStateModelFactoryName(stateModelFactoryName);
idealstate.setRebalanceMode(rebalancerMode);
idealstate.setReplicas("" + numReplica);
+
+ if (rebalancerClassName != null) {
+ idealstate.setRebalancerClassName(rebalancerClassName);
+ }
+
+ if (rebalanceStrategy != null) {
+ idealstate.setRebalanceStrategy(rebalanceStrategy);
+ }
+
+ if (maxPartitionsPerNode > 0) {
+ idealstate.setMaxPartitionsPerInstance(maxPartitionsPerNode);
+ }
+
if (disableExternalView != null) {
idealstate.setDisableExternalView(disableExternalView);
}
+ if (resourceGroupName != null) {
+ idealstate.setResourceGroupName(resourceGroupName);
+ }
+
+ if (enableGroupRouting != null) {
+ idealstate.enableGroupRouting(enableGroupRouting);
+ }
+
if (!idealstate.isValid()) {
throw new HelixException("invalid ideal-state: " + idealstate);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index b0a1a33..623357f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -33,6 +33,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.RebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
@@ -121,9 +122,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
}
// Get the assignment keyed on partition
- AutoRebalanceStrategy strategy =
- new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
- new AutoRebalanceStrategy.DefaultPlacementScheme());
+ RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, partitions, states);
List<String> allNodes =
Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
Collections.sort(allNodes);
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 985d0c8..adc92d6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -116,8 +116,7 @@ public class TestAutoRebalanceStrategy {
StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
- stateModelDef, new AutoRebalanceStrategy.DefaultPlacementScheme())
- .runRepeatedly(numIterations);
+ stateModelDef).runRepeatedly(numIterations);
}
/**
@@ -157,13 +156,11 @@ public class TestAutoRebalanceStrategy {
private List<String> _allNodes;
private int _maxPerNode;
private StateModelDefinition _stateModelDef;
- private ReplicaPlacementScheme _placementScheme;
private Random _random;
public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
- List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef,
- ReplicaPlacementScheme placementScheme) {
+ List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
_partitions = partitions;
_states = states;
_liveNodes = liveNodes;
@@ -182,7 +179,6 @@ public class TestAutoRebalanceStrategy {
}
_maxPerNode = maxPerNode;
_stateModelDef = stateModelDef;
- _placementScheme = placementScheme;
_random = new Random();
}
@@ -193,9 +189,10 @@ public class TestAutoRebalanceStrategy {
*/
public void runRepeatedly(int numIterations) {
logger.info("~~~~ Initial State ~~~~~");
+ RebalanceStrategy strategy =
+ new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
ZNRecord initialResult =
- new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
- _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+ strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
_currentMapping = getMapping(initialResult.getListFields());
logger.info(_currentMapping);
getRunResult(_currentMapping, initialResult.getListFields());
@@ -500,8 +497,8 @@ public class TestAutoRebalanceStrategy {
_liveSet.add(node);
_nonLiveSet.remove(node);
- return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
- _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
+ computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
}
/**
@@ -534,8 +531,8 @@ public class TestAutoRebalanceStrategy {
}
}
- return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
- _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+ .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
}
/**
@@ -560,8 +557,8 @@ public class TestAutoRebalanceStrategy {
_liveNodes.add(node);
_liveSet.add(node);
- return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
- _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+ .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
}
private <T> T getRandomSetElement(Set<T> source) {