You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2020/03/27 07:30:09 UTC

[hadoop-ozone] branch master updated: HDDS-3179. Pipeline placement based on Topology does not have fallback (#678)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d132ce  HDDS-3179. Pipeline placement based on Topology does not have fallback (#678)
7d132ce is described below

commit 7d132ce38d5d8aeb3b72e770f99881888c2753ee
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Fri Mar 27 15:29:59 2020 +0800

    HDDS-3179. Pipeline placement based on Topology does not have fallback (#678)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |   2 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  37 ++++--
 .../hadoop/hdds/scm/container/MockNodeManager.java |  10 +-
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 145 ++++++++++++++++-----
 4 files changed, 145 insertions(+), 49 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index a235a4b..28ed36d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -70,7 +70,7 @@ public class DatanodeDetails extends NodeImpl implements
     this.certSerialId = certSerialId;
   }
 
-  protected DatanodeDetails(DatanodeDetails datanodeDetails) {
+  public DatanodeDetails(DatanodeDetails datanodeDetails) {
     super(datanodeDetails.getHostName(), datanodeDetails.getNetworkLocation(),
         datanodeDetails.getCost());
     this.uuid = datanodeDetails.uuid;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 0f30449..e96b120 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -99,9 +99,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
       try {
         pipeline = stateManager.getPipeline(pid);
       } catch (PipelineNotFoundException e) {
-        LOG.error("Pipeline not found in pipeline state manager during" +
-            " pipeline creation. PipelineID: " + pid +
-            " exception: " + e.getMessage());
+        LOG.debug("Pipeline not found in pipeline state manager during" +
+            " pipeline creation. PipelineID: {}", pid, e);
         continue;
       }
       if (pipeline != null &&
@@ -282,26 +281,32 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
         LOG.debug("Second node chosen: {}", nextNode);
       }
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
-            "rack based on rack awareness.");
-      }
+      LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
+          "rack based on rack awareness. anchor: {}", anchor);
     }
 
     // Then choose nodes close to anchor based on network topology
     int nodesToFind = nodesRequired - results.size();
     for (int x = 0; x < nodesToFind; x++) {
       // Pick remaining nodes based on the existence of rack awareness.
-      DatanodeDetails pick = rackAwareness
-          ? chooseNodeFromNetworkTopology(
-              nodeManager.getClusterNetworkTopologyMap(), anchor, exclude)
-          : fallBackPickNodes(healthyNodes, exclude);
+      DatanodeDetails pick = null;
+      if (rackAwareness) {
+        pick = chooseNodeFromNetworkTopology(
+            nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+      }
+      // Fallback: topology gave no node (or rack awareness is off).
+      if (pick == null) {
+        pick = fallBackPickNodes(healthyNodes, exclude);
+        if (rackAwareness) {
+          LOG.debug("Failed to choose node based on topology. Fallback " +
+              "picks node as: {}", pick);
+        }
+      }
+
       if (pick != null) {
         results.add(pick);
         exclude.add(pick);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Remaining node chosen: {}", pick);
-        }
+        LOG.debug("Remaining node chosen: {}", pick);
       }
     }
 
@@ -414,6 +419,10 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     Node pick = networkTopology.chooseRandom(
         anchor.getNetworkLocation(), excluded);
     DatanodeDetails pickedNode = (DatanodeDetails) pick;
+    if (pickedNode == null) {
+      LOG.debug("Pick node is null, excluded nodes {}, anchor {}.",
+          excluded, anchor);
+    }
     return pickedNode;
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index cbeef7f..f15bfdd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -94,6 +94,7 @@ public class MockNodeManager implements NodeManager {
   private ConcurrentMap<String, Set<String>> dnsToUuidMap;
 
   public MockNodeManager(NetworkTopologyImpl clusterMap,
+                         List<DatanodeDetails> nodes,
                          boolean initializeFakeNodes, int nodeCount) {
     this.healthyNodes = new LinkedList<>();
     this.staleNodes = new LinkedList<>();
@@ -104,6 +105,13 @@ public class MockNodeManager implements NodeManager {
     this.dnsToUuidMap = new ConcurrentHashMap<>();
     this.aggregateStat = new SCMNodeStat();
     this.clusterMap = clusterMap;
+    if (!nodes.isEmpty()) {
+      for (int x = 0; x < nodes.size(); x++) {
+        DatanodeDetails node = nodes.get(x);
+        register(node, null, null);
+        populateNodeMetric(node, x);
+      }
+    }
     if (initializeFakeNodes) {
       for (int x = 0; x < nodeCount; x++) {
         DatanodeDetails dd = MockDatanodeDetails.randomDatanodeDetails();
@@ -116,7 +124,7 @@ public class MockNodeManager implements NodeManager {
   }
 
   public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
-    this(new NetworkTopologyImpl(new OzoneConfiguration()),
+    this(new NetworkTopologyImpl(new OzoneConfiguration()), new ArrayList<>(),
         initializeFakeNodes, nodeCount);
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index daad808..fafc4b0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -35,6 +36,9 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
 
 /**
  * Test for PipelinePlacementPolicy.
@@ -43,25 +47,55 @@ public class TestPipelinePlacementPolicy {
   private MockNodeManager nodeManager;
   private OzoneConfiguration conf;
   private PipelinePlacementPolicy placementPolicy;
+  private NetworkTopologyImpl cluster;
   private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
 
+  private List<DatanodeDetails> nodesWithOutRackAwareness = new ArrayList<>();
+  private List<DatanodeDetails> nodesWithRackAwareness = new ArrayList<>();
+
   @Before
   public void init() throws Exception {
-    nodeManager = new MockNodeManager(true,
-        PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    cluster = initTopology();
+    // start with nodes with rack awareness.
+    nodeManager = new MockNodeManager(cluster, getNodesWithRackAwareness(),
+        false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
     conf = new OzoneConfiguration();
     conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);
     placementPolicy = new PipelinePlacementPolicy(
         nodeManager, new PipelineStateManager(), conf);
   }
 
+  private NetworkTopologyImpl initTopology() {
+    NodeSchema[] schemas = new NodeSchema[]
+        {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
+    NodeSchemaManager.getInstance().init(schemas, true);
+    NetworkTopologyImpl topology =
+        new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+    return topology;
+  }
+
+  private List<DatanodeDetails> getNodesWithRackAwareness() {
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (Node node : NODES) {
+      DatanodeDetails datanode = overwriteLocationInNode(
+          getNodesWithoutRackAwareness(), node);
+      nodesWithRackAwareness.add(datanode);
+      datanodes.add(datanode);
+    }
+    return datanodes;
+  }
+
+  private DatanodeDetails getNodesWithoutRackAwareness() {
+    DatanodeDetails node = MockDatanodeDetails.randomDatanodeDetails();
+    nodesWithOutRackAwareness.add(node);
+    return node;
+  }
+
   @Test
-  public void testChooseNodeBasedOnNetworkTopology() {
-    List<DatanodeDetails> healthyNodes =
-        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
-    DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+  public void testChooseNodeBasedOnNetworkTopology() throws SCMException {
+    DatanodeDetails anchor = placementPolicy.chooseNode(nodesWithRackAwareness);
     // anchor should be removed from healthyNodes after being chosen.
-    Assert.assertFalse(healthyNodes.contains(anchor));
+    Assert.assertFalse(nodesWithRackAwareness.contains(anchor));
 
     List<DatanodeDetails> excludedNodes =
         new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
@@ -69,11 +103,43 @@ public class TestPipelinePlacementPolicy {
     DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
         nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
     Assert.assertFalse(excludedNodes.contains(nextNode));
-    // nextNode should not be the same as anchor.
+    // next node should not be the same as anchor.
     Assert.assertTrue(anchor.getUuid() != nextNode.getUuid());
+    // next node should be on the same rack based on topology.
+    Assert.assertEquals(anchor.getNetworkLocation(),
+        nextNode.getNetworkLocation());
   }
 
   @Test
+  public void testChooseNodeWithSingleNodeRack() throws SCMException {
+    // There are 3 racks altogether, each holding exactly one node.
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (Node node : SINGLE_NODE_RACK) {
+      DatanodeDetails datanode = overwriteLocationInNode(
+          MockDatanodeDetails.randomDatanodeDetails(), node);
+      datanodes.add(datanode);
+    }
+    MockNodeManager localNodeManager = new MockNodeManager(initTopology(),
+        datanodes, false, datanodes.size());
+    PipelinePlacementPolicy localPlacementPolicy = new PipelinePlacementPolicy(
+        localNodeManager, new PipelineStateManager(), conf);
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+    List<DatanodeDetails> results = localPlacementPolicy.chooseDatanodes(
+        new ArrayList<>(datanodes.size()),
+        new ArrayList<>(datanodes.size()),
+        nodesRequired, 0);
+
+    Assert.assertEquals(nodesRequired, results.size());
+    // 3 nodes should be on different racks.
+    Assert.assertNotEquals(results.get(0).getNetworkLocation(),
+        results.get(1).getNetworkLocation());
+    Assert.assertNotEquals(results.get(0).getNetworkLocation(),
+        results.get(2).getNetworkLocation());
+    Assert.assertNotEquals(results.get(1).getNetworkLocation(),
+        results.get(2).getNetworkLocation());
+  }
+  
+  @Test
   public void testChooseNodeBasedOnRackAwareness() {
     List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
@@ -84,8 +150,9 @@ public class TestPipelinePlacementPolicy {
         healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
         topologyWithDifRacks, anchor);
     Assert.assertNotNull(nextNode);
-    Assert.assertFalse(anchor.getNetworkLocation().equals(
-        nextNode.getNetworkLocation()));
+    // next node should be on a different rack.
+    Assert.assertNotEquals(anchor.getNetworkLocation(),
+        nextNode.getNetworkLocation());
   }
 
   @Test
@@ -115,25 +182,25 @@ public class TestPipelinePlacementPolicy {
 
   @Test
   public void testRackAwarenessNotEnabledWithFallBack() throws SCMException{
-    List<DatanodeDetails> healthyNodes =
-        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
-    DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
-    DatanodeDetails randomNode = placementPolicy.chooseNode(healthyNodes);
+    DatanodeDetails anchor = placementPolicy
+        .chooseNode(nodesWithOutRackAwareness);
+    DatanodeDetails randomNode = placementPolicy
+        .chooseNode(nodesWithOutRackAwareness);
     // rack awareness is not enabled.
     Assert.assertTrue(anchor.getNetworkLocation().equals(
         randomNode.getNetworkLocation()));
 
     NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
     DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
-        healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
-        topology, anchor);
+        nodesWithOutRackAwareness, new ArrayList<>(
+            PIPELINE_PLACEMENT_MAX_NODES_COUNT), topology, anchor);
     // RackAwareness should not be able to choose any node.
     Assert.assertNull(nextNode);
 
     // PlacementPolicy should still be able to pick a set of 3 nodes.
     int numOfNodes = HddsProtos.ReplicationFactor.THREE.getNumber();
     List<DatanodeDetails> results = placementPolicy
-        .getResultSet(numOfNodes, healthyNodes);
+        .getResultSet(numOfNodes, nodesWithOutRackAwareness);
     
     Assert.assertEquals(numOfNodes, results.size());
     // All nodes are on same rack.
@@ -146,14 +213,20 @@ public class TestPipelinePlacementPolicy {
   private final static Node[] NODES = new NodeImpl[] {
       new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
       new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),
-      new NodeImpl("h3", "/r1", NetConstants.NODE_COST_DEFAULT),
-      new NodeImpl("h4", "/r1", NetConstants.NODE_COST_DEFAULT),
-      new NodeImpl("h5", "/r2", NetConstants.NODE_COST_DEFAULT),
-      new NodeImpl("h6", "/r2", NetConstants.NODE_COST_DEFAULT),
-      new NodeImpl("h7", "/r2", NetConstants.NODE_COST_DEFAULT),
-      new NodeImpl("h8", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h3", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h4", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h5", "/r3", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h6", "/r3", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h7", "/r4", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h8", "/r4", NetConstants.NODE_COST_DEFAULT),
   };
 
+  // 3 racks, each with a single node.
+  private final static Node[] SINGLE_NODE_RACK = new NodeImpl[] {
+      new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h2", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h3", "/r3", NetConstants.NODE_COST_DEFAULT)
+  };
 
   private NetworkTopology createNetworkTopologyOnDifRacks() {
     NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
@@ -163,20 +236,26 @@ public class TestPipelinePlacementPolicy {
     return topology;
   }
 
+  private DatanodeDetails overwriteLocationInNode(
+      DatanodeDetails datanode, Node node) {
+    DatanodeDetails result = DatanodeDetails.newBuilder()
+        .setUuid(datanode.getUuidString())
+        .setHostName(datanode.getHostName())
+        .setIpAddress(datanode.getIpAddress())
+        .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
+        .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
+        .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
+        .setNetworkLocation(node.getNetworkLocation()).build();
+    return result;
+  }
+
   private List<DatanodeDetails> overWriteLocationInNodes(
       List<DatanodeDetails> datanodes) {
     List<DatanodeDetails> results = new ArrayList<>(datanodes.size());
     for (int i = 0; i < datanodes.size(); i++) {
-      DatanodeDetails datanode = datanodes.get(i);
-      DatanodeDetails result = DatanodeDetails.newBuilder()
-          .setUuid(datanode.getUuidString())
-          .setHostName(datanode.getHostName())
-          .setIpAddress(datanode.getIpAddress())
-          .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
-          .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
-          .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
-          .setNetworkLocation(NODES[i].getNetworkLocation()).build();
-      results.add(result);
+      DatanodeDetails datanode = overwriteLocationInNode(
+          datanodes.get(i), NODES[i]);
+      results.add(datanode);
     }
     return results;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org