Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2019/10/13 06:52:06 UTC

[hadoop-ozone] 01/02: HDDS-1569 Support creating multiple pipelines with same datanode.

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-1569
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit fbef6c3d732e20ae45f9c5d43cd3d57670b90dd3
Author: timmycheng <ti...@tencent.com>
AuthorDate: Wed Sep 25 11:40:12 2019 +0800

    HDDS-1569 Support creating multiple pipelines with same datanode.
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  3 +-
 .../ContainerPlacementPolicyFactory.java           |  8 +--
 .../hdds/scm/node/states/Node2PipelineMap.java     |  4 ++
 .../hdds/scm/pipeline/PipelineActionHandler.java   |  2 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 52 +++++++++++------
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |  5 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 68 ++--------------------
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  1 +
 .../scm/safemode/HealthyPipelineSafeModeRule.java  | 65 ++++++++++-----------
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  3 +
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 11 +++-
 .../hdds/scm/pipeline/TestPipelineClose.java       |  2 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    | 32 +++++-----
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  | 11 ++--
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |  4 ++
 15 files changed, 122 insertions(+), 149 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index ad7073e..fe51f51 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -317,7 +317,8 @@ public final class ScmConfigKeys {
   // the max number of pipelines can a single datanode be engaged in.
   public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
           "ozone.scm.datanode.max.pipeline.engagement";
-  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
+  // Setting to zero by default means this limit doesn't take effect.
+  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
 
   public static final String
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
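
A note on the new default: zero disables the per-datanode limit entirely, so callers must guard for it rather than read it as "at most zero pipelines". A minimal sketch of consuming the key (assuming a Hadoop Configuration handle named conf):

    int heavyNodeCriteria = conf.getInt(
        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
    // 0 means "no limit"; any positive value caps pipelines per datanode.
    boolean limitEnabled = heavyNodeCriteria > 0;
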
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index adaeb87..74431f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -43,10 +43,10 @@ public final class ContainerPlacementPolicyFactory {
   }
 
 
-  public static PlacementPolicy getPolicy(Configuration conf,
-    final NodeManager nodeManager, NetworkTopology clusterMap,
-    final boolean fallback, SCMContainerPlacementMetrics metrics)
-    throws SCMException{
+  public static PlacementPolicy getPolicy(
+      Configuration conf, final NodeManager nodeManager,
+      NetworkTopology clusterMap, final boolean fallback,
+      SCMContainerPlacementMetrics metrics) throws SCMException {
     final Class<? extends PlacementPolicy> placementClass = conf
         .getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
             OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index 714188d..496b9e7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -71,6 +71,10 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
       UUID dnId = details.getUuid();
       dn2ObjectMap.computeIfAbsent(dnId, k -> ConcurrentHashMap.newKeySet())
           .add(pipeline.getId());
+      dn2ObjectMap.computeIfPresent(dnId, (k, v) -> {
+        v.add(pipeline.getId());
+        return v;
+      });
     }
   }
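
An aside on the update pattern above: ConcurrentHashMap.computeIfAbsent already returns the current value when the key is present, so the extra computeIfPresent call is defensive rather than strictly necessary. The pattern on its own (names as in the diff):

    // computeIfAbsent returns the existing set when dnId is already mapped,
    // otherwise it creates and maps a fresh one; the add therefore lands in
    // the right set in both cases.
    dn2ObjectMap.computeIfAbsent(dnId, k -> ConcurrentHashMap.newKeySet())
        .add(pipeline.getId());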
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 8d497fa..8d040f1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -57,7 +57,7 @@ public class PipelineActionHandler
           pipelineID = PipelineID.
               getFromProtobuf(action.getClosePipeline().getPipelineID());
           Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
-          LOG.error("Received pipeline action {} for {} from datanode {}. " +
+          LOG.info("Received pipeline action {} for {} from datanode {}. " +
                   "Reason : {}", action.getAction(), pipeline,
               report.getDatanodeDetails(),
               action.getClosePipeline().getDetailedReason());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 1983ed6..df46fad 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -79,8 +79,20 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
    * @return true if we have enough space.
    */
   @VisibleForTesting
-  boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
-    return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+  boolean meetCriteria(DatanodeDetails datanodeDetails) {
+    if (heavyNodeCriteria == 0) {
+      // no limit applied.
+      return true;
+    }
+    boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
+        < heavyNodeCriteria);
+    if (!meet) {
+      LOG.info("Pipeline Placement: can't place more pipelines on heavy " +
+          "datanode: " + datanodeDetails.getUuid().toString() + " Heaviness: " +
+          nodeManager.getPipelinesCount(datanodeDetails) + " limit: " +
+          heavyNodeCriteria);
+    }
+    return meet;
   }
 
   /**
@@ -102,18 +114,19 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     if (excludedNodes != null) {
       healthyNodes.removeAll(excludedNodes);
     }
+    int initialHealthyNodesCount = healthyNodes.size();
     String msg;
-    if (healthyNodes.size() == 0) {
+    if (initialHealthyNodesCount == 0) {
       msg = "No healthy node found to allocate pipeline.";
       LOG.error(msg);
       throw new SCMException(msg, SCMException.ResultCodes
           .FAILED_TO_FIND_HEALTHY_NODES);
     }
 
-    if (healthyNodes.size() < nodesRequired) {
+    if (initialHealthyNodesCount < nodesRequired) {
       msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
               + " datanodes required. Found %d",
-          nodesRequired, healthyNodes.size());
+          nodesRequired, initialHealthyNodesCount);
       LOG.error(msg);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -122,13 +135,15 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     // filter nodes that meet the size and pipeline engagement criteria.
     // Pipeline placement doesn't take node space left into account.
     List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
-        meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+        meetCriteria(d)).collect(Collectors.toList());
 
     if (healthyList.size() < nodesRequired) {
       msg = String.format("Unable to find enough nodes that meet " +
               "the criteria that cannot engage in more than %d pipelines." +
-              " Nodes required: %d Found: %d",
-          heavyNodeCriteria, nodesRequired, healthyList.size());
+              " Nodes required: %d Found: %d, healthy nodes count in" +
+              "NodeManager: %d.",
+          heavyNodeCriteria, nodesRequired, healthyList.size(),
+          initialHealthyNodesCount);
       LOG.error(msg);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -155,12 +170,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     List<DatanodeDetails> healthyNodes =
         filterViableNodes(excludedNodes, nodesRequired);
     
-    // Randomly picks nodes when all nodes are equal.
+    // Randomly picks nodes when all nodes are equal or factor is ONE.
     // This happens when network topology is absent or
     // all nodes are on the same rack.
-    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
-      LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
-          "Required nodes: {}", nodesRequired);
+    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())
+        || nodesRequired == HddsProtos.ReplicationFactor.ONE.getNumber()) {
       return super.getResultSet(nodesRequired, healthyNodes);
     } else {
       // Since topology and rack awareness are available, picks nodes
@@ -188,8 +202,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     // First choose an anchor nodes randomly
     DatanodeDetails anchor = chooseNode(healthyNodes);
     if (anchor == null) {
-      LOG.error("Unable to find the first healthy nodes that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
+      LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
+              "that meet the criteria. Required nodes: {}, Found nodes: {}",
           nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -204,8 +218,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
         healthyNodes, exclude,
         nodeManager.getClusterNetworkTopologyMap(), anchor);
     if (nodeOnDifferentRack == null) {
-      LOG.error("Unable to find nodes on different racks that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
+      LOG.error("Pipeline Placement: Unable to find nodes on different racks " +
+              " that meet the criteria. Required nodes: {}, Found nodes: {}",
           nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -228,9 +242,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     }
 
     if (results.size() < nodesRequired) {
-      LOG.error("Unable to find the required number of healthy nodes that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
-          nodesRequired, results.size());
+      LOG.error("Pipeline Placement: Unable to find the required number of " +
+              "healthy nodes that  meet the criteria. Required nodes: {}, " +
+              "Found nodes: {}", nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
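
Taken together, the placement flow above is: filter healthy nodes through meetCriteria (which now treats a zero limit as "unlimited"), then either pick randomly (flat topology, or factor ONE) or pick rack-aware via an anchor node. A condensed sketch of the filter step (names as in the diff):

    boolean meetCriteria(DatanodeDetails datanodeDetails) {
      if (heavyNodeCriteria == 0) {
        return true;  // limit disabled by configuration
      }
      // Strictly less-than: a node already at the limit is rejected.
      return nodeManager.getPipelinesCount(datanodeDetails)
          < heavyNodeCriteria;
    }
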
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 443378c..8e0f32d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -52,8 +53,8 @@ class PipelineStateMap {
   PipelineStateMap() {
 
     // TODO: Use TreeMap for range operations?
-    pipelineMap = new HashMap<>();
-    pipeline2container = new HashMap<>();
+    pipelineMap = new ConcurrentHashMap<>();
+    pipeline2container = new ConcurrentHashMap<>();
     query2OpenPipelines = new HashMap<>();
     initializeQueryMap();
 
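The switch to ConcurrentHashMap here presumably hardens PipelineStateMap against the concurrent pipeline-creation paths this branch enables; unlike HashMap, it tolerates concurrent readers and writers without external synchronization, e.g.:

    Map<PipelineID, Pipeline> pipelineMap = new ConcurrentHashMap<>();
    // Atomic check-then-insert, safe without external locking.
    pipelineMap.putIfAbsent(pipeline.getId(), pipeline);
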
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 9409728..6a51957 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -22,11 +22,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 import org.apache.hadoop.io.MultipleIOException;
@@ -44,19 +41,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * Implements Api for creating ratis pipelines.
@@ -69,6 +61,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private final PipelinePlacementPolicy placementPolicy;
 
   // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
   private final int parallelismForPool = 3;
@@ -92,66 +85,13 @@ public class RatisPipelineProvider implements PipelineProvider {
     this.stateManager = stateManager;
     this.conf = conf;
     this.tlsConfig = tlsConfig;
-  }
-
-
-  /**
-   * Create pluggable container placement policy implementation instance.
-   *
-   * @param nodeManager - SCM node manager.
-   * @param conf - configuration.
-   * @return SCM container placement policy implementation instance.
-   */
-  @SuppressWarnings("unchecked")
-  // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
-  private static PlacementPolicy createContainerPlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
-    Class<? extends PlacementPolicy> implClass =
-        (Class<? extends PlacementPolicy>) conf.getClass(
-            ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-            SCMContainerPlacementRandom.class);
-
-    try {
-      Constructor<? extends PlacementPolicy> ctor =
-          implClass.getDeclaredConstructor(NodeManager.class,
-              Configuration.class);
-      return ctor.newInstance(nodeManager, conf);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (InvocationTargetException e) {
-      throw new RuntimeException(implClass.getName()
-          + " could not be constructed.", e.getCause());
-    } catch (Exception e) {
-//      LOG.error("Unhandled exception occurred, Placement policy will not " +
-//          "be functional.");
-      throw new IllegalArgumentException("Unable to load " +
-          "PlacementPolicy", e);
-    }
+    this.placementPolicy = new PipelinePlacementPolicy(nodeManager, conf);
   }
 
   @Override
   public Pipeline create(ReplicationFactor factor) throws IOException {
-    // Get set of datanodes already used for ratis pipeline
-    Set<DatanodeDetails> dnsUsed = new HashSet<>();
-    stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
-        p -> p.getPipelineState().equals(PipelineState.OPEN) ||
-            p.getPipelineState().equals(PipelineState.DORMANT) ||
-            p.getPipelineState().equals(PipelineState.ALLOCATED))
-        .forEach(p -> dnsUsed.addAll(p.getNodes()));
-
-    // Get list of healthy nodes
-    List<DatanodeDetails> dns =
-        nodeManager.getNodes(NodeState.HEALTHY)
-            .parallelStream()
-            .filter(dn -> !dnsUsed.contains(dn))
-            .limit(factor.getNumber())
-            .collect(Collectors.toList());
-    if (dns.size() < factor.getNumber()) {
-      String e = String
-          .format("Cannot create pipeline of factor %d using %d nodes.",
-              factor.getNumber(), dns.size());
-      throw new InsufficientDatanodesException(e);
-    }
+    List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null,
+        null, factor.getNumber(), 0);
 
     Pipeline pipeline = Pipeline.newBuilder()
         .setId(PipelineID.randomId())
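
The net effect of this hunk: node selection for new Ratis pipelines is delegated entirely to PipelinePlacementPolicy, and the old "datanodes may serve at most one Ratis pipeline" filter is gone. A sketch of the new call, where the parameter meanings are an assumption read off the surrounding code (excluded nodes, favored nodes, node count, size hint that pipeline placement ignores):

    List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(
        null, null, factor.getNumber(), 0);
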
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..a927d56 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -164,6 +164,7 @@ public class SCMPipelineManager implements PipelineManager {
       throw idEx;
     } catch (IOException ex) {
       metrics.incNumPipelineCreationFailed();
+      LOG.error("Pipeline creation failed.", ex);
       throw ex;
     } finally {
       lock.writeLock().unlock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 7a00d76..b3aac5e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@@ -56,7 +55,7 @@ public class HealthyPipelineSafeModeRule
   private final PipelineManager pipelineManager;
   private final int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
-  private final Set<DatanodeDetails> processedDatanodeDetails =
+  private final Set<PipelineID> processedPipelineIDs =
       new HashSet<>();
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
@@ -116,46 +115,46 @@ public class HealthyPipelineSafeModeRule
     // processed report event, we should not consider this pipeline report
     // from datanode again during threshold calculation.
     Preconditions.checkNotNull(pipelineReportFromDatanode);
-    DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
-    if (!processedDatanodeDetails.contains(
-        pipelineReportFromDatanode.getDatanodeDetails())) {
-
-      Pipeline pipeline;
-      PipelineReportsProto pipelineReport =
-          pipelineReportFromDatanode.getReport();
-
-      for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-        PipelineID pipelineID = PipelineID
-            .getFromProtobuf(report.getPipelineID());
-        try {
-          pipeline = pipelineManager.getPipeline(pipelineID);
-        } catch (PipelineNotFoundException e) {
-          continue;
-        }
-
-        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-            pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
-          // If the pipeline is open state mean, all 3 datanodes are reported
-          // for this pipeline.
-          currentHealthyPipelineCount++;
-          getSafeModeMetrics().incCurrentHealthyPipelinesCount();
-        }
+
+    Pipeline pipeline;
+    PipelineReportsProto pipelineReport =
+        pipelineReportFromDatanode.getReport();
+
+    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
+      PipelineID pipelineID = PipelineID
+          .getFromProtobuf(report.getPipelineID());
+      if (processedPipelineIDs.contains(pipelineID)) {
+        continue;
+      }
+
+      try {
+        pipeline = pipelineManager.getPipeline(pipelineID);
+      } catch (PipelineNotFoundException e) {
+        continue;
       }
-      if (scmInSafeMode()) {
-        SCMSafeModeManager.getLogger().info(
-            "SCM in safe mode. Healthy pipelines reported count is {}, " +
-                "required healthy pipeline reported count is {}",
-            currentHealthyPipelineCount, healthyPipelineThresholdCount);
+
+      if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+          pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
+        // If the pipeline is in OPEN state, it means all 3 datanodes have reported
+        // for this pipeline.
+        currentHealthyPipelineCount++;
+        getSafeModeMetrics().incCurrentHealthyPipelinesCount();
       }
 
-      processedDatanodeDetails.add(dnDetails);
+      processedPipelineIDs.add(pipelineID);
     }
 
+    if (scmInSafeMode()) {
+      SCMSafeModeManager.getLogger().info(
+          "SCM in safe mode. Healthy pipelines reported count is {}, " +
+              "required healthy pipeline reported count is {}",
+          currentHealthyPipelineCount, healthyPipelineThresholdCount);
+    }
   }
 
   @Override
   protected void cleanup() {
-    processedDatanodeDetails.clear();
+    processedPipelineIDs.clear();
   }
 
   @VisibleForTesting
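
The rule now de-duplicates on pipeline id instead of datanode id, which matters once one datanode can report several pipelines: each healthy factor-THREE pipeline should be counted exactly once, no matter how many of its datanodes report it. A condensed equivalent of the counting loop (Set.add returns false for duplicates):

    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
      PipelineID pipelineID =
          PipelineID.getFromProtobuf(report.getPipelineID());
      if (!processedPipelineIDs.add(pipelineID)) {
        continue;  // this pipeline was already counted
      }
      // ...look up the pipeline and bump the healthy count as above.
    }
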
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 7657b54..20cc3cf 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -64,6 +64,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test DeadNodeHandler.
  */
@@ -84,6 +86,7 @@ public class TestDeadNodeHandler {
     storageDir = GenericTestUtils.getTempPath(
         TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 2e0d0b1..e200d6f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -34,11 +34,14 @@ import org.junit.Test;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test for PipelinePlacementPolicy.
  */
 public class TestPipelinePlacementPolicy {
   private MockNodeManager nodeManager;
+  private OzoneConfiguration conf;
   private PipelinePlacementPolicy placementPolicy;
   private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
 
@@ -46,8 +49,10 @@ public class TestPipelinePlacementPolicy {
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true,
         PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
     placementPolicy =
-        new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+        new PipelinePlacementPolicy(nodeManager, conf);
   }
 
   @Test
@@ -179,7 +184,9 @@ public class TestPipelinePlacementPolicy {
     }
 
     int considerHeavyCount =
-        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+        conf.getInt(
+            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1;
 
     Node2PipelineMap mockMap = new Node2PipelineMap();
     for (DatanodeDetails node : nodes) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..9bccb1a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -169,7 +169,7 @@ public class TestPipelineClose {
         new PipelineActionHandler(pipelineManager, conf);
     pipelineActionHandler
         .onMessage(pipelineActionsFromDatanode, new EventQueue());
-    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 10));
     OzoneContainer ozoneContainer =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 00144e4..0a8c5ad 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -33,6 +32,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test for RatisPipelineProvider.
  */
@@ -46,14 +47,17 @@ public class TestRatisPipelineProvider {
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
     stateManager = new PipelineStateManager(new OzoneConfiguration());
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
     provider = new MockRatisPipelineProvider(nodeManager,
-        stateManager, new OzoneConfiguration());
+        stateManager, conf);
   }
 
   private void createPipelineAndAssertions(
           HddsProtos.ReplicationFactor factor) throws IOException {
     Pipeline pipeline = provider.create(factor);
     stateManager.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
@@ -61,10 +65,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     Pipeline pipeline1 = provider.create(factor);
     stateManager.addPipeline(pipeline1);
-    // New pipeline should not overlap with the previous created pipeline
-    Assert.assertTrue(
-        CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
-            .isEmpty());
+    nodeManager.addPipeline(pipeline1);
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
@@ -77,6 +78,7 @@ public class TestRatisPipelineProvider {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
     Pipeline pipeline = provider.create(factor);
     stateManager.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
@@ -86,11 +88,7 @@ public class TestRatisPipelineProvider {
     factor = HddsProtos.ReplicationFactor.ONE;
     Pipeline pipeline1 = provider.create(factor);
     stateManager.addPipeline(pipeline1);
-    // New pipeline should overlap with the previous created pipeline,
-    // and one datanode should overlap between the two types.
-    Assert.assertEquals(
-        CollectionUtils.intersection(pipeline.getNodes(),
-            pipeline1.getNodes()).size(), 1);
+    nodeManager.addPipeline(pipeline1);
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
@@ -154,6 +152,10 @@ public class TestRatisPipelineProvider {
         .build();
 
     stateManager.addPipeline(openPipeline);
+    nodeManager.addPipeline(openPipeline);
+    for (DatanodeDetails node : openPipeline.getNodes()) {
+      System.out.println("open pipeline contains " + node.getUuid());
+    }
 
     // Use up next 3 DNs also for an open pipeline.
     List<DatanodeDetails> moreOpenPiplineDns = nodeManager.getAllNodes()
@@ -166,6 +168,7 @@ public class TestRatisPipelineProvider {
         .setId(PipelineID.randomId())
         .build();
     stateManager.addPipeline(anotherOpenPipeline);
+    nodeManager.addPipeline(anotherOpenPipeline);
 
     // Use up next 3 DNs also for a closed pipeline.
     List<DatanodeDetails> closedPiplineDns = nodeManager.getAllNodes()
@@ -178,6 +181,7 @@ public class TestRatisPipelineProvider {
         .setId(PipelineID.randomId())
         .build();
     stateManager.addPipeline(anotherClosedPipeline);
+    nodeManager.addPipeline(anotherClosedPipeline);
 
     Pipeline pipeline = provider.create(factor);
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
@@ -187,12 +191,6 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
 
-    // Pipline nodes cannot be from open pipelines.
-    Assert.assertTrue(
-        pipelineNodes.parallelStream().filter(dn ->
-        (openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
-        .count() == 0);
-
     // Since we have only 10 DNs, at least 1 pipeline node should have been
     // from the closed pipeline DN list.
     Assert.assertTrue(pipelineNodes.parallelStream().filter(
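
Worth noting on the removed assertions: they encoded the old invariant that pipelines never share datanodes. Under the new placement policy overlap is expected, so a test of this change would instead bound per-node engagement, e.g. (a hypothetical check, assuming MockNodeManager implements NodeManager's getPipelinesCount; maxEngagement stands in for the configured limit):

    for (DatanodeDetails dn : pipeline.getNodes()) {
      Assert.assertTrue(nodeManager.getPipelinesCount(dn) <= maxEngagement);
    }
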
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1..9d59960 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
@@ -28,6 +29,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -58,6 +60,7 @@ public class TestSCMPipelineManager {
   @Before
   public void setUp() throws Exception {
     conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -253,10 +256,8 @@ public class TestSCMPipelineManager {
       pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
       Assert.fail();
-    } catch (InsufficientDatanodesException idEx) {
-      Assert.assertEquals(
-          "Cannot create pipeline of factor 3 using 1 nodes.",
-          idEx.getMessage());
+    } catch (SCMException idEx) {
+      // Expected: the placement policy could not find enough viable nodes.
     }
 
     metrics = getMetrics(
@@ -266,7 +267,7 @@ public class TestSCMPipelineManager {
 
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
-    Assert.assertTrue(numPipelineCreateFailed == 0);
+    Assert.assertTrue(numPipelineCreateFailed == 1);
   }
 
   @Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 0aba968..19c1406 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -36,6 +36,8 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Interface used for MiniOzoneClusters.
  */
@@ -269,6 +271,8 @@ public interface MiniOzoneCluster {
 
     protected Builder(OzoneConfiguration conf) {
       this.conf = conf;
+      // MiniOzoneCluster doesn't enforce a pipeline engagement limit.
+      conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
       setClusterId(UUID.randomUUID().toString());
     }
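
Since every cluster built through this Builder now starts with the limit disabled, an integration test that wants to exercise heavy-node rejection has to override the key itself. A sketch, assuming the usual newBuilder/setNumDatanodes entry points (the value 3 is arbitrary):

    OzoneConfiguration conf = new OzoneConfiguration();
    MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf);
    // The Builder constructor just reset the limit to 0, so override it
    // only after the builder has been created.
    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
    MiniOzoneCluster cluster = builder.setNumDatanodes(3).build();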
 

