You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:40 UTC

[05/38] helix git commit: Add Multi-round CRUSH rebalance strategy.

Add Multi-round CRUSH rebalance strategy.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0f7c3e42
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0f7c3e42
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0f7c3e42

Branch: refs/heads/helix-0.6.x
Commit: 0f7c3e42080ba8e2b17e36ca1c5c51c6209b0f03
Parents: 7d0885c
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Jun 27 15:46:13 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sun Feb 5 18:50:52 2017 -0800

----------------------------------------------------------------------
 .../MultiRoundCrushRebalanceStrategy.java       | 327 +++++++++++++++++++
 .../rebalancer/topology/Topology.java           |  67 +++-
 .../integration/TestCrushAutoRebalance.java     |   5 +-
 3 files changed, 397 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0f7c3e42/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
new file mode 100644
index 0000000..93bd980
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
@@ -0,0 +1,327 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.InstanceConfig;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.util.JenkinsHash;
+
+
+/**
+ * Multi-round CRUSH partition mapping strategy.
+ * This gives a more even partition distribution when the number of partitions is small,
+ * but the number of partitions reshuffled during a node outage could be higher than with CrushRebalanceStrategy.
+ */
+public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy {
+  private String _resourceName; // name of the resource being rebalanced; set by init()
+  private List<String> _partitions; // partitions of the resource; set by init()
+  private Topology _clusterTopo; // cluster topology; rebuilt on every computePartitionAssignment() call
+  private int _replicas; // total replica count, i.e. the sum of all per-state counts given to init()
+  private LinkedHashMap<String, Integer> _stateCountMap; // state -> replica count, as passed to init()
+
+  private final int MAX_ITERNATION = 5; // upper bound on the CRUSH rounds run by singleZoneMapping()
+
+  /**
+   * Stores the resource description this strategy will work on.
+   *
+   * @param resourceName   name of the resource to rebalance
+   * @param partitions     partitions of the resource
+   * @param states         state to replica count mapping; its counts are summed up to get the
+   *                       total number of replicas per partition
+   * @param maximumPerNode not used by this strategy
+   */
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    this._resourceName = resourceName;
+    this._partitions = partitions;
+
+    final int replicaTotal = countStateReplicas(states);
+    this._replicas = replicaTotal;
+    this._stateCountMap = states;
+  }
+
+  /**
+   * Compute the preference lists for the given resource.
+   * <p>
+   * The assignment is done in two steps: first one fault zone is selected for every replica of
+   * every partition, then, zone by zone and state by state, the partitions that were placed in a
+   * zone are distributed over the nodes of that zone by {@link #singleZoneMapping(Node, List)}.
+   *
+   * @param allNodes       All instances
+   * @param liveNodes      List of live instances
+   * @param currentMapping current replica mapping (not used by this strategy)
+   * @param clusterData    cluster data
+   * @return a record named after the resource whose list fields hold, per partition, the names of
+   *         the selected nodes
+   * @throws HelixException if a map can not be found
+   */
+  @Override public ZNRecord computePartitionAssignment(final List<String> allNodes,
+      final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
+      ClusterDataCache clusterData) throws HelixException {
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    _clusterTopo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+    Node root = _clusterTopo.getRootNode();
+
+    // partition -> zones selected for its replicas; the list index is the replica position
+    Map<String, List<Node>> zoneMapping = new HashMap<String, List<Node>>();
+    for (String partitionName : _partitions) {
+      long pData = partitionName.hashCode();
+      zoneMapping.put(partitionName, select(root, _clusterTopo.getFaultZoneType(), pData, _replicas));
+    }
+
+    // Group once for all zones instead of rescanning every partition for each fault zone.
+    Map<Node, Map<String, List<String>>> zoneStatePartitions =
+        groupPartitionsByZoneAndState(zoneMapping, mapPositionToState());
+
+    // Final mapping <partition, state> -> list(node)
+    Map<String, Map<String, List<Node>>> partitionStateMapping =
+        new HashMap<String, Map<String, List<Node>>>();
+
+    for (Node zone : _clusterTopo.getFaultZones()) {
+      Map<String, List<String>> statePartitionMap = zoneStatePartitions.get(zone);
+      if (statePartitionMap == null) {
+        // no replica of any partition was placed in this zone
+        continue;
+      }
+
+      for (String state : _stateCountMap.keySet()) {
+        List<String> partitions = statePartitionMap.get(state);
+        if (partitions == null || partitions.isEmpty()) {
+          continue;
+        }
+        Map<String, Node> assignments = singleZoneMapping(zone, partitions);
+        for (Map.Entry<String, Node> assignment : assignments.entrySet()) {
+          String partition = assignment.getKey();
+          Map<String, List<Node>> stateMapping = partitionStateMapping.get(partition);
+          if (stateMapping == null) {
+            stateMapping = new HashMap<String, List<Node>>();
+            partitionStateMapping.put(partition, stateMapping);
+          }
+          List<Node> nodes = stateMapping.get(state);
+          if (nodes == null) {
+            nodes = new ArrayList<Node>();
+            stateMapping.put(state, nodes);
+          }
+          nodes.add(assignment.getValue());
+        }
+      }
+    }
+
+    return generateZNRecord(_resourceName, _partitions, partitionStateMapping);
+  }
+
+  /**
+   * Map the position in a preference list to the state of the replica at that position,
+   * walking the state-count map in its iteration order.
+   */
+  private Map<Integer, String> mapPositionToState() {
+    Map<Integer, String> idxStateMap = new HashMap<Integer, String>();
+    int i = 0;
+    for (Map.Entry<String, Integer> e : _stateCountMap.entrySet()) {
+      String state = e.getKey();
+      int count = e.getValue();
+      for (int j = 0; j < count; j++) {
+        idxStateMap.put(i + j, state);
+      }
+      i += count;
+    }
+    return idxStateMap;
+  }
+
+  /**
+   * Regroup the partition to zones mapping as zone -> state -> partitions in a single pass.
+   * Within each (zone, state) list the partitions keep the iteration order of zoneMapping.
+   */
+  private Map<Node, Map<String, List<String>>> groupPartitionsByZoneAndState(
+      Map<String, List<Node>> zoneMapping, Map<Integer, String> idxStateMap) {
+    Map<Node, Map<String, List<String>>> grouped = new HashMap<Node, Map<String, List<String>>>();
+    for (Map.Entry<String, List<Node>> e : zoneMapping.entrySet()) {
+      String partition = e.getKey();
+      List<Node> zones = e.getValue();
+      for (int k = 0; k < zones.size(); k++) {
+        Node zone = zones.get(k);
+        Map<String, List<String>> statePartitionMap = grouped.get(zone);
+        if (statePartitionMap == null) {
+          statePartitionMap = new HashMap<String, List<String>>();
+          grouped.put(zone, statePartitionMap);
+        }
+        // the k-th selected zone hosts the replica at preference list position k
+        String state = idxStateMap.get(k);
+        List<String> partitions = statePartitionMap.get(state);
+        if (partitions == null) {
+          partitions = new ArrayList<String>();
+          statePartitionMap.put(state, partitions);
+        }
+        partitions.add(partition);
+      }
+    }
+    return grouped;
+  }
+
+  /**
+   * Build the result record from the computed partition/state/node mapping.
+   * For every partition the node names are appended state by state, in the iteration order of
+   * the state-count map. A partition (or a state of a partition) for which no node was found
+   * contributes no names instead of failing; such a partition ends up with an empty list.
+   *
+   * @param resource              name of the resource, used as the id of the record
+   * @param partitions            all partitions of the resource
+   * @param partitionStateMapping partition -> state -> nodes selected for that state
+   * @return record whose list fields map each partition to the names of its selected nodes
+   */
+  private ZNRecord generateZNRecord(String resource, List<String> partitions,
+      Map<String, Map<String, List<Node>>> partitionStateMapping) {
+    Map<String, List<String>> newPreferences = new HashMap<String, List<String>>();
+    for (String partitionName : partitions) {
+      // may be null, e.g. when all zones chosen for the partition yielded no assignment
+      Map<String, List<Node>> stateNodeMap = partitionStateMapping.get(partitionName);
+
+      for (String state : _stateCountMap.keySet()) {
+        List<String> preferenceList = newPreferences.get(partitionName);
+        if (preferenceList == null) {
+          preferenceList = new ArrayList<String>();
+          newPreferences.put(partitionName, preferenceList);
+        }
+
+        List<Node> nodes = stateNodeMap == null ? null : stateNodeMap.get(state);
+        if (nodes == null) {
+          continue;
+        }
+        for (Node node : nodes) {
+          preferenceList.add(node.getName());
+        }
+      }
+    }
+    ZNRecord result = new ZNRecord(resource);
+    result.setListFields(newPreferences);
+
+    return result;
+  }
+
+  /**
+   * Compute mapping of partition to node in a single zone.
+   * Assumption: A partition should have only one replica at one zone.
+   * Applies CRUSH for several rounds (bounded by MAX_ITERNATION): after each round the surplus
+   * of over-loaded nodes is taken back and re-assigned on a copy of the zone in which only the
+   * under-loaded nodes stay selectable, until no node is below its target any more.
+   *
+   * @param zone       the zone
+   * @param partitions partitions to be assigned to nodes in the given zone.
+   * @return partition to node mapping in this zone; empty if the zone is failed, has no weight,
+   *         or there is nothing to assign.
+   */
+  private Map<String, Node> singleZoneMapping(Node zone, List<String> partitions) {
+    if (zone.isFailed() || zone.getWeight() == 0 || partitions.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    long totalWeight = zone.getWeight();
+    int totalPartition = partitions.size();
+
+    // node to all its assigned partitions.
+    Map<Node, List<String>> nodePartitionsMap = new HashMap<Node, List<String>>();
+
+    List<String> partitionsToAssign = new ArrayList<String>(partitions);
+    // over-loaded node -> partitions to take back from it at the start of the next round
+    Map<Node, List<String>> toRemovedMap = new HashMap<Node, List<String>>();
+
+    int iteration = 0;
+    Node root = zone;
+    while (iteration++ < MAX_ITERNATION) {
+      for (Map.Entry<Node, List<String>> e : toRemovedMap.entrySet()) {
+        List<String> curAssignedPartitions = nodePartitionsMap.get(e.getKey());
+        List<String> toRemoved = e.getValue();
+        curAssignedPartitions.removeAll(toRemoved);
+        partitionsToAssign.addAll(toRemoved);
+      }
+      // The surplus has been handed back. Without clearing, a node that is no longer over-loaded
+      // would keep its old entry and the same partitions would be queued (and assigned) again in
+      // a later round although they already live on another node.
+      toRemovedMap.clear();
+
+      for (String p : partitionsToAssign) {
+        long pData = p.hashCode();
+        List<Node> nodes = select(root, _clusterTopo.getEndNodeType(), pData, 1);
+        for (Node n : nodes) {
+          List<String> assigned = nodePartitionsMap.get(n);
+          if (assigned == null) {
+            assigned = new ArrayList<String>();
+            nodePartitionsMap.put(n, assigned);
+          }
+          assigned.add(p);
+        }
+      }
+
+      // weights for the next round: only nodes still below their target remain selectable
+      Map<String, Integer> newNodeWeight = new HashMap<String, Integer>();
+      Set<String> completedNodes = new HashSet<String>();
+      for (Node node : Topology.getAllLeafNodes(zone)) {
+        if (node.isFailed()) {
+          completedNodes.add(node.getName());
+          continue;
+        }
+        long weight = node.getWeight();
+        double ratio = ((double) weight) / (double) totalWeight;
+        // share of the partitions this node should hold according to its weight
+        int target = (int) Math.floor(ratio * totalPartition);
+
+        List<String> assignedPartitions = nodePartitionsMap.get(node);
+        int numPartitions = 0;
+        if (assignedPartitions != null) {
+          numPartitions = assignedPartitions.size();
+        }
+        if (numPartitions > target + 1) {
+          int remove = numPartitions - target - 1;
+          // Sort the list the surplus is taken from, so that the choice of partitions to move
+          // does not depend on the order in which they happened to be assigned.
+          Collections.sort(assignedPartitions);
+          List<String> toRemoved = new ArrayList<String>(assignedPartitions.subList(0, remove));
+          toRemovedMap.put(node, toRemoved);
+        }
+
+        int missing = target - numPartitions;
+        if (missing > 0) {
+          // the more partitions a node lacks, the more likely it is picked in the next round
+          newNodeWeight.put(node.getName(), missing * 10);
+        } else {
+          completedNodes.add(node.getName());
+        }
+      }
+
+      if (newNodeWeight.isEmpty()) {
+        // already converged
+        break;
+      }
+
+      // iterate more
+      root = Topology.clone(zone, newNodeWeight, completedNodes);
+      partitionsToAssign.clear();
+    }
+
+    Map<String, Node> partitionMap = new HashMap<String, Node>();
+    for (Map.Entry<Node, List<String>> e : nodePartitionsMap.entrySet()) {
+      Node n = e.getKey();
+      for (String p : e.getValue()) {
+        partitionMap.put(p, n);
+      }
+    }
+
+    return partitionMap;
+  }
+
+  /**
+   * Number of retries for finding an appropriate instance for a replica.
+   */
+  private static final int MAX_RETRY = 100;
+  private final JenkinsHash hashFun = new JenkinsHash(); // re-hashes the input value before each retry in select()
+  private CRUSHPlacementAlgorithm placementAlgorithm = new CRUSHPlacementAlgorithm(); // does the node selection in select()
+
+  /**
+   * For given input, select a number of children with given type.
+   * The caller will either get the expected number of selected nodes as a result,
+   * or an exception will be thrown.
+   *
+   * @param topNode  node below which the selection takes place
+   * @param nodeType type of the nodes to select
+   * @param data     input value the selection is derived from
+   * @param rf       number of nodes to select
+   * @return the selected nodes
+   * @throws HelixException if the requested number of nodes is not found within MAX_RETRY tries
+   */
+  private List<Node> select(Node topNode, String nodeType, long data, int rf)
+      throws HelixException {
+    List<Node> zones = new ArrayList<Node>();
+    long input = data;
+    int count = rf;
+    int tries = 0;
+    while (zones.size() < rf) {
+      // Ask only for what is still missing: requesting rf again on a retry could return more
+      // nodes than needed and make the result larger than rf.
+      List<Node> selected = placementAlgorithm
+          .select(topNode, input, count, nodeType, nodeAlreadySelected(new HashSet<Node>(zones)));
+      // add the newly selected nodes to the result
+      zones.addAll(selected);
+      count = rf - zones.size();
+      if (count > 0) {
+        input = hashFun.hash(input); // create a different hash value for retrying
+        tries++;
+        if (tries >= MAX_RETRY) {
+          throw new HelixException(
+              String.format("could not find all mappings after %d tries", tries));
+        }
+      }
+    }
+    return zones;
+  }
+
+  /**
+   * Builds the predicate that accepts only zones or nodes which are not part of the given set,
+   * i.e. rejects everything that has been selected already.
+   */
+  private Predicate<Node> nodeAlreadySelected(Set<Node> selectedNodes) {
+    Predicate<Node> alreadySelected = Predicates.in(selectedNodes);
+    return Predicates.not(alreadySelected);
+  }
+
+  /**
+   * Sums up the replica counts of all states in the given state-count mapping.
+   *
+   * @param stateCountMap state to replica count
+   * @return total number of replicas over all states
+   */
+  private int countStateReplicas(Map<String, Integer> stateCountMap) {
+    int sum = 0;
+    for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
+      sum += entry.getValue();
+    }
+    return sum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/0f7c3e42/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 1057fad..1d34cbd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
@@ -141,6 +142,60 @@ public class Topology {
   }
 
   /**
+   * Collects all leaf nodes of the tree below the given node, children visited in list order.
+   * If the given node is a leaf itself, the result contains just that node.
+   *
+   * @param root root of the (sub)tree to collect the leaves from
+   * @return list of all leaf nodes
+   */
+  public static List<Node> getAllLeafNodes(Node root) {
+    List<Node> leaves = new ArrayList<Node>();
+    if (!root.isLeaf()) {
+      for (Node child : root.getChildren()) {
+        leaves.addAll(getAllLeafNodes(child));
+      }
+      return leaves;
+    }
+    leaves.add(root);
+    return leaves;
+  }
+
+  /**
+   * Creates a copy of a node tree in which the given weights replace the original ones and
+   * every node named in failedNodes is marked as failed. The weight of the returned root is
+   * recomputed after copying.
+   *
+   * @param root          origin root of the tree
+   * @param newNodeWeight map of node name to its new weight. If absent, keep its original weight.
+   * @param failedNodes   set of nodes that need to be failed.
+   * @return new root node.
+   */
+  public static Node clone(Node root, Map<String, Integer> newNodeWeight, Set<String> failedNodes) {
+    final Node clonedRoot = cloneTree(root, newNodeWeight, failedNodes);
+    // weights below the root may have changed, so the copied root weight has to be refreshed
+    computeWeight(clonedRoot);
+    return clonedRoot;
+  }
+
+  /**
+   * Recursively copies the tree rooted at the given node.
+   * A node found in newNodeWeight gets that weight; a node found in failedNodes is marked as
+   * failed and gets weight 0 (this takes precedence over newNodeWeight).
+   *
+   * @param root          node to copy together with everything below it
+   * @param newNodeWeight map of node name to its new weight
+   * @param failedNodes   names of the nodes to mark as failed
+   * @return the copy of the given node, with copies of all its children attached
+   */
+  private static Node cloneTree(Node root, Map<String, Integer> newNodeWeight, Set<String> failedNodes) {
+    Node newRoot = new Node(root);
+    if (newNodeWeight.containsKey(root.getName())) {
+      newRoot.setWeight(newNodeWeight.get(root.getName()));
+    }
+    if (failedNodes.contains(root.getName())) {
+      newRoot.setFailed(true);
+      newRoot.setWeight(0);
+    }
+
+    List<Node> children = root.getChildren();
+    if (children != null) {
+      for (Node child : children) {
+        Node newChild = cloneTree(child, newNodeWeight, failedNodes);
+        // the parent of a copied child is the copied node, not the node of the original tree
+        newChild.setParent(newRoot);
+        newRoot.addChild(newChild);
+      }
+    }
+
+    return newRoot;
+  }
+
+  /**
    * Creates a tree representing the cluster structure using default cluster topology definition
    * (i,e no topology definition given and no domain id set).
    */
@@ -172,7 +227,7 @@ public class Topology {
       if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
         weight = DEFAULT_NODE_WEIGHT;
       }
-      addEndNode(root, ins, pathValueMap, weight, _liveInstances);
+      root = addEndNode(root, ins, pathValueMap, weight, _liveInstances);
     }
 
     return root;
@@ -280,6 +335,16 @@ public class Topology {
     return bstrTo32bit(h);
   }
 
+  /**
+   * Recomputes, bottom-up, the weight of the given node as the sum of the weights of its
+   * non-failed children. Nodes without a children list keep their weight, and failed children
+   * are neither recomputed nor counted.
+   *
+   * @param node root of the (sub)tree whose weights are refreshed
+   */
+  private static void computeWeight(Node node) {
+    List<Node> children = node.getChildren();
+    if (children == null) {
+      // nothing to aggregate; keep the weight the node already has
+      return;
+    }
+
+    int weight = 0;
+    for (Node child : children) {
+      if (child.isFailed()) {
+        continue;
+      }
+      // refresh inner nodes first, so that changed weights further down are reflected here
+      List<Node> grandChildren = child.getChildren();
+      if (grandChildren != null && !grandChildren.isEmpty()) {
+        computeWeight(child);
+      }
+      weight += child.getWeight();
+    }
+    node.setWeight(weight);
+  }
+
   private long bstrTo32bit(byte[] bstr) {
     if (bstr.length < 4) {
       throw new IllegalArgumentException("hashed is less than 4 bytes!");

http://git-wip-us.apache.org/repos/asf/helix/blob/0f7c3e42/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
index 00a6169..026db1c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration;
  * under the License.
  */
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -98,7 +99,9 @@ public class TestCrushAutoRebalance extends ZkIntegrationTestBase {
 
   @DataProvider(name = "rebalanceStrategies")
   public static String [][] rebalanceStrategies() {
-    return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}};
+    return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()},
+        {"MultiRoundCrushRebalanceStrategy", MultiRoundCrushRebalanceStrategy.class.getName()}
+    };
   }
 
   @Test(dataProvider = "rebalanceStrategies")