You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2020/04/21 15:15:44 UTC

[hadoop-ozone] branch master updated: HDDS-3139. Pipeline placement should max out pipeline usage (#668)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fac26e1  HDDS-3139. Pipeline placement should max out pipeline usage (#668)
fac26e1 is described below

commit fac26e134b6ec6c0cc85e1dc867d66aea82c0b8a
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Tue Apr 21 23:15:32 2020 +0800

    HDDS-3139. Pipeline placement should max out pipeline usage (#668)
---
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 134 +++++++++------------
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |  76 ++++++++++--
 2 files changed, 124 insertions(+), 86 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index b6a6858..68fe65d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -25,16 +25,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
-import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -76,19 +74,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
         ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
   }
 
-  /**
-   * Returns true if this node meets the criteria.
-   *
-   * @param datanodeDetails DatanodeDetails
-   * @param nodesRequired nodes required count
-   * @return true if we have enough space.
-   */
-  @VisibleForTesting
-  boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
-    if (heavyNodeCriteria == 0) {
-      // no limit applied.
-      return true;
-    }
+  int currentPipelineCount(DatanodeDetails datanodeDetails, int nodesRequired) {
+
     // Datanodes from pipeline in some states can also be considered available
     // for pipeline allocation. Thus the number of these pipeline shall be
     // deducted from total heaviness calculation.
@@ -110,21 +97,16 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
         pipelineNumDeductable++;
       }
     }
-    boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
-        - pipelineNumDeductable) < heavyNodeCriteria;
-    if (!meet && LOG.isDebugEnabled()) {
-      LOG.debug("Pipeline Placement: can't place more pipeline on heavy " +
-          "datanode： " + datanodeDetails.getUuid().toString() +
-          " Heaviness: " + nodeManager.getPipelinesCount(datanodeDetails) +
-          " limit: " + heavyNodeCriteria);
-    }
-    return meet;
+    return pipelines.size() - pipelineNumDeductable;
   }
 
+
+
   /**
    * Filter out viable nodes based on
    * 1. nodes that are healthy
    * 2. nodes that are not too heavily engaged in other pipelines
+   * The results are sorted based on pipeline count of each node.
    *
    * @param excludedNodes - excluded nodes
    * @param nodesRequired - number of datanodes required.
@@ -154,8 +136,15 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
 
     // filter nodes that meet the size and pipeline engagement criteria.
     // Pipeline placement doesn't take node space left into account.
+    // Sort the DNs by pipeline load.
+    // TODO check if sorting could cause performance issue: HDDS-3466.
     List<DatanodeDetails> healthyList = healthyNodes.stream()
-        .filter(d -> meetCriteria(d, nodesRequired))
+        .map(d ->
+            new DnWithPipelines(d, currentPipelineCount(d, nodesRequired)))
+        .filter(d ->
+            ((d.getPipelines() < heavyNodeCriteria) || heavyNodeCriteria == 0))
+        .sorted(Comparator.comparingInt(DnWithPipelines::getPipelines))
+        .map(d -> d.getDn())
         .collect(Collectors.toList());
 
     if (healthyList.size() < nodesRequired) {
@@ -253,7 +242,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     // Since nodes are widely distributed, the results should be selected
     // base on distance in topology, rack awareness and load balancing.
     List<DatanodeDetails> exclude = new ArrayList<>();
-    // First choose an anchor nodes randomly
+    // First choose an anchor node.
     DatanodeDetails anchor = chooseNode(healthyNodes);
     if (anchor != null) {
       results.add(anchor);
@@ -291,8 +280,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
       // Pick remaining nodes based on the existence of rack awareness.
       DatanodeDetails pick = null;
       if (rackAwareness) {
-        pick = chooseNodeFromNetworkTopology(
-            nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+        pick = chooseNodeBasedOnSameRack(
+            healthyNodes, exclude,
+            nodeManager.getClusterNetworkTopologyMap(), anchor);
       }
       // fall back protection
       if (pick == null) {
@@ -333,24 +323,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     if (healthyNodes == null || healthyNodes.isEmpty()) {
       return null;
     }
-    int firstNodeNdx = getRand().nextInt(healthyNodes.size());
-    int secondNodeNdx = getRand().nextInt(healthyNodes.size());
-
-    DatanodeDetails datanodeDetails;
-    // There is a possibility that both numbers will be same.
-    // if that is so, we just return the node.
-    if (firstNodeNdx == secondNodeNdx) {
-      datanodeDetails = healthyNodes.get(firstNodeNdx);
-    } else {
-      DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx);
-      DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx);
-      SCMNodeMetric firstNodeMetric =
-          nodeManager.getNodeStat(firstNodeDetails);
-      SCMNodeMetric secondNodeMetric =
-          nodeManager.getNodeStat(secondNodeDetails);
-      datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
-          ? firstNodeDetails : secondNodeDetails;
-    }
+    DatanodeDetails datanodeDetails = healthyNodes.get(0);
     healthyNodes.remove(datanodeDetails);
     return datanodeDetails;
   }
@@ -373,13 +346,31 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
       return null;
     }
 
-    for (DatanodeDetails node : healthyNodes) {
-      if (excludedNodes.contains(node) ||
-          anchor.getNetworkLocation().equals(node.getNetworkLocation())) {
-        continue;
-      } else {
-        return node;
-      }
+    List<DatanodeDetails> nodesOnOtherRack = healthyNodes.stream().filter(
+        p -> !excludedNodes.contains(p)
+            && !anchor.getNetworkLocation().equals(p.getNetworkLocation()))
+        .collect(Collectors.toList());
+    if (!nodesOnOtherRack.isEmpty()) {
+      return nodesOnOtherRack.get(0);
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  protected DatanodeDetails chooseNodeBasedOnSameRack(
+      List<DatanodeDetails> healthyNodes,  List<DatanodeDetails> excludedNodes,
+      NetworkTopology networkTopology, DatanodeDetails anchor) {
+    Preconditions.checkArgument(networkTopology != null);
+    if (checkAllNodesAreEqual(networkTopology)) {
+      return null;
+    }
+
+    List<DatanodeDetails> nodesOnSameRack = healthyNodes.stream().filter(
+        p -> !excludedNodes.contains(p)
+            && anchor.getNetworkLocation().equals(p.getNetworkLocation()))
+        .collect(Collectors.toList());
+    if (!nodesOnSameRack.isEmpty()) {
+      return nodesOnSameRack.get(0);
     }
     return null;
   }
@@ -398,31 +389,22 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     return (topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1);
   }
 
-  /**
-   * Choose node based on network topology.
-   * @param networkTopology network topology
-   * @param anchor anchor datanode to start with
-   * @param excludedNodes excluded datanodes
-   * @return chosen datanode
-   */
-  @VisibleForTesting
-  protected DatanodeDetails chooseNodeFromNetworkTopology(
-      NetworkTopology networkTopology, DatanodeDetails anchor,
-      List<DatanodeDetails> excludedNodes) {
-    Preconditions.checkArgument(networkTopology != null);
+  private static class DnWithPipelines {
+    private DatanodeDetails dn;
+    private int pipelines;
+
+    DnWithPipelines(DatanodeDetails dn, int pipelines) {
+      this.dn = dn;
+      this.pipelines = pipelines;
+    }
 
-    Collection<Node> excluded = new ArrayList<>();
-    if (excludedNodes != null && excludedNodes.size() != 0) {
-      excluded.addAll(excludedNodes);
+    public int getPipelines() {
+      return pipelines;
     }
 
-    Node pick = networkTopology.chooseRandom(
-        anchor.getNetworkLocation(), excluded);
-    DatanodeDetails pickedNode = (DatanodeDetails) pick;
-    if (pickedNode == null) {
-      LOG.debug("Pick node is null, excluded nodes {}, anchor {}.",
-          excluded, anchor);
+    public DatanodeDetails getDn() {
+      return dn;
     }
-    return pickedNode;
   }
+
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index b8b8622..426a643 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -41,27 +41,37 @@ import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
 
 /**
  * Test for PipelinePlacementPolicy.
  */
 public class TestPipelinePlacementPolicy {
   private MockNodeManager nodeManager;
+  private PipelineStateManager stateManager;
   private OzoneConfiguration conf;
   private PipelinePlacementPolicy placementPolicy;
   private NetworkTopologyImpl cluster;
   private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
+  private static final int PIPELINE_LOAD_LIMIT = 5;
 
   private List<DatanodeDetails> nodesWithOutRackAwareness = new ArrayList<>();
   private List<DatanodeDetails> nodesWithRackAwareness = new ArrayList<>();
 
+  static final Logger LOG =
+      LoggerFactory.getLogger(TestPipelinePlacementPolicy.class);
+
   @Before
   public void init() throws Exception {
     cluster = initTopology();
@@ -69,9 +79,10 @@ public class TestPipelinePlacementPolicy {
     nodeManager = new MockNodeManager(cluster, getNodesWithRackAwareness(),
         false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
     conf = new OzoneConfiguration();
-    conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);
+    conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, PIPELINE_LOAD_LIMIT);
+    stateManager = new PipelineStateManager();
     placementPolicy = new PipelinePlacementPolicy(
-        nodeManager, new PipelineStateManager(), conf);
+        nodeManager, stateManager, conf);
   }
 
   private NetworkTopologyImpl initTopology() {
@@ -85,11 +96,14 @@ public class TestPipelinePlacementPolicy {
 
   private List<DatanodeDetails> getNodesWithRackAwareness() {
     List<DatanodeDetails> datanodes = new ArrayList<>();
-    for (Node node : NODES) {
+    int iter = 0;
+    int delimiter = NODES.length;
+    while (iter < PIPELINE_PLACEMENT_MAX_NODES_COUNT) {
       DatanodeDetails datanode = overwriteLocationInNode(
-          getNodesWithoutRackAwareness(), node);
+          getNodesWithoutRackAwareness(), NODES[iter % delimiter]);
       nodesWithRackAwareness.add(datanode);
       datanodes.add(datanode);
+      iter++;
     }
     return datanodes;
   }
@@ -101,7 +115,7 @@ public class TestPipelinePlacementPolicy {
   }
 
   @Test
-  public void testChooseNodeBasedOnNetworkTopology() throws SCMException {
+  public void testChooseNodeBasedOnNetworkTopology() {
     DatanodeDetails anchor = placementPolicy.chooseNode(nodesWithRackAwareness);
     // anchor should be removed from healthyNodes after being chosen.
     Assert.assertFalse(nodesWithRackAwareness.contains(anchor));
@@ -109,8 +123,11 @@ public class TestPipelinePlacementPolicy {
     List<DatanodeDetails> excludedNodes =
         new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
     excludedNodes.add(anchor);
-    DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
-        nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
+    DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnSameRack(
+        nodesWithRackAwareness, excludedNodes,
+        nodeManager.getClusterNetworkTopologyMap(), anchor);
+    //DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
+    //    nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
     Assert.assertFalse(excludedNodes.contains(nextNode));
     // next node should not be the same as anchor.
     Assert.assertTrue(anchor.getUuid() != nextNode.getUuid());
@@ -149,6 +166,45 @@ public class TestPipelinePlacementPolicy {
   }
   
   @Test
+  public void testPickLowestLoadAnchor() throws IOException{
+    List<DatanodeDetails> healthyNodes = nodeManager
+        .getNodes(HddsProtos.NodeState.HEALTHY);
+
+    int maxPipelineCount = PIPELINE_LOAD_LIMIT * healthyNodes.size()
+        / HddsProtos.ReplicationFactor.THREE.getNumber();
+    for (int i = 0; i < maxPipelineCount; i++) {
+      try {
+        List<DatanodeDetails> nodes = placementPolicy.chooseDatanodes(null,
+            null, HddsProtos.ReplicationFactor.THREE.getNumber(), 0);
+
+        Pipeline pipeline = Pipeline.newBuilder()
+            .setId(PipelineID.randomId())
+            .setState(Pipeline.PipelineState.ALLOCATED)
+            .setType(HddsProtos.ReplicationType.RATIS)
+            .setFactor(HddsProtos.ReplicationFactor.THREE)
+            .setNodes(nodes)
+            .build();
+        nodeManager.addPipeline(pipeline);
+        stateManager.addPipeline(pipeline);
+      } catch (SCMException e) {
+        break;
+      }
+    }
+
+    // Every node should be evenly used.
+    int averageLoadOnNode = maxPipelineCount *
+        HddsProtos.ReplicationFactor.THREE.getNumber() / healthyNodes.size();
+    for (DatanodeDetails node : healthyNodes) {
+      Assert.assertTrue(nodeManager.getPipelinesCount(node)
+          >= averageLoadOnNode);
+    }
+    
+    // Should max out pipeline usage.
+    Assert.assertEquals(maxPipelineCount,
+        stateManager.getPipelines(HddsProtos.ReplicationType.RATIS).size());
+  }
+
+  @Test
   public void testChooseNodeBasedOnRackAwareness() {
     List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org