Posted to commits@ozone.apache.org by so...@apache.org on 2021/07/14 22:25:54 UTC

[ozone] branch master updated: HDDS-5296. Replication Manager should not send replicate command to DN which is not IN_SERVICE (#2377)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c41d08f  HDDS-5296. Replication Manager should not send replicate command to DN which is not IN_SERVICE (#2377)
c41d08f is described below

commit c41d08f40f56d10b2a67ca02c178813ea49cd7a1
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Thu Jul 15 06:25:31 2021 +0800

    HDDS-5296. Replication Manager should not send replicate command to DN which is not IN_SERVICE (#2377)
---
 .../common/statemachine/DatanodeStateMachine.java  |  2 +-
 .../replication/ReplicationSupervisor.java         | 33 +++++++--
 .../replication/TestReplicationSupervisor.java     |  4 +-
 .../algorithms/SCMContainerPlacementCapacity.java  |  2 +-
 .../algorithms/SCMContainerPlacementRackAware.java | 55 +++++++++------
 .../algorithms/SCMContainerPlacementRandom.java    |  2 +-
 .../apache/hadoop/hdds/scm/node/NodeStatus.java    |  5 ++
 .../algorithms/TestContainerPlacementFactory.java  | 35 ++++++----
 .../TestSCMContainerPlacementRackAware.java        | 81 +++++++++++++---------
 .../hdds/scm/node/states/TestNodeStateMap.java     | 14 ++--
 .../ozone/container/TestContainerReplication.java  | 81 +++++++++++++++++++++-
 .../apache/hadoop/ozone/container/TestHelper.java  |  2 +-
 12 files changed, 224 insertions(+), 92 deletions(-)
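
Taken together, the patch enforces IN_SERVICE at both ends of replication:
on the SCM side the rack-aware placement policy now resolves each candidate
to its DatanodeInfo and only selects nodes that are healthy and IN_SERVICE,
and on the datanode side the ReplicationSupervisor drops replicate commands
whenever its own persisted operational state is not IN_SERVICE. A minimal
sketch of the combined acceptance check, condensed from the hunks below
(the standalone shape is illustrative, not a verbatim excerpt):

    // Illustrative condensation of the two new checks in this patch.
    // dnInfo comes from NodeManager#getNodeByUuid on the SCM side.
    boolean usable = dnInfo != null
        && dnInfo.getNodeStatus().isNodeWritable()  // HEALTHY + IN_SERVICE
        && hasEnoughSpace(dnInfo, sizeRequired);    // existing capacity check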

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a08213f..5446ec5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -164,7 +164,7 @@ public class DatanodeStateMachine implements Closeable {
     replicatorMetrics = new MeasuredReplicator(replicator);
 
     supervisor =
-        new ReplicationSupervisor(container.getContainerSet(),
+        new ReplicationSupervisor(container.getContainerSet(), context,
             replicatorMetrics, dnConf.getReplicationMaxStreams());
 
     // When we add new handlers just adding a new handler here should do the
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index b049171..cb0ce56 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -25,7 +25,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -44,6 +47,7 @@ public class ReplicationSupervisor {
   private final ContainerSet containerSet;
   private final ContainerReplicator replicator;
   private final ExecutorService executor;
+  private final StateContext context;
 
   private final AtomicLong requestCounter = new AtomicLong();
   private final AtomicLong successCounter = new AtomicLong();
@@ -58,20 +62,19 @@ public class ReplicationSupervisor {
 
   @VisibleForTesting
   ReplicationSupervisor(
-      ContainerSet containerSet, ContainerReplicator replicator,
-      ExecutorService executor
-  ) {
+      ContainerSet containerSet, StateContext context,
+      ContainerReplicator replicator, ExecutorService executor) {
     this.containerSet = containerSet;
     this.replicator = replicator;
     this.containersInFlight = ConcurrentHashMap.newKeySet();
     this.executor = executor;
+    this.context = context;
   }
 
   public ReplicationSupervisor(
-      ContainerSet containerSet,
-      ContainerReplicator replicator, int poolSize
-  ) {
-    this(containerSet, replicator, new ThreadPoolExecutor(
+      ContainerSet containerSet, StateContext context,
+      ContainerReplicator replicator, int poolSize) {
+    this(containerSet, context, replicator, new ThreadPoolExecutor(
         poolSize, poolSize, 60, TimeUnit.SECONDS,
         new LinkedBlockingQueue<>(),
         new ThreadFactoryBuilder().setDaemon(true)
@@ -79,6 +82,11 @@ public class ReplicationSupervisor {
             .build()));
   }
 
+  public ReplicationSupervisor(ContainerSet containerSet,
+      ContainerReplicator replicator, int poolSize) {
+    this(containerSet, null, replicator, poolSize);
+  }
+
   /**
    * Queue an asynchronous download of the given container.
    */
@@ -133,6 +141,17 @@ public class ReplicationSupervisor {
       try {
         requestCounter.incrementAndGet();
 
+        if (context != null) {
+          DatanodeDetails dn = context.getParent().getDatanodeDetails();
+          if (dn.getPersistedOpState() !=
+              HddsProtos.NodeOperationalState.IN_SERVICE) {
+            LOG.info("Dn is of {} state. Ignore this replicate container " +
+                "command for container {}", dn.getPersistedOpState(),
+                containerId);
+            return;
+          }
+        }
+
         if (containerSet.getContainer(task.getContainerId()) != null) {
           LOG.debug("Container {} has already been downloaded.", containerId);
           return;
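
The guard above reduces to a predicate on the datanode's persisted
operational state; roughly (the helper below is hypothetical, not part of
the patch):

    // Hypothetical helper equivalent to the new guard in replicate tasks.
    static boolean acceptsReplication(HddsProtos.NodeOperationalState s) {
      return s == HddsProtos.NodeOperationalState.IN_SERVICE;
    }
    // A decommissioning or in-maintenance datanode returns false: the
    // request is still counted by requestCounter, but the container is
    // never downloaded.
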
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index f8eb25e..27c6065 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -181,7 +181,7 @@ public class TestReplicationSupervisor {
   @Test
   public void testDownloadAndImportReplicatorFailure() {
     ReplicationSupervisor supervisor =
-        new ReplicationSupervisor(set, mutableReplicator,
+        new ReplicationSupervisor(set, null, mutableReplicator,
             newDirectExecutorService());
 
     // Mock to fetch an exception in the importContainer method.
@@ -216,7 +216,7 @@ public class TestReplicationSupervisor {
       Function<ReplicationSupervisor, ContainerReplicator> replicatorFactory,
       ExecutorService executor) {
     ReplicationSupervisor supervisor =
-        new ReplicationSupervisor(set, mutableReplicator, executor);
+        new ReplicationSupervisor(set, null, mutableReplicator, executor);
     replicatorRef.set(replicatorFactory.apply(supervisor));
     return supervisor;
   }
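
Note that passing null for the StateContext, as these tests do, bypasses
the new guard entirely (the supervisor only inspects the operational state
when context != null), so the existing unit tests run unchanged.
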
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
index 7d2db05..47806ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
@@ -69,7 +69,7 @@ import org.slf4j.LoggerFactory;
 public final class SCMContainerPlacementCapacity
     extends SCMCommonPlacementPolicy {
   @VisibleForTesting
-  static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
index 113b518..d3eb136 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ import java.util.List;
 public final class SCMContainerPlacementRackAware
     extends SCMCommonPlacementPolicy {
   @VisibleForTesting
-  static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(SCMContainerPlacementRackAware.class);
   private final NetworkTopology networkTopology;
   private boolean fallback;
@@ -105,8 +106,8 @@ public final class SCMContainerPlacementRackAware
     if (datanodeCount < nodesRequired + excludedNodesCount) {
       throw new SCMException("No enough datanodes to choose. " +
           "TotalNode = " + datanodeCount +
-          "RequiredNode = " + nodesRequired +
-          "ExcludedNode = " + excludedNodesCount, null);
+          " RequiredNode = " + nodesRequired +
+          " ExcludedNode = " + excludedNodesCount, null);
     }
     List<DatanodeDetails> mutableFavoredNodes = favoredNodes;
     // sanity check of favoredNodes
@@ -275,28 +276,38 @@ public final class SCMContainerPlacementRackAware
         throw new SCMException("No satisfied datanode to meet the" +
             " excludedNodes and affinityNode constrains.", null);
       }
-      if (super.hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
-        LOG.debug("Datanode {} is chosen. Required size is {}",
-            node.toString(), sizeRequired);
-        metrics.incrDatanodeChooseSuccessCount();
-        if (isFallbacked) {
-          metrics.incrDatanodeChooseFallbackCount();
-        }
-        return node;
+
+      DatanodeDetails datanodeDetails = (DatanodeDetails)node;
+      DatanodeInfo datanodeInfo = (DatanodeInfo)getNodeManager()
+          .getNodeByUuid(datanodeDetails.getUuidString());
+      if (datanodeInfo == null) {
+        LOG.error("Failed to find the DatanodeInfo for datanode {}",
+            datanodeDetails);
       } else {
-        maxRetry--;
-        if (maxRetry == 0) {
-          // avoid the infinite loop
-          String errMsg = "No satisfied datanode to meet the space constrains. "
-              + " sizeRequired: " + sizeRequired;
-          LOG.info(errMsg);
-          throw new SCMException(errMsg, null);
-        }
-        if (excludedNodesForCapacity == null) {
-          excludedNodesForCapacity = new ArrayList<>();
+        if (datanodeInfo.getNodeStatus().isNodeWritable() &&
+            (super.hasEnoughSpace(datanodeInfo, sizeRequired))) {
+          LOG.debug("Datanode {} is chosen. Required storage size is {} bytes",
+              node, sizeRequired);
+          metrics.incrDatanodeChooseSuccessCount();
+          if (isFallbacked) {
+            metrics.incrDatanodeChooseFallbackCount();
+          }
+          return node;
         }
-        excludedNodesForCapacity.add(node.getNetworkFullPath());
       }
+
+      maxRetry--;
+      if (maxRetry == 0) {
+        // avoid the infinite loop
+        String errMsg = "No satisfied datanode to meet the space constraints. "
+            + " sizeRequired: " + sizeRequired;
+        LOG.info(errMsg);
+        throw new SCMException(errMsg, null);
+      }
+      if (excludedNodesForCapacity == null) {
+        excludedNodesForCapacity = new ArrayList<>();
+      }
+      excludedNodesForCapacity.add(node.getNetworkFullPath());
     }
   }
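
The restructured loop now runs a lookup-then-filter step per candidate
before the shared retry bookkeeping; schematically (a sketch reusing names
from the diff, not a verbatim excerpt):

    // Per-candidate filter now applied in chooseNode():
    DatanodeInfo info = (DatanodeInfo) getNodeManager()
        .getNodeByUuid(((DatanodeDetails) node).getUuidString());
    boolean usable = info != null
        && info.getNodeStatus().isNodeWritable()
        && hasEnoughSpace(info, sizeRequired);
    // If !usable, the node goes into excludedNodesForCapacity and maxRetry
    // is decremented, just like the old capacity-only rejection path.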
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
index 4927517..b30f767 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -42,7 +42,7 @@ import java.util.List;
 public final class SCMContainerPlacementRandom extends SCMCommonPlacementPolicy
     implements PlacementPolicy {
   @VisibleForTesting
-  static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index 153af92..a9164c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -69,6 +69,11 @@ public class NodeStatus {
         HddsProtos.NodeState.DEAD);
   }
 
+  public boolean isNodeWritable() {
+    return health == HddsProtos.NodeState.HEALTHY &&
+        operationalState == HddsProtos.NodeOperationalState.IN_SERVICE;
+  }
+
   public HddsProtos.NodeState getHealth() {
     return health;
   }
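
As an example of the new helper's contract (per the definition above),
only the healthy, in-service combination is writable:

    NodeStatus status = NodeStatus.inServiceHealthy();
    status.isNodeWritable();  // true: HEALTHY and IN_SERVICE
    // Any other operational state (decommissioning, maintenance) or health
    // state (stale, dead) makes isNodeWritable() return false.
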
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index f851287..c37da63 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -59,7 +59,8 @@ public class TestContainerPlacementFactory {
   // network topology cluster
   private NetworkTopology cluster;
   // datanodes array list
-  private List<DatanodeInfo> datanodes = new ArrayList<>();
+  private List<DatanodeDetails> datanodes = new ArrayList<>();
+  private List<DatanodeInfo> dnInfos = new ArrayList<>();
   // node storage capacity
   private static final long STORAGE_CAPACITY = 100L;
   // configuration
@@ -90,9 +91,10 @@ public class TestContainerPlacementFactory {
     String hostname = "node";
     for (int i = 0; i < 15; i++) {
       // Totally 3 racks, each has 5 datanodes
+      DatanodeDetails datanodeDetails = MockDatanodeDetails
+          .createDatanodeDetails(hostname + i, rack + (i / 5));
       DatanodeInfo datanodeInfo = new DatanodeInfo(
-          MockDatanodeDetails.createDatanodeDetails(
-          hostname + i, rack + (i / 5)), NodeStatus.inServiceHealthy(),
+          datanodeDetails, NodeStatus.inServiceHealthy(),
           UpgradeUtils.defaultLayoutVersionProto());
 
       StorageReportProto storage1 = TestUtils.createStorageReport(
@@ -107,33 +109,38 @@ public class TestContainerPlacementFactory {
       datanodeInfo.updateMetaDataStorageReports(
           new ArrayList<>(Arrays.asList(metaStorage1)));
 
-      datanodes.add(datanodeInfo);
-      cluster.add(datanodeInfo);
+      datanodes.add(datanodeDetails);
+      cluster.add(datanodeDetails);
+      dnInfos.add(datanodeInfo);
     }
 
     StorageReportProto storage2 = TestUtils.createStorageReport(
-        datanodes.get(2).getUuid(),
-        "/data1-" + datanodes.get(2).getUuidString(),
+        dnInfos.get(2).getUuid(),
+        "/data1-" + dnInfos.get(2).getUuidString(),
         STORAGE_CAPACITY, 90L, 10L, null);
-    datanodes.get(2).updateStorageReports(
+    dnInfos.get(2).updateStorageReports(
         new ArrayList<>(Arrays.asList(storage2)));
     StorageReportProto storage3 = TestUtils.createStorageReport(
-        datanodes.get(3).getUuid(),
-        "/data1-" + datanodes.get(3).getUuidString(),
+        dnInfos.get(3).getUuid(),
+        "/data1-" + dnInfos.get(3).getUuidString(),
         STORAGE_CAPACITY, 80L, 20L, null);
-    datanodes.get(3).updateStorageReports(
+    dnInfos.get(3).updateStorageReports(
         new ArrayList<>(Arrays.asList(storage3)));
     StorageReportProto storage4 = TestUtils.createStorageReport(
-        datanodes.get(4).getUuid(),
-        "/data1-" + datanodes.get(4).getUuidString(),
+        dnInfos.get(4).getUuid(),
+        "/data1-" + dnInfos.get(4).getUuidString(),
         STORAGE_CAPACITY, 70L, 30L, null);
-    datanodes.get(4).updateStorageReports(
+    dnInfos.get(4).updateStorageReports(
         new ArrayList<>(Arrays.asList(storage4)));
 
     // create mock node manager
     nodeManager = Mockito.mock(NodeManager.class);
     when(nodeManager.getNodes(NodeStatus.inServiceHealthy()))
         .thenReturn(new ArrayList<>(datanodes));
+    for (DatanodeInfo dn: dnInfos) {
+      when(nodeManager.getNodeByUuid(dn.getUuidString()))
+          .thenReturn(dn);
+    }
 
     PlacementPolicy policy = ContainerPlacementPolicyFactory
         .getPolicy(conf, nodeManager, cluster, true,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
index 611c204..87e77ff 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
@@ -53,6 +53,7 @@ import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
 import org.hamcrest.MatcherAssert;
+
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -70,7 +71,8 @@ public class TestSCMContainerPlacementRackAware {
   private OzoneConfiguration conf;
   private NodeManager nodeManager;
   private final Integer datanodeCount;
-  private final List<DatanodeInfo> datanodes = new ArrayList<>();
+  private final List<DatanodeDetails> datanodes = new ArrayList<>();
+  private final List<DatanodeInfo> dnInfos = new ArrayList<>();
   // policy with fallback capability
   private SCMContainerPlacementRackAware policy;
   // policy prohibit fallback
@@ -107,10 +109,11 @@ public class TestSCMContainerPlacementRackAware {
     String hostname = "node";
     for (int i = 0; i < datanodeCount; i++) {
       // Totally 3 racks, each has 5 datanodes
-      DatanodeInfo datanodeInfo = new DatanodeInfo(
+      DatanodeDetails datanodeDetails =
           MockDatanodeDetails.createDatanodeDetails(
-              hostname + i, rack + (i / NODE_PER_RACK)),
-          NodeStatus.inServiceHealthy(),
+          hostname + i, rack + (i / NODE_PER_RACK));
+      DatanodeInfo datanodeInfo = new DatanodeInfo(
+          datanodeDetails, NodeStatus.inServiceHealthy(),
           UpgradeUtils.defaultLayoutVersionProto());
 
       StorageReportProto storage1 = TestUtils.createStorageReport(
@@ -125,48 +128,49 @@ public class TestSCMContainerPlacementRackAware {
       datanodeInfo.updateMetaDataStorageReports(
           new ArrayList<>(Arrays.asList(metaStorage1)));
 
-      datanodes.add(datanodeInfo);
-      cluster.add(datanodeInfo);
+      datanodes.add(datanodeDetails);
+      cluster.add(datanodeDetails);
+      dnInfos.add(datanodeInfo);
     }
 
     if (datanodeCount > 4) {
       StorageReportProto storage2 = TestUtils.createStorageReport(
-          datanodes.get(2).getUuid(),
+          dnInfos.get(2).getUuid(),
           "/data1-" + datanodes.get(2).getUuidString(),
           STORAGE_CAPACITY, 90L, 10L, null);
-      datanodes.get(2).updateStorageReports(
+      dnInfos.get(2).updateStorageReports(
           new ArrayList<>(Arrays.asList(storage2)));
       StorageReportProto storage3 = TestUtils.createStorageReport(
-          datanodes.get(3).getUuid(),
-          "/data1-" + datanodes.get(3).getUuidString(),
+          dnInfos.get(3).getUuid(),
+          "/data1-" + dnInfos.get(3).getUuidString(),
           STORAGE_CAPACITY, 80L, 20L, null);
-      datanodes.get(3).updateStorageReports(
+      dnInfos.get(3).updateStorageReports(
           new ArrayList<>(Arrays.asList(storage3)));
       StorageReportProto storage4 = TestUtils.createStorageReport(
-          datanodes.get(4).getUuid(),
-          "/data1-" + datanodes.get(4).getUuidString(),
+          dnInfos.get(4).getUuid(),
+          "/data1-" + dnInfos.get(4).getUuidString(),
           STORAGE_CAPACITY, 70L, 30L, null);
-      datanodes.get(4).updateStorageReports(
+      dnInfos.get(4).updateStorageReports(
           new ArrayList<>(Arrays.asList(storage4)));
     } else if (datanodeCount > 3) {
       StorageReportProto storage2 = TestUtils.createStorageReport(
-          datanodes.get(2).getUuid(),
-          "/data1-" + datanodes.get(2).getUuidString(),
+          dnInfos.get(2).getUuid(),
+          "/data1-" + dnInfos.get(2).getUuidString(),
           STORAGE_CAPACITY, 90L, 10L, null);
-      datanodes.get(2).updateStorageReports(
+      dnInfos.get(2).updateStorageReports(
           new ArrayList<>(Arrays.asList(storage2)));
       StorageReportProto storage3 = TestUtils.createStorageReport(
-          datanodes.get(3).getUuid(),
-          "/data1-" + datanodes.get(3).getUuidString(),
+          dnInfos.get(3).getUuid(),
+          "/data1-" + dnInfos.get(3).getUuidString(),
           STORAGE_CAPACITY, 80L, 20L, null);
-      datanodes.get(3).updateStorageReports(
+      dnInfos.get(3).updateStorageReports(
           new ArrayList<>(Arrays.asList(storage3)));
     } else if (datanodeCount > 2) {
       StorageReportProto storage2 = TestUtils.createStorageReport(
-          datanodes.get(2).getUuid(),
-          "/data1-" + datanodes.get(2).getUuidString(),
+          dnInfos.get(2).getUuid(),
+          "/data1-" + dnInfos.get(2).getUuidString(),
           STORAGE_CAPACITY, 84L, 16L, null);
-      datanodes.get(2).updateStorageReports(
+      dnInfos.get(2).updateStorageReports(
           new ArrayList<>(Arrays.asList(storage2)));
     }
 
@@ -174,6 +178,10 @@ public class TestSCMContainerPlacementRackAware {
     nodeManager = Mockito.mock(NodeManager.class);
     when(nodeManager.getNodes(NodeStatus.inServiceHealthy()))
         .thenReturn(new ArrayList<>(datanodes));
+    for (DatanodeInfo dn: dnInfos) {
+      when(nodeManager.getNodeByUuid(dn.getUuidString()))
+          .thenReturn(dn);
+    }
     when(nodeManager.getClusterNetworkTopologyMap())
         .thenReturn(cluster);
 
@@ -400,33 +408,40 @@ public class TestSCMContainerPlacementRackAware {
   @Test
   public void testDatanodeWithDefaultNetworkLocation() throws SCMException {
     String hostname = "node";
-    List<DatanodeInfo> dataList = new ArrayList<>();
+    List<DatanodeInfo> dnInfoList = new ArrayList<>();
+    List<DatanodeDetails> dataList = new ArrayList<>();
     NetworkTopology clusterMap =
         new NetworkTopologyImpl(NodeSchemaManager.getInstance());
     for (int i = 0; i < 15; i++) {
       // Totally 3 racks, each has 5 datanodes
-      DatanodeInfo datanodeInfo = new DatanodeInfo(
-          MockDatanodeDetails.createDatanodeDetails(
-          hostname + i, null), NodeStatus.inServiceHealthy(),
+      DatanodeDetails dn = MockDatanodeDetails.createDatanodeDetails(
+          hostname + i, null);
+      DatanodeInfo dnInfo = new DatanodeInfo(
+          dn, NodeStatus.inServiceHealthy(),
           UpgradeUtils.defaultLayoutVersionProto());
 
       StorageReportProto storage1 = TestUtils.createStorageReport(
-          datanodeInfo.getUuid(), "/data1-" + datanodeInfo.getUuidString(),
+          dnInfo.getUuid(), "/data1-" + dnInfo.getUuidString(),
           STORAGE_CAPACITY, 0, 100L, null);
       MetadataStorageReportProto metaStorage1 =
           TestUtils.createMetadataStorageReport(
-          "/metadata1-" + datanodeInfo.getUuidString(),
+          "/metadata1-" + dnInfo.getUuidString(),
           STORAGE_CAPACITY, 0, 100L, null);
-      datanodeInfo.updateStorageReports(
+      dnInfo.updateStorageReports(
           new ArrayList<>(Arrays.asList(storage1)));
-      datanodeInfo.updateMetaDataStorageReports(
+      dnInfo.updateMetaDataStorageReports(
           new ArrayList<>(Arrays.asList(metaStorage1)));
 
-      dataList.add(datanodeInfo);
-      clusterMap.add(datanodeInfo);
+      dataList.add(dn);
+      clusterMap.add(dn);
+      dnInfoList.add(dnInfo);
     }
     Assert.assertEquals(dataList.size(), StringUtils.countMatches(
         clusterMap.toString(), NetConstants.DEFAULT_RACK));
+    for (DatanodeInfo dn: dnInfoList) {
+      when(nodeManager.getNodeByUuid(dn.getUuidString()))
+          .thenReturn(dn);
+    }
 
     // choose nodes to host 3 replica
     int nodeNum = 3;
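
A recurring bit of test plumbing in this patch: because the policy now
resolves candidates through NodeManager#getNodeByUuid, tests with a mocked
NodeManager must stub that lookup, or every candidate is rejected as
unknown:

    // Stubbing pattern used throughout the updated tests.
    for (DatanodeInfo dn : dnInfos) {
      when(nodeManager.getNodeByUuid(dn.getUuidString())).thenReturn(dn);
    }
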
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
index b59f806..4de5eea 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java
@@ -127,18 +127,16 @@ public class TestNodeStateMap {
    */
   @Test
   public void testConcurrency() throws Exception {
-    NodeStateMap nodeStateMap = new NodeStateMap();
-
     final DatanodeDetails datanodeDetails =
         MockDatanodeDetails.randomDatanodeDetails();
 
-    nodeStateMap.addNode(datanodeDetails, NodeStatus.inServiceHealthy(), null);
+    map.addNode(datanodeDetails, NodeStatus.inServiceHealthy(), null);
 
     UUID dnUuid = datanodeDetails.getUuid();
 
-    nodeStateMap.addContainer(dnUuid, ContainerID.valueOf(1L));
-    nodeStateMap.addContainer(dnUuid, ContainerID.valueOf(2L));
-    nodeStateMap.addContainer(dnUuid, ContainerID.valueOf(3L));
+    map.addContainer(dnUuid, ContainerID.valueOf(1L));
+    map.addContainer(dnUuid, ContainerID.valueOf(2L));
+    map.addContainer(dnUuid, ContainerID.valueOf(3L));
 
     CountDownLatch elementRemoved = new CountDownLatch(1);
     CountDownLatch loopStarted = new CountDownLatch(1);
@@ -146,7 +144,7 @@ public class TestNodeStateMap {
     new Thread(() -> {
       try {
         loopStarted.await();
-        nodeStateMap.removeContainer(dnUuid, ContainerID.valueOf(1L));
+        map.removeContainer(dnUuid, ContainerID.valueOf(1L));
         elementRemoved.countDown();
       } catch (Exception e) {
         e.printStackTrace();
@@ -155,7 +153,7 @@ public class TestNodeStateMap {
     }).start();
 
     boolean first = true;
-    for (ContainerID key : nodeStateMap.getContainers(dnUuid)) {
+    for (ContainerID key : map.getContainers(dnUuid)) {
       if (first) {
         loopStarted.countDown();
         elementRemoved.await();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 8562554..f0cdc49 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -20,21 +20,31 @@ package org.apache.hadoop.ozone.container;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 import static org.apache.hadoop.ozone.container.TestHelper.waitForContainerClose;
 import static org.apache.hadoop.ozone.container.TestHelper.waitForReplicaCount;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackAware;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.node.NodeDecommissionManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
@@ -46,22 +56,27 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.event.Level;
 
 /**
  * Tests ozone containers replication.
  */
+@RunWith(Parameterized.class)
 public class TestContainerReplication {
   /**
    * Set the timeout for every test.
    */
   @Rule
-  public Timeout testTimeout = Timeout.seconds(300);;
+  public Timeout testTimeout = Timeout.seconds(300);
 
   private static final String VOLUME = "vol1";
   private static final String BUCKET = "bucket1";
@@ -69,12 +84,32 @@ public class TestContainerReplication {
 
   private MiniOzoneCluster cluster;
   private OzoneClient client;
+  private String placementPolicyClass;
+
+  @Parameterized.Parameters
+  public static List<String> parameters() {
+    List<String> classes = new ArrayList<>();
+    classes.add(SCMContainerPlacementRackAware.class.getCanonicalName());
+    classes.add(SCMContainerPlacementCapacity.class.getCanonicalName());
+    classes.add(SCMContainerPlacementRandom.class.getCanonicalName());
+    return classes;
+  }
+
+  public TestContainerReplication(String placementPolicy) {
+    this.placementPolicyClass = placementPolicy;
+  }
 
   @Before
   public void setUp() throws Exception {
+    GenericTestUtils.setLogLevel(SCMContainerPlacementRandom.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(SCMContainerPlacementCapacity.LOG,
+        Level.DEBUG);
+    GenericTestUtils.setLogLevel(SCMContainerPlacementRackAware.LOG,
+        Level.DEBUG);
     OzoneConfiguration conf = createConfiguration();
+    conf.set(OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, placementPolicyClass);
 
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
 
     client = OzoneClientFactory.getRpcClient(conf);
@@ -103,6 +138,48 @@ public class TestContainerReplication {
     waitForReplicaCount(containerID, 3, cluster);
   }
 
+  @Test
+  public void testSkipDecommissionAndMaintenanceNode() throws Exception {
+    List<OmKeyLocationInfo> keyLocations = lookupKey(cluster);
+    assertFalse(keyLocations.isEmpty());
+
+    OmKeyLocationInfo keyLocation = keyLocations.get(0);
+    long containerID = keyLocation.getContainerID();
+    waitForContainerClose(cluster, containerID);
+
+    // Mark the other two DNs as decommissioning and entering maintenance
+    NodeDecommissionManager decommissionManager =
+        cluster.getStorageContainerManager().getScmDecommissionManager();
+    boolean deCommission = true;
+    for (HddsDatanodeService d1 : cluster.getHddsDatanodes()) {
+      boolean match = false;
+      for (DatanodeDetails d2 : keyLocations.get(0).getPipeline().getNodes()) {
+        if (d1.getDatanodeDetails().equals(d2)) {
+          match = true;
+          break;
+        }
+      }
+      if (!match) {
+        if (deCommission) {
+          decommissionManager.startDecommission(d1.getDatanodeDetails());
+          deCommission = false;
+        } else {
+          decommissionManager.startMaintenance(d1.getDatanodeDetails(), 1);
+        }
+      }
+    }
+
+    cluster.shutdownHddsDatanode(keyLocation.getPipeline().getFirstNode());
+
+    waitForReplicaCount(containerID, 2, cluster);
+    try {
+      waitForReplicaCount(containerID, 3, cluster);
+      fail("Replication should not succeed without extra IN_SERVICE nodes");
+    } catch (TimeoutException e) {
+      Assert.assertEquals(2, TestHelper.countReplicas(containerID, cluster));
+    }
+  }
+
   private static OzoneConfiguration createConfiguration() {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 48a9a09..aa50234 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -382,6 +382,6 @@ public final class TestHelper {
   public static void waitForReplicaCount(long containerID, int count,
       MiniOzoneCluster cluster) throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> countReplicas(containerID, cluster) == count,
-        1000, 30_000);
+        1000, 30000);
   }
 }
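
On timing: waitForReplicaCount polls countReplicas once per second for up
to 30 seconds and throws TimeoutException if the count is never reached.
The new testSkipDecommissionAndMaintenanceNode uses that timeout as a
negative assertion: the replica count holding at 2 for the full window is
taken as evidence that the decommissioning and maintenance nodes were
skipped.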
