You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@ozone.apache.org by sa...@apache.org on 2020/02/10 01:14:07 UTC

[hadoop-ozone] branch HDDS-1564 updated: HDDS-2923 Add fall-back protection for rack awareness in pipeline creation. (#516)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-1564 by this push:
     new 43845da  HDDS-2923 Add fall-back protection for rack awareness in pipeline creation. (#516)
43845da is described below

commit 43845daa30822c9fb259acaafc1748d633e76fc6
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Mon Feb 10 09:13:57 2020 +0800

    HDDS-2923 Add fall-back protection for rack awareness in pipeline creation. (#516)
---
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 81 ++++++++++++++++------
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 56 +++++++++++++++
 2 files changed, 114 insertions(+), 23 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 9d78063..0f30449 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -208,6 +208,29 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     }
   }
 
+  // Fall back node selection when rack awareness is unavailable: pick any
+  // node of nodeSet not in excludedNodes; throws SCMException if none is left.
+  DatanodeDetails fallBackPickNodes(
+      List<DatanodeDetails> nodeSet, List<DatanodeDetails> excludedNodes)
+      throws SCMException {
+    List<DatanodeDetails> candidates = nodeSet;
+    if (nodeSet != null && excludedNodes != null && !excludedNodes.isEmpty()) {
+      candidates = nodeSet.stream()
+          .filter(p -> !excludedNodes.contains(p)).collect(Collectors.toList());
+    }
+    DatanodeDetails node = chooseNode(candidates);
+    if (node == null) {
+      String msg = String.format("Unable to find fall back node in" +
+          " pipeline allocation. nodeSet size: %d, excluded nodes: %d",
+          nodeSet == null ? 0 : nodeSet.size(),
+          excludedNodes == null ? 0 : excludedNodes.size());
+      LOG.warn(msg);
+      throw new SCMException(msg,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return node;
+  }
+
   /**
    * Get result set based on the pipeline placement algorithm which considers
    * network topology and rack awareness.
@@ -220,50 +243,59 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
   public List<DatanodeDetails> getResultSet(
       int nodesRequired, List<DatanodeDetails> healthyNodes)
       throws SCMException {
+    if (nodesRequired != HddsProtos.ReplicationFactor.THREE.getNumber()) {
+      throw new SCMException("Nodes required number is not supported: " +
+          nodesRequired, SCMException.ResultCodes.INVALID_CAPACITY);
+    }
+
+    // Assume rack awareness is not enabled.
+    boolean rackAwareness = false;
     List <DatanodeDetails> results = new ArrayList<>(nodesRequired);
     // Since nodes are widely distributed, the results should be selected
     // base on distance in topology, rack awareness and load balancing.
     List<DatanodeDetails> exclude = new ArrayList<>();
     // First choose an anchor nodes randomly
     DatanodeDetails anchor = chooseNode(healthyNodes);
-    if (anchor == null) {
-      LOG.warn("Unable to find healthy node for anchor(first) node." +
-              " Required nodes: {}, Found nodes: {}",
-          nodesRequired, results.size());
-      throw new SCMException("Unable to find required number of nodes.",
+    if (anchor != null) {
+      results.add(anchor);
+      exclude.add(anchor);
+    } else {
+      LOG.warn("Unable to find healthy node for anchor(first) node.");
+      throw new SCMException("Unable to find anchor node.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("First node chosen: {}", anchor);
     }
 
-    results.add(anchor);
-    exclude.add(anchor);
 
     // Choose the second node on different racks from anchor.
-    DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
+    DatanodeDetails nextNode = chooseNodeBasedOnRackAwareness(
         healthyNodes, exclude,
         nodeManager.getClusterNetworkTopologyMap(), anchor);
-    if (nodeOnDifferentRack == null) {
-      LOG.warn("Pipeline Placement: Unable to find 2nd node on different " +
-          "racks that meets the criteria. Required nodes: {}, Found nodes:" +
-          " {}", nodesRequired, results.size());
-      throw new SCMException("Unable to find required number of nodes.",
-          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Second node chosen: {}", nodeOnDifferentRack);
+    if (nextNode != null) {
+      // Rack awareness is detected.
+      rackAwareness = true;
+      results.add(nextNode);
+      exclude.add(nextNode);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Second node chosen: {}", nextNode);
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
+            "rack based on rack awareness.");
+      }
     }
 
-    results.add(nodeOnDifferentRack);
-    exclude.add(nodeOnDifferentRack);
-
     // Then choose nodes close to anchor based on network topology
     int nodesToFind = nodesRequired - results.size();
     for (int x = 0; x < nodesToFind; x++) {
-      // invoke the choose function defined in the derived classes.
-      DatanodeDetails pick = chooseNodeFromNetworkTopology(
-          nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+      // Pick remaining nodes based on the existence of rack awareness.
+      DatanodeDetails pick = rackAwareness
+          ? chooseNodeFromNetworkTopology(
+              nodeManager.getClusterNetworkTopologyMap(), anchor, exclude)
+          : fallBackPickNodes(healthyNodes, exclude);
       if (pick != null) {
         results.add(pick);
         exclude.add(pick);
@@ -293,6 +325,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
   @Override
   public DatanodeDetails chooseNode(
       List<DatanodeDetails> healthyNodes) {
+    if (healthyNodes == null || healthyNodes.isEmpty()) {
+      return null;
+    }
     int firstNodeNdx = getRand().nextInt(healthyNodes.size());
     int secondNodeNdx = getRand().nextInt(healthyNodes.size());
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index b9aa9af..daad808 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -83,10 +83,66 @@ public class TestPipelinePlacementPolicy {
     DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
         healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
         topologyWithDifRacks, anchor);
+    Assert.assertNotNull(nextNode);
     Assert.assertFalse(anchor.getNetworkLocation().equals(
         nextNode.getNetworkLocation()));
   }
 
+  @Test
+  public void testFallBackPickNodes() throws SCMException {
+    List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
+    // Without exclusions, the fall back must always return some node;
+    // an unexpected SCMException propagates and fails the test with its
+    // stack trace instead of being replaced by a generic message.
+    DatanodeDetails node = placementPolicy.fallBackPickNodes(
+        healthyNodes, null);
+    Assert.assertNotNull(node);
+
+    // When every input node is excluded there is nothing left to choose,
+    // so fallBackPickNodes must throw FAILED_TO_FIND_SUITABLE_NODE rather
+    // than return a node.
+    List<DatanodeDetails> exclude = healthyNodes;
+    try {
+      node = placementPolicy.fallBackPickNodes(healthyNodes, exclude);
+      Assert.fail("Expected SCMException when all nodes are excluded, but " +
+          "got node: " + node);
+    } catch (SCMException e) {
+      Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+          e.getResult());
+    }
+  }
+
+  @Test
+  public void testRackAwarenessNotEnabledWithFallBack() throws SCMException {
+    List<DatanodeDetails> healthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+    DatanodeDetails randomNode = placementPolicy.chooseNode(healthyNodes);
+    // Rack awareness is not enabled, so nodes share the same location.
+    Assert.assertEquals(anchor.getNetworkLocation(),
+        randomNode.getNetworkLocation());
+
+    NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
+    DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
+        healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+        topology, anchor);
+    // RackAwareness should not be able to choose any node.
+    Assert.assertNull(nextNode);
+
+    // PlacementPolicy should still pick 3 distinct nodes via the fall back.
+    int numOfNodes = HddsProtos.ReplicationFactor.THREE.getNumber();
+    List<DatanodeDetails> results = placementPolicy
+        .getResultSet(numOfNodes, healthyNodes);
+    Assert.assertEquals(numOfNodes, results.size());
+    Assert.assertEquals(numOfNodes, results.stream().distinct().count());
+    // All nodes are on same rack.
+    Assert.assertEquals(results.get(0).getNetworkLocation(),
+        results.get(1).getNetworkLocation());
+    Assert.assertEquals(results.get(0).getNetworkLocation(),
+        results.get(2).getNetworkLocation());
+  }
+
   private final static Node[] NODES = new NodeImpl[] {
       new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
       new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org