You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/12 17:09:43 UTC

[4/5] helix git commit: [HELIX-634] Refactor AutoRebalancer to allow configurable placement strategy.

[HELIX-634] Refactor AutoRebalancer to allow configurable placement strategy.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ea0fbbbc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ea0fbbbc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ea0fbbbc

Branch: refs/heads/helix-0.6.x
Commit: ea0fbbbce302974b88a2b8253bf06616fd91aa5b
Parents: bc0aa76
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Jun 7 14:42:43 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700

----------------------------------------------------------------------
 .../controller/rebalancer/AutoRebalancer.java   | 40 +++++++---
 .../strategy/AutoRebalanceStrategy.java         | 40 +++++-----
 .../controller/strategy/RebalanceStrategy.java  | 52 +++++++++++++
 .../java/org/apache/helix/model/IdealState.java | 23 +++++-
 .../helix/model/builder/IdealStateBuilder.java  | 80 ++++++++++++++++++++
 .../task/GenericTaskAssignmentCalculator.java   |  5 +-
 .../strategy/TestAutoRebalanceStrategy.java     | 25 +++---
 7 files changed, 217 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index e47297f..6682426 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
@@ -35,8 +36,7 @@ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.controller.strategy.RebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.LiveInstance;
@@ -44,6 +44,7 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
 /**
@@ -59,14 +60,14 @@ import org.apache.log4j.Logger;
 public class AutoRebalancer implements Rebalancer, MappingCalculator {
   // These should be final, but are initialized in init rather than a constructor
   private HelixManager _manager;
-  private AutoRebalanceStrategy _algorithm;
+  private RebalanceStrategy _rebalanceStrategy;
 
   private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
 
   @Override
   public void init(HelixManager manager) {
     this._manager = manager;
-    this._algorithm = null;
+    this._rebalanceStrategy = null;
   }
 
   @Override
@@ -127,13 +128,32 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
 
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
 
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    placementScheme.init(_manager);
-    _algorithm =
-        new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition,
-            placementScheme);
+    String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
+    if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) {
+      _rebalanceStrategy =
+          new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
+    } else {
+      try {
+        _rebalanceStrategy = RebalanceStrategy.class
+            .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance());
+        _rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition);
+      } catch (ClassNotFoundException ex) {
+        throw new HelixException(
+            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+            ex);
+      } catch (InstantiationException ex) {
+        throw new HelixException(
+            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+            ex);
+      } catch (IllegalAccessException ex) {
+        throw new HelixException(
+            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+            ex);
+      }
+    }
+
     ZNRecord newMapping =
-        _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
+        _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("currentMapping: " + currentMapping);

http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 11b5b0d..959609f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -36,16 +36,15 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.log4j.Logger;
 
-public class AutoRebalanceStrategy {
-
+public class AutoRebalanceStrategy implements RebalanceStrategy {
   private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
-
-  private final String _resourceName;
-  private final List<String> _partitions;
-  private final LinkedHashMap<String, Integer> _states;
-  private final int _maximumPerNode;
   private final ReplicaPlacementScheme _placementScheme;
 
+  private String _resourceName;
+  private List<String> _partitions;
+  private LinkedHashMap<String, Integer> _states;
+  private int _maximumPerNode;
+
   private Map<String, Node> _nodeMap;
   private List<Node> _liveNodesList;
   private Map<Integer, String> _stateMap;
@@ -56,24 +55,26 @@ public class AutoRebalanceStrategy {
   private Set<Replica> _orphaned;
 
   public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode,
-      ReplicaPlacementScheme placementScheme) {
-    _resourceName = resourceName;
-    _partitions = partitions;
-    _states = states;
-    _maximumPerNode = maximumPerNode;
-    if (placementScheme != null) {
-      _placementScheme = placementScheme;
-    } else {
-      _placementScheme = new DefaultPlacementScheme();
-    }
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    init(resourceName, partitions, states, maximumPerNode);
+    _placementScheme = new DefaultPlacementScheme();
   }
 
   public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
       final LinkedHashMap<String, Integer> states) {
-    this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme());
+    this(resourceName, partitions, states, Integer.MAX_VALUE);
+  }
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    _partitions = partitions;
+    _states = states;
+    _maximumPerNode = maximumPerNode;
   }
 
+  @Override
   public ZNRecord computePartitionAssignment(final List<String> liveNodes,
       final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
     int numReplicas = countStateReplicas();
@@ -546,7 +547,6 @@ public class AutoRebalanceStrategy {
 
   /**
    * Counts the total number of replicas given a state-count mapping
-   * @param states
    * @return
    */
   private int countStateReplicas() {

http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
new file mode 100644
index 0000000..4daae82
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
@@ -0,0 +1,52 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Assignment strategy interface that computes the assignment of partition->instance.
+ */
+public interface RebalanceStrategy {
+  /**
+   * Perform the necessary initialization for the rebalance strategy object.
+   * @param resourceName
+   * @param partitions
+   * @param states
+   * @param maximumPerNode
+   */
+  void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode);
+
+  /**
+   * Compute the preference lists and, optionally, the partition-state mapping for the given resource.
+   *
+   * @param liveNodes
+   * @param currentMapping
+   * @param allNodes
+   * @return
+   */
+  ZNRecord computePartitionAssignment(final List<String> liveNodes,
+      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 44f4219..7c4cf54 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -53,10 +53,11 @@ public class IdealState extends HelixProperty {
     @Deprecated
     IDEAL_STATE_MODE,
     REBALANCE_MODE,
+    REBALANCER_CLASS_NAME,
     REBALANCE_TIMER_PERIOD,
+    REBALANCE_STRATEGY,
     MAX_PARTITIONS_PER_INSTANCE,
     INSTANCE_GROUP_TAG,
-    REBALANCER_CLASS_NAME,
     HELIX_ENABLED,
     RESOURCE_GROUP_NAME,
     GROUP_ROUTING_ENABLED,
@@ -165,6 +166,26 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Specify the strategy for Helix to use to compute the partition-instance assignment,
+   * i.e., the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy}
+   *
+   * @param rebalanceStrategy
+   * @return
+   */
+  public void setRebalanceStrategy(String rebalanceStrategy) {
+    _record.setSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name(), rebalanceStrategy);
+  }
+
+  /**
+   * Get the rebalance strategy for this resource.
+   *
+   * @return rebalance strategy, or null if not specified.
+   */
+  public String getRebalanceStrategy() {
+    return _record.getSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name());
+  }
+
+  /**
    * Set the resource group name
    * @param resourceGroupName
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
index d3bc3f2..9ad3023 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
@@ -52,6 +52,17 @@ public abstract class IdealStateBuilder {
    * Helix rebalancer strategies. AUTO, SEMI_AUTO, CUSTOMIZED
    */
   protected IdealState.RebalanceMode rebalancerMode;
+
+  /**
+   * Customized rebalancer class.
+   */
+  private String rebalancerClassName;
+
+  /**
+   * Custom rebalance strategy
+   */
+  private String rebalanceStrategy;
+
   /**
    * A constraint that limits the maximum number of partitions per Node.
    */
@@ -68,6 +79,16 @@ public abstract class IdealStateBuilder {
    */
   private Boolean disableExternalView = null;
 
+  /**
+   * Resource group name.
+   */
+  private String resourceGroupName;
+
+  /**
+   * Whether the resource group routing should be enabled in routingProvider.
+   */
+  private Boolean enableGroupRouting;
+
   protected ZNRecord _record;
 
   /**
@@ -144,6 +165,44 @@ public abstract class IdealStateBuilder {
   }
 
   /**
+   * Set custom rebalancer class name.
+   * @return IdealStateBuilder
+   */
+  public IdealStateBuilder setRebalancerClass(String rebalancerClassName) {
+    this.rebalancerClassName = rebalancerClassName;
+    return this;
+  }
+
+  /**
+   * Set custom rebalance strategy name.
+   * @param rebalanceStrategy
+   * @return
+   */
+  public IdealStateBuilder setRebalanceStrategy(String rebalanceStrategy) {
+    this.rebalanceStrategy = rebalanceStrategy;
+    return this;
+  }
+
+  /**
+   *
+   * @param resourceGroupName
+   * @return
+   */
+  public IdealStateBuilder setResourceGroupName(String resourceGroupName) {
+    this.resourceGroupName = resourceGroupName;
+    return this;
+  }
+
+  /**
+   * Enable Group Routing for this resource.
+   * @return
+   */
+  public IdealStateBuilder enableGroupRouting() {
+    this.enableGroupRouting = true;
+    return this;
+  }
+
+  /**
    * @return
    */
   public IdealState build() {
@@ -154,10 +213,31 @@ public abstract class IdealStateBuilder {
     idealstate.setStateModelFactoryName(stateModelFactoryName);
     idealstate.setRebalanceMode(rebalancerMode);
     idealstate.setReplicas("" + numReplica);
+
+    if (rebalancerClassName != null) {
+      idealstate.setRebalancerClassName(rebalancerClassName);
+    }
+
+    if (rebalanceStrategy != null) {
+      idealstate.setRebalanceStrategy(rebalanceStrategy);
+    }
+
+    if (maxPartitionsPerNode > 0) {
+      idealstate.setMaxPartitionsPerInstance(maxPartitionsPerNode);
+    }
+
     if (disableExternalView != null) {
       idealstate.setDisableExternalView(disableExternalView);
     }
 
+    if (resourceGroupName != null) {
+      idealstate.setResourceGroupName(resourceGroupName);
+    }
+
+    if (enableGroupRouting != null) {
+      idealstate.enableGroupRouting(enableGroupRouting);
+    }
+
     if (!idealstate.isValid()) {
       throw new HelixException("invalid ideal-state: " + idealstate);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index b0a1a33..623357f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -33,6 +33,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.RebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
@@ -121,9 +122,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
     }
 
     // Get the assignment keyed on partition
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
-            new AutoRebalanceStrategy.DefaultPlacementScheme());
+    RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, partitions, states);
     List<String> allNodes =
         Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
     Collections.sort(allNodes);

http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 985d0c8..adc92d6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -116,8 +116,7 @@ public class TestAutoRebalanceStrategy {
     StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
 
     new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
-        stateModelDef, new AutoRebalanceStrategy.DefaultPlacementScheme())
-        .runRepeatedly(numIterations);
+        stateModelDef).runRepeatedly(numIterations);
   }
 
   /**
@@ -157,13 +156,11 @@ public class TestAutoRebalanceStrategy {
     private List<String> _allNodes;
     private int _maxPerNode;
     private StateModelDefinition _stateModelDef;
-    private ReplicaPlacementScheme _placementScheme;
     private Random _random;
 
     public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
         List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
-        List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef,
-        ReplicaPlacementScheme placementScheme) {
+        List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
       _partitions = partitions;
       _states = states;
       _liveNodes = liveNodes;
@@ -182,7 +179,6 @@ public class TestAutoRebalanceStrategy {
       }
       _maxPerNode = maxPerNode;
       _stateModelDef = stateModelDef;
-      _placementScheme = placementScheme;
       _random = new Random();
     }
 
@@ -193,9 +189,10 @@ public class TestAutoRebalanceStrategy {
      */
     public void runRepeatedly(int numIterations) {
       logger.info("~~~~ Initial State ~~~~~");
+      RebalanceStrategy strategy =
+          new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
       ZNRecord initialResult =
-          new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
-              _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+          strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
       _currentMapping = getMapping(initialResult.getListFields());
       logger.info(_currentMapping);
       getRunResult(_currentMapping, initialResult.getListFields());
@@ -500,8 +497,8 @@ public class TestAutoRebalanceStrategy {
       _liveSet.add(node);
       _nonLiveSet.remove(node);
 
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
-          _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
+          computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
     }
 
     /**
@@ -534,8 +531,8 @@ public class TestAutoRebalanceStrategy {
         }
       }
 
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
-          _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
     }
 
     /**
@@ -560,8 +557,8 @@ public class TestAutoRebalanceStrategy {
       _liveNodes.add(node);
       _liveSet.add(node);
 
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
-          _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
     }
 
     private <T> T getRandomSetElement(Set<T> source) {