You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2019/11/25 07:16:48 UTC

[hadoop-ozone] 01/04: HDDS-1577. Add default pipeline placement policy implementation. (#1366)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 748cb018a0b20b0e0993565ce15c8ad908d6d1e2
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Thu Sep 5 11:51:40 2019 +0800

    HDDS-1577. Add default pipeline placement policy implementation. (#1366)
    
    
    
    (cherry picked from commit b640a5f6d53830aee4b9c2a7d17bf57c987962cd)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   5 +
 .../common/src/main/resources/ozone-default.xml    |   7 +
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  14 +
 .../hadoop/hdds/scm/node/NodeStateManager.java     |   9 +
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  19 ++
 .../hdds/scm/node/states/Node2ObjectsMap.java      |   4 +-
 .../hdds/scm/node/states/Node2PipelineMap.java     |  12 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 338 +++++++++++++++++++++
 .../hadoop/hdds/scm/container/MockNodeManager.java |  36 ++-
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 197 ++++++++++++
 .../testutils/ReplicationNodeManagerMock.java      |  16 +
 11 files changed, 654 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 3c35e56..e6fed5b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -315,6 +315,11 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT =
       "ozone.scm.pipeline.owner.container.count";
   public static final int OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT = 3;
+  // Pipeline placement policy:
+  // the max number of pipelines can a single datanode be engaged in.
+  public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
+          "ozone.scm.datanode.max.pipeline.engagement";
+  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
 
   public static final String
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 8110242..94e8557 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -839,6 +839,13 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.datanode.max.pipeline.engagement</name>
+    <value>5</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>Max number of pipelines per datanode can be engaged in.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.container.size</name>
     <value>5GB</value>
     <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index fd8bb87..37562fe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -118,6 +119,13 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails);
 
   /**
+   * Get the count of pipelines a datanodes is associated with.
+   * @param datanodeDetails DatanodeDetails
+   * @return The number of pipelines
+   */
+  int getPipelinesCount(DatanodeDetails datanodeDetails);
+
+  /**
    * Add pipeline information in the NodeManager.
    * @param pipeline - Pipeline to be added
    */
@@ -199,4 +207,10 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @return the given datanode, or empty list if none found
    */
   List<DatanodeDetails> getNodesByAddress(String address);
+
+  /**
+   * Get cluster map as in network topology for this node manager.
+   * @return cluster map
+   */
+  NetworkTopology getClusterNetworkTopologyMap();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 954cb0e..9d2a9f2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -284,6 +284,15 @@ public class NodeStateManager implements Runnable, Closeable {
   }
 
   /**
+   * Get the count of pipelines associated to single datanode.
+   * @param datanodeDetails single datanode
+   * @return number of pipelines associated with it
+   */
+  public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+    return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid());
+  }
+
+  /**
    * Get information about the node.
    *
    * @param datanodeDetails DatanodeDetails
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index f077e72..66cca46 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -510,6 +510,16 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
+   * Get the count of pipelines a datanodes is associated with.
+   * @param datanodeDetails DatanodeDetails
+   * @return The number of pipelines
+   */
+  @Override
+  public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+    return nodeStateManager.getPipelinesCount(datanodeDetails);
+  }
+
+  /**
    * Add pipeline information in the NodeManager.
    *
    * @param pipeline - Pipeline to be added
@@ -643,6 +653,15 @@ public class SCMNodeManager implements NodeManager {
     return results;
   }
 
+  /**
+   * Get cluster map as in network topology for this node manager.
+   * @return cluster map
+   */
+  @Override
+  public NetworkTopology getClusterNetworkTopologyMap() {
+    return clusterMap;
+  }
+
   private String nodeResolve(String hostname) {
     List<String> hosts = new ArrayList<>(1);
     hosts.add(hostname);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
index 37525b0..57a377d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
@@ -67,6 +67,7 @@ public class Node2ObjectsMap<T> {
    * @param datanodeID   -- Datanode UUID
    * @param containerIDs - List of ContainerIDs.
    */
+  @VisibleForTesting
   public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs)
       throws SCMException {
     Preconditions.checkNotNull(containerIDs);
@@ -83,7 +84,8 @@ public class Node2ObjectsMap<T> {
    *
    * @param datanodeID - Datanode ID.
    */
-  void removeDatanode(UUID datanodeID) {
+  @VisibleForTesting
+  public void removeDatanode(UUID datanodeID) {
     Preconditions.checkNotNull(datanodeID);
     dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index f8633f9..714188d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -42,7 +42,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
   }
 
   /**
-   * Returns null if there no pipelines associated with this datanode ID.
+   * Returns null if there are no pipelines associated with this datanode ID.
    *
    * @param datanode - UUID
    * @return Set of pipelines or Null.
@@ -52,6 +52,16 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
   }
 
   /**
+   * Return 0 if there are no pipelines associated with this datanode ID.
+   * @param datanode - UUID
+   * @return Number of pipelines or 0.
+   */
+  public int getPipelinesCount(UUID datanode) {
+    Set<PipelineID> pipelines = getObjects(datanode);
+    return pipelines == null ? 0 : pipelines.size();
+  }
+
+  /**
    * Adds a pipeline entry to a given dataNode in the map.
    *
    * @param pipeline Pipeline to be added
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
new file mode 100644
index 0000000..cb9954d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMCommonPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline placement policy that choose datanodes based on load balancing
+ * and network topology to supply pipeline creation.
+ * <p>
+ * 1. get a list of healthy nodes
+ * 2. filter out nodes that are not too heavily engaged in other pipelines
+ * 3. Choose an anchor node among the viable nodes.
+ * 4. Choose other nodes around the anchor node based on network topology
+ */
+public final class PipelinePlacementPolicy extends SCMCommonPolicy {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(PipelinePlacementPolicy.class);
+  private final NodeManager nodeManager;
+  private final Configuration conf;
+  private final int heavyNodeCriteria;
+
+  /**
+   * Constructs a pipeline placement with considering network topology,
+   * load balancing and rack awareness.
+   *
+   * @param nodeManager Node Manager
+   * @param conf        Configuration
+   */
+  public PipelinePlacementPolicy(
+      final NodeManager nodeManager, final Configuration conf) {
+    super(nodeManager, conf);
+    this.nodeManager = nodeManager;
+    this.conf = conf;
+    heavyNodeCriteria = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
+  }
+
+  /**
+   * Returns true if this node meets the criteria.
+   *
+   * @param datanodeDetails DatanodeDetails
+   * @return true if we have enough space.
+   */
+  @VisibleForTesting
+  boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
+    return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+  }
+
+  /**
+   * Filter out viable nodes based on
+   * 1. nodes that are healthy
+   * 2. nodes that are not too heavily engaged in other pipelines
+   *
+   * @param excludedNodes - excluded nodes
+   * @param nodesRequired - number of datanodes required.
+   * @return a list of viable nodes
+   * @throws SCMException when viable nodes are not enough in numbers
+   */
+  List<DatanodeDetails> filterViableNodes(
+      List<DatanodeDetails> excludedNodes, int nodesRequired)
+      throws SCMException {
+    // get nodes in HEALTHY state
+    List<DatanodeDetails> healthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    if (excludedNodes != null) {
+      healthyNodes.removeAll(excludedNodes);
+    }
+    String msg;
+    if (healthyNodes.size() == 0) {
+      msg = "No healthy node found to allocate pipeline.";
+      LOG.error(msg);
+      throw new SCMException(msg, SCMException.ResultCodes
+          .FAILED_TO_FIND_HEALTHY_NODES);
+    }
+
+    if (healthyNodes.size() < nodesRequired) {
+      msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+              + " datanodes required. Found %d",
+          nodesRequired, healthyNodes.size());
+      LOG.error(msg);
+      throw new SCMException(msg,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+
+    // filter nodes that meet the size and pipeline engagement criteria.
+    // Pipeline placement doesn't take node space left into account.
+    List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
+        meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+
+    if (healthyList.size() < nodesRequired) {
+      msg = String.format("Unable to find enough nodes that meet " +
+              "the criteria that cannot engage in more than %d pipelines." +
+              " Nodes required: %d Found: %d",
+          heavyNodeCriteria, nodesRequired, healthyList.size());
+      LOG.error(msg);
+      throw new SCMException(msg,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return healthyList;
+  }
+
+  /**
+   * Pipeline placement choose datanodes to join the pipeline.
+   *
+   * @param excludedNodes - excluded nodes
+   * @param favoredNodes  - list of nodes preferred.
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired  - size required for the container or block.
+   * @return a list of chosen datanodeDetails
+   * @throws SCMException when chosen nodes are not enough in numbers
+   */
+  @Override
+  public List<DatanodeDetails> chooseDatanodes(
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
+      int nodesRequired, final long sizeRequired) throws SCMException {
+    // get a list of viable nodes based on criteria
+    List<DatanodeDetails> healthyNodes =
+        filterViableNodes(excludedNodes, nodesRequired);
+
+    List<DatanodeDetails> results = new ArrayList<>();
+
+    // Randomly picks nodes when all nodes are equal.
+    // This happens when network topology is absent or
+    // all nodes are on the same rack.
+    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
+      LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
+          "Required nodes: {}", nodesRequired);
+      results = super.getResultSet(nodesRequired, healthyNodes);
+      if (results.size() < nodesRequired) {
+        LOG.error("Unable to find the required number of healthy nodes that " +
+                "meet the criteria. Required nodes: {}, Found nodes: {}",
+            nodesRequired, results.size());
+        throw new SCMException("Unable to find required number of nodes.",
+            SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+      }
+      return results;
+    }
+
+    // Since nodes are widely distributed, the results should be selected
+    // base on distance in topology, rack awareness and load balancing.
+    List<DatanodeDetails> exclude = new ArrayList<>();
+    exclude.addAll(excludedNodes);
+    // First choose an anchor nodes randomly
+    DatanodeDetails anchor = chooseNode(healthyNodes);
+    if (anchor == null) {
+      LOG.error("Unable to find the first healthy nodes that " +
+              "meet the criteria. Required nodes: {}, Found nodes: {}",
+          nodesRequired, results.size());
+      throw new SCMException("Unable to find required number of nodes.",
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+
+    results.add(anchor);
+    exclude.add(anchor);
+    nodesRequired--;
+
+    // Choose the second node on different racks from anchor.
+    DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
+        healthyNodes, excludedNodes,
+        nodeManager.getClusterNetworkTopologyMap(), anchor);
+    if (nodeOnDifferentRack == null) {
+      LOG.error("Unable to find nodes on different racks that " +
+              "meet the criteria. Required nodes: {}, Found nodes: {}",
+          nodesRequired, results.size());
+      throw new SCMException("Unable to find required number of nodes.",
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+
+    results.add(nodeOnDifferentRack);
+    exclude.add(nodeOnDifferentRack);
+    nodesRequired--;
+
+    // Then choose nodes close to anchor based on network topology
+    for (int x = 0; x < nodesRequired; x++) {
+      // invoke the choose function defined in the derived classes.
+      DatanodeDetails pick = chooseNodeFromNetworkTopology(
+          nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+      if (pick != null) {
+        results.add(pick);
+        // exclude the picked node for next time
+        exclude.add(pick);
+      }
+    }
+
+    if (results.size() < nodesRequired) {
+      LOG.error("Unable to find the required number of healthy nodes that " +
+              "meet the criteria. Required nodes: {}, Found nodes: {}",
+          nodesRequired, results.size());
+      throw new SCMException("Unable to find required number of nodes.",
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return results;
+  }
+
+  /**
+   * Find a node from the healthy list and return it after removing it from the
+   * list that we are operating on.
+   *
+   * @param healthyNodes - Set of healthy nodes we can choose from.
+   * @return chosen datanodDetails
+   */
+  @Override
+  public DatanodeDetails chooseNode(
+      List<DatanodeDetails> healthyNodes) {
+    int firstNodeNdx = getRand().nextInt(healthyNodes.size());
+    int secondNodeNdx = getRand().nextInt(healthyNodes.size());
+
+    DatanodeDetails datanodeDetails;
+    // There is a possibility that both numbers will be same.
+    // if that is so, we just return the node.
+    if (firstNodeNdx == secondNodeNdx) {
+      datanodeDetails = healthyNodes.get(firstNodeNdx);
+    } else {
+      DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx);
+      DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx);
+      SCMNodeMetric firstNodeMetric =
+          nodeManager.getNodeStat(firstNodeDetails);
+      SCMNodeMetric secondNodeMetric =
+          nodeManager.getNodeStat(secondNodeDetails);
+      datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
+          ? firstNodeDetails : secondNodeDetails;
+    }
+    // the pick is decided and it should be removed from candidates.
+    healthyNodes.remove(datanodeDetails);
+    return datanodeDetails;
+  }
+
+  /**
+   * Choose node on different racks as anchor is on based on rack awareness.
+   * If a node on different racks cannot be found, then return a random node.
+   * @param healthyNodes healthy nodes
+   * @param excludedNodes excluded nodes
+   * @param networkTopology network topology
+   * @param anchor anchor node
+   * @return a node on different rack
+   */
+  @VisibleForTesting
+  protected DatanodeDetails chooseNodeBasedOnRackAwareness(
+      List<DatanodeDetails> healthyNodes,  List<DatanodeDetails> excludedNodes,
+      NetworkTopology networkTopology, DatanodeDetails anchor) {
+    Preconditions.checkArgument(networkTopology != null);
+    if (checkAllNodesAreEqual(networkTopology)) {
+      return null;
+    }
+
+    for (DatanodeDetails node : healthyNodes) {
+      if (excludedNodes.contains(node)
+          || networkTopology.isSameParent(anchor, node)) {
+        continue;
+      } else {
+        // the pick is decided and it should be removed from candidates.
+        healthyNodes.remove(node);
+        return node;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Check if all nodes are equal in topology.
+   * They are equal when network topology is absent or there are on
+   * the same rack.
+   * @param topology network topology
+   * @return true when all nodes are equal
+   */
+  private boolean checkAllNodesAreEqual(NetworkTopology topology) {
+    if (topology == null) {
+      return true;
+    }
+    return (topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1);
+  }
+
+  /**
+   * Choose node based on network topology.
+   * @param networkTopology network topology
+   * @param anchor anchor datanode to start with
+   * @param excludedNodes excluded datanodes
+   * @return chosen datanode
+   */
+  @VisibleForTesting
+  protected DatanodeDetails chooseNodeFromNetworkTopology(
+      NetworkTopology networkTopology, DatanodeDetails anchor,
+      List<DatanodeDetails> excludedNodes) {
+    Preconditions.checkArgument(networkTopology != null);
+
+    Collection<Node> excluded = new ArrayList<>();
+    if (excludedNodes != null && excludedNodes.size() != 0) {
+      excluded.addAll(excludedNodes);
+    }
+    excluded.add(anchor);
+
+    Node pick = networkTopology.chooseRandom(
+        anchor.getNetworkLocation(), excluded);
+    DatanodeDetails pickedNode = (DatanodeDetails) pick;
+    // exclude the picked node for next time
+    if (excludedNodes != null) {
+      excludedNodes.add(pickedNode);
+    }
+    return pickedNode;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 87cc177..613146d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -16,11 +16,13 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -86,7 +88,7 @@ public class MockNodeManager implements NodeManager {
   private final SCMNodeStat aggregateStat;
   private boolean safemode;
   private final Map<UUID, List<SCMCommand>> commandMap;
-  private final Node2PipelineMap node2PipelineMap;
+  private Node2PipelineMap node2PipelineMap;
   private final Node2ContainerMap node2ContainerMap;
   private NetworkTopology clusterMap;
   private ConcurrentMap<String, Set<String>> dnsToUuidMap;
@@ -100,6 +102,7 @@ public class MockNodeManager implements NodeManager {
     this.node2ContainerMap = new Node2ContainerMap();
     this.dnsToUuidMap = new ConcurrentHashMap<>();
     aggregateStat = new SCMNodeStat();
+    clusterMap = new NetworkTopologyImpl(new Configuration());
     if (initializeFakeNodes) {
       for (int x = 0; x < nodeCount; x++) {
         DatanodeDetails dd = TestUtils.randomDatanodeDetails();
@@ -251,6 +254,16 @@ public class MockNodeManager implements NodeManager {
   }
 
   /**
+   * Get the count of pipelines a datanodes is associated with.
+   * @param datanodeDetails DatanodeDetails
+   * @return The number of pipelines
+   */
+  @Override
+  public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+    return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid());
+  }
+
+  /**
    * Add pipeline information in the NodeManager.
    * @param pipeline - Pipeline to be added
    */
@@ -260,6 +273,22 @@ public class MockNodeManager implements NodeManager {
   }
 
   /**
+   * Get the entire Node2PipelineMap.
+   * @return Node2PipelineMap
+   */
+  public Node2PipelineMap getNode2PipelineMap() {
+    return node2PipelineMap;
+  }
+
+  /**
+   * Set the Node2PipelineMap.
+   * @param node2PipelineMap Node2PipelineMap
+   */
+  public void setNode2PipelineMap(Node2PipelineMap node2PipelineMap) {
+    this.node2PipelineMap = node2PipelineMap;
+  }
+
+  /**
    * Remove a pipeline information from the NodeManager.
    * @param pipeline - Pipeline to be removed
    */
@@ -517,6 +546,11 @@ public class MockNodeManager implements NodeManager {
     return results;
   }
 
+  @Override
+  public NetworkTopology getClusterNetworkTopologyMap() {
+    return clusterMap;
+  }
+
   public void setNetworkTopology(NetworkTopology topology) {
     this.clusterMap = topology;
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
new file mode 100644
index 0000000..2e0d0b1
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.*;
+import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Test for PipelinePlacementPolicy.
+ */
+public class TestPipelinePlacementPolicy {
+  private MockNodeManager nodeManager;
+  private PipelinePlacementPolicy placementPolicy;
+  private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
+
+  @Before
+  public void init() throws Exception {
+    nodeManager = new MockNodeManager(true,
+        PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    placementPolicy =
+        new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+  }
+
+  @Test
+  public void testChooseNodeBasedOnNetworkTopology() {
+    List<DatanodeDetails> healthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+    // anchor should be removed from healthyNodes after being chosen.
+    Assert.assertFalse(healthyNodes.contains(anchor));
+
+    List<DatanodeDetails> excludedNodes =
+        new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
+        nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
+    // excludedNodes should contain nextNode after being chosen.
+    Assert.assertTrue(excludedNodes.contains(nextNode));
+    // nextNode should not be the same as anchor.
+    Assert.assertTrue(anchor.getUuid() != nextNode.getUuid());
+  }
+
+  @Test
+  public void testChooseNodeBasedOnRackAwareness() {
+    List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
+    DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+    NetworkTopology topologyWithDifRacks =
+        createNetworkTopologyOnDifRacks();
+    DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
+        healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+        topologyWithDifRacks, anchor);
+    Assert.assertFalse(topologyWithDifRacks.isSameParent(anchor, nextNode));
+  }
+
+  private final static Node[] NODES = new NodeImpl[] {
+      new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h3", "/r1", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h4", "/r1", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h5", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h6", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h7", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h8", "/r2", NetConstants.NODE_COST_DEFAULT),
+  };
+
+
+  private NetworkTopology createNetworkTopologyOnDifRacks() {
+    NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
+    for (Node n : NODES) {
+      topology.add(n);
+    }
+    return topology;
+  }
+
+  private List<DatanodeDetails> overWriteLocationInNodes(
+      List<DatanodeDetails> datanodes) {
+    List<DatanodeDetails> results = new ArrayList<>(datanodes.size());
+    for (int i = 0; i < datanodes.size(); i++) {
+      DatanodeDetails datanode = datanodes.get(i);
+      DatanodeDetails result = DatanodeDetails.newBuilder()
+          .setUuid(datanode.getUuidString())
+          .setHostName(datanode.getHostName())
+          .setIpAddress(datanode.getIpAddress())
+          .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
+          .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
+          .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
+          .setNetworkLocation(NODES[i].getNetworkLocation()).build();
+      results.add(result);
+    }
+    return results;
+  }
+
+  @Test
+  public void testHeavyNodeShouldBeExcluded() throws SCMException{
+    List<DatanodeDetails> healthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    int nodesRequired = healthyNodes.size()/2;
+    // only minority of healthy NODES are heavily engaged in pipelines.
+    int minorityHeavy = healthyNodes.size()/2 - 1;
+    List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
+        new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+        new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+        nodesRequired, 0);
+    // modify node to pipeline mapping.
+    insertHeavyNodesIntoNodeManager(healthyNodes, minorityHeavy);
+    // NODES should be sufficient.
+    Assert.assertEquals(nodesRequired, pickedNodes1.size());
+    // make sure pipeline placement policy won't select duplicated NODES.
+    Assert.assertTrue(checkDuplicateNodesUUID(pickedNodes1));
+
+    // majority of healthy NODES are heavily engaged in pipelines.
+    int majorityHeavy = healthyNodes.size()/2 + 2;
+    insertHeavyNodesIntoNodeManager(healthyNodes, majorityHeavy);
+    boolean thrown = false;
+    List<DatanodeDetails> pickedNodes2 = null;
+    try {
+      pickedNodes2 = placementPolicy.chooseDatanodes(
+          new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+          new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+          nodesRequired, 0);
+    } catch (SCMException e) {
+      Assert.assertFalse(thrown);
+      thrown = true;
+    }
+    // NODES should NOT be sufficient and exception should be thrown.
+    Assert.assertNull(pickedNodes2);
+    Assert.assertTrue(thrown);
+  }
+
+  private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
+    HashSet<UUID> uuids = nodes.stream().
+        map(DatanodeDetails::getUuid).
+        collect(Collectors.toCollection(HashSet::new));
+    return uuids.size() == nodes.size();
+  }
+
+  private Set<PipelineID> mockPipelineIDs(int count) {
+    Set<PipelineID> pipelineIDs = new HashSet<>(count);
+    for (int i = 0; i < count; i++) {
+      pipelineIDs.add(PipelineID.randomId());
+    }
+    return pipelineIDs;
+  }
+
+  private void insertHeavyNodesIntoNodeManager(
+      List<DatanodeDetails> nodes, int heavyNodeCount) throws SCMException{
+    if (nodes == null) {
+      throw new SCMException("",
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+
+    int considerHeavyCount =
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+
+    Node2PipelineMap mockMap = new Node2PipelineMap();
+    for (DatanodeDetails node : nodes) {
+      // mock heavy node
+      if (heavyNodeCount > 0) {
+        mockMap.insertNewDatanode(
+            node.getUuid(), mockPipelineIDs(considerHeavyCount));
+        heavyNodeCount--;
+      } else {
+        mockMap.insertNewDatanode(node.getUuid(), mockPipelineIDs(1));
+      }
+    }
+    nodeManager.setNode2PipelineMap(mockMap);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 0ecff3f..7e8ec52 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -166,6 +167,16 @@ public class ReplicationNodeManagerMock implements NodeManager {
   }
 
   /**
+   * Get the count of pipelines a datanodes is associated with.
+   * @param dnId DatanodeDetails
+   * @return The number of pipelines
+   */
+  @Override
+  public int getPipelinesCount(DatanodeDetails dnId) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  /**
    * Add pipeline information in the NodeManager.
    * @param pipeline - Pipeline to be added
    */
@@ -327,4 +338,9 @@ public class ReplicationNodeManagerMock implements NodeManager {
   public List<DatanodeDetails> getNodesByAddress(String address) {
     return new LinkedList<>();
   }
+
+  @Override
+  public NetworkTopology getClusterNetworkTopologyMap() {
+    return null;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org