You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2019/11/28 08:38:06 UTC

[hadoop-ozone] branch HDDS-1880-Decom updated: HDDS-2459. Refactor ReplicationManager to consider maintenance states

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-1880-Decom
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push:
     new 920b8c5  HDDS-2459. Refactor ReplicationManager to consider maintenance states
920b8c5 is described below

commit 920b8c5eddc6f25138692636a66270ee22c8dfa4
Author: S O'Donnell <so...@cloudera.com>
AuthorDate: Thu Nov 28 09:32:13 2019 +0100

    HDDS-2459. Refactor ReplicationManager to consider maintenance states
    
    Closes #262
---
 .../proto/StorageContainerDatanodeProtocol.proto   |   2 +
 .../hdds/scm/container/ContainerReplicaCount.java  | 241 ++++++++++++
 .../hdds/scm/container/ReplicationManager.java     | 224 +++++++----
 .../apache/hadoop/hdds/scm/node/NodeStatus.java    |  69 ++++
 .../hdds/scm/server/StorageContainerManager.java   |   3 +-
 .../hdds/scm/container/TestReplicationManager.java | 425 ++++++++++++++++++++-
 .../TestSCMContainerPlacementRackAware.java        |   6 -
 .../states/TestContainerReplicaCount.java          | 333 ++++++++++++++++
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |   3 +
 .../hdds/scm/safemode/TestSafeModeHandler.java     |   6 +-
 .../scm/node/TestDecommissionAndMaintenance.java   |   1 -
 11 files changed, 1229 insertions(+), 84 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 45a1db6..6c30ba1 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -185,6 +185,8 @@ message ContainerReplicaProto {
     CLOSED = 4;
     UNHEALTHY = 5;
     INVALID = 6;
+    DECOMMISSIONED = 7;
+    MAINTENANCE = 8;
   }
   required int64 containerID = 1;
   required State state = 2;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
new file mode 100644
index 0000000..a7ea56d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import java.util.Set;
+
+/**
+ * Immutable object that is created with a set of ContainerReplica objects and
+ * the number of in flight replica add and deletes, the container replication
+ * factor and the min count which must be available for maintenance. This
+ * information can be used to determine if the container is over or under
+ * replicated, and how many replicas need to be created or removed.
+ */
+public class ContainerReplicaCount {
+
+  private int healthyCount = 0;
+  private int decommissionCount = 0;
+  private int maintenanceCount = 0;
+  private int inFlightAdd = 0;
+  private int inFlightDel = 0;
+  private int repFactor;
+  private int minHealthyForMaintenance;
+  private Set<ContainerReplica> replica;
+
+  public ContainerReplicaCount(Set<ContainerReplica> replica, int inFlightAdd,
+                               int inFlightDelete, int replicationFactor,
+                               int minHealthyForMaintenance) {
+    this.healthyCount = 0;
+    this.decommissionCount = 0;
+    this.maintenanceCount = 0;
+    this.inFlightAdd = inFlightAdd;
+    this.inFlightDel = inFlightDelete;
+    this.repFactor = replicationFactor;
+    this.replica = replica;
+    this.minHealthyForMaintenance
+        = Math.min(this.repFactor, minHealthyForMaintenance);
+
+    for (ContainerReplica cr : this.replica) {
+      ContainerReplicaProto.State state = cr.getState();
+      if (state == ContainerReplicaProto.State.DECOMMISSIONED) {
+        decommissionCount++;
+      } else if (state == ContainerReplicaProto.State.MAINTENANCE) {
+        maintenanceCount++;
+      } else {
+        healthyCount++;
+      }
+    }
+  }
+
+  public int getHealthyCount() {
+    return healthyCount;
+  }
+
+  public int getDecommissionCount() {
+    return decommissionCount;
+  }
+
+  public int getMaintenanceCount() {
+    return maintenanceCount;
+  }
+
+  public int getReplicationFactor() {
+    return repFactor;
+  }
+
+  public Set<ContainerReplica> getReplica() {
+    return replica;
+  }
+
+  @Override
+  public String toString() {
+    return "Replica Count: "+replica.size()+
+        " Healthy Count: "+healthyCount+
+        " Decommission Count: "+decommissionCount+
+        " Maintenance Count: "+maintenanceCount+
+        " inFlightAdd Count: "+inFlightAdd+
+        " inFightDel Count: "+inFlightDel+
+        " ReplicationFactor: "+repFactor+
+        " minMaintenance Count: "+minHealthyForMaintenance;
+  }
+
+  /**
+   * Calculates the delta of replicas which need to be created or removed
+   * to ensure the container is correctly replicated when considering inflight
+   * adds and deletes.
+   *
+   * When considering inflight operations, it is assumed any operation may
+   * fail. However, to consider the worst case and avoid data loss, we always
+   * assume a delete will succeed and an add will fail. In this way, we will
+   * avoid scheduling too many deletes which could result in data loss.
+   *
+   * Decisions around over-replication are made only on healthy replicas,
+   * ignoring any in maintenance and also any inflight adds. InFlight adds are
+   * ignored, as they may not complete, so if we have:
+   *
+   *     H, H, H, IN_FLIGHT_ADD
+   *
+   * And then schedule a delete, we could end up under-replicated (add fails,
+   * delete completes). It is better to let the inflight operations complete
+   * and then deal with any further over or under replication.
+   *
+   * For maintenance replicas, assuming replication factor 3, and minHealthy
+   * 2, it is possible for all 3 hosts to be put into maintenance, leaving the
+   * following (H = healthy, M = maintenance):
+   *
+   *     H, H, M, M, M
+   *
+   * Even though we are tracking 5 replicas, this is not over replicated as we
+   * ignore the maintenance copies. Later, the replicas could look like:
+   *
+   *     H, H, H, H, M
+   *
+   * At this stage, the container is over replicated by 1, so one replica can be
+   * removed.
+   *
+   * For containers which have replicationFactor healthy replicas, the delta
+   * is bounded at zero unless inflight deletes outnumber inflight adds, so
+   * inflight operations can complete before any excess is dealt with.
+   *
+   * For under replicated containers we do consider inflight add and delete to
+   * avoid scheduling more adds than needed. There is additional logic around
+   * containers with maintenance replicas to ensure minHealthyForMaintenance
+   * replicas are maintained.
+   *
+   * @return Delta of replicas needed. Negative indicates over replication and
+   *         replicas should be removed. Positive indicates under replication
+   *         and zero indicates the container has replicationFactor healthy
+   *         replicas.
+   */
+  public int additionalReplicaNeeded() {
+    int delta = missingReplicas();
+
+    if (delta < 0) {
+      // Over replicated, so may need to remove a container. Do not consider
+      // inFlightAdds, as they may fail, but do consider inFlightDel which
+      // will reduce the over-replication if it completes.
+      // Note this could make the delta positive if there are too many in flight
+      // deletes, which will result in an additional replica being scheduled.
+      return delta + inFlightDel;
+    } else {
+      // May be under or perfectly replicated.
+      // We must consider in flight add and delete when calculating the new
+      // containers needed, but we bound the lower limit at zero to allow
+      // inflight operations to complete before handling any potential over
+      // replication
+      return Math.max(0, delta - inFlightAdd + inFlightDel);
+    }
+  }
+
+  /**
+   * Returns the count of replicas which need to be created or removed to
+   * ensure the container is perfectly replicated. Inflight operations are not
+   * considered here, but the logic to determine the missing or excess counts
+   * for maintenance is present.
+   *
+   * Decisions around over-replication are made only on healthy replicas,
+   * ignoring any in maintenance. For example, if we have:
+   *
+   *     H, H, H, M, M
+   *
+   * This will not be considered over replicated until one of the Maintenance
+   * replicas moves to Healthy.
+   *
+   * If the container is perfectly replicated, zero will be returned.
+   *
+   * If it is under replicated a positive value will be returned, indicating
+   * how many replicas must be added.
+   *
+   * If it is over replicated a negative value will be returned, indicating how
+   * many replicas to remove.
+   *
+   * @return Zero if the container is perfectly replicated, a positive value
+   *         for under replicated and a negative value for over replicated.
+   */
+  private int missingReplicas() {
+    int delta = repFactor - healthyCount;
+
+    if (delta < 0) {
+      // Over replicated, so may need to remove a container.
+      return delta;
+    } else if (delta > 0) {
+      // May be under-replicated, depending on maintenance.
+      delta = Math.max(0, delta - maintenanceCount);
+      int neededHealthy =
+          Math.max(0, minHealthyForMaintenance - healthyCount);
+      delta = Math.max(neededHealthy, delta);
+      return delta;
+    } else { // delta == 0
+      // We have exactly the number of healthy replicas needed.
+      return delta;
+    }
+  }
+
+  /**
+   * Return true if the container is sufficiently replicated. Decommissioning
+   * and Decommissioned containers are ignored in this check, assuming they will
+   * eventually be removed from the cluster.
+   * This check ignores inflight additions, as those replicas have not yet been
+   * created and the create could fail for some reason.
+   * The check does consider inflight deletes as there may be 3 healthy replicas
+   * now, but once the delete completes it will reduce to 2.
+   * We also assume a replica in Maintenance state cannot be removed, so the
+   * pending delete would affect only the healthy replica count.
+   *
+   * @return True if the container is sufficiently replicated and False
+   *         otherwise.
+   */
+  public boolean isSufficientlyReplicated() {
+    return missingReplicas() + inFlightDel <= 0;
+  }
+
+  /**
+   * Return true if the container is over replicated. Decommission and
+   * maintenance containers are ignored for this check.
+   * The check ignores inflight additions, as they may fail, but it does
+   * consider inflight deletes, as they would reduce the over replication when
+   * they complete.
+   *
+   * @return True if the container is over replicated, false otherwise.
+   */
+  public boolean isOverReplicated() {
+    return missingReplicas() + inFlightDel < 0;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 37afd36..58e38a2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsInfo;
@@ -98,6 +99,11 @@ public class ReplicationManager implements MetricsSource {
   private final LockManager<ContainerID> lockManager;
 
   /**
+   * Used to look up the health of a node or the node's operational state.
+   */
+  private final NodeManager nodeManager;
+
+  /**
    * This is used for tracking container replication commands which are issued
    * by ReplicationManager and not yet complete.
    */
@@ -127,6 +133,11 @@ public class ReplicationManager implements MetricsSource {
   private volatile boolean running;
 
   /**
+   * Minimum number of replica in a healthy state for maintenance.
+   */
+  private int minHealthyForMaintenance;
+
+  /**
    * Constructs ReplicationManager instance with the given configuration.
    *
    * @param conf OzoneConfiguration
@@ -138,15 +149,18 @@ public class ReplicationManager implements MetricsSource {
                             final ContainerManager containerManager,
                             final ContainerPlacementPolicy containerPlacement,
                             final EventPublisher eventPublisher,
-                            final LockManager<ContainerID> lockManager) {
+                            final LockManager<ContainerID> lockManager,
+                            final NodeManager nodeManager) {
     this.containerManager = containerManager;
     this.containerPlacement = containerPlacement;
     this.eventPublisher = eventPublisher;
     this.lockManager = lockManager;
+    this.nodeManager = nodeManager;
     this.conf = conf;
     this.running = false;
     this.inflightReplication = new ConcurrentHashMap<>();
     this.inflightDeletion = new ConcurrentHashMap<>();
+    this.minHealthyForMaintenance = conf.getMaintenanceReplicaMinimum();
   }
 
   /**
@@ -241,7 +255,7 @@ public class ReplicationManager implements MetricsSource {
    * @param id ContainerID
    */
   private void processContainer(ContainerID id) {
-    lockManager.lock(id);
+    lockManager.writeLock(id);
     try {
       final ContainerInfo container = containerManager.getContainer(id);
       final Set<ContainerReplica> replicas = containerManager
@@ -291,24 +305,15 @@ public class ReplicationManager implements MetricsSource {
           action -> replicas.stream()
               .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
 
-
-      /*
-       * We don't have to take any action if the container is healthy.
-       *
-       * According to ReplicationMonitor container is considered healthy if
-       * the container is either in QUASI_CLOSED or in CLOSED state and has
-       * exact number of replicas in the same state.
-       */
-      if (isContainerHealthy(container, replicas)) {
-        return;
-      }
+      ContainerReplicaCount replicaSet =
+          getContainerReplicaCount(container, replicas);
 
       /*
        * Check if the container is under replicated and take appropriate
        * action.
        */
-      if (isContainerUnderReplicated(container, replicas)) {
-        handleUnderReplicatedContainer(container, replicas);
+      if (!replicaSet.isSufficientlyReplicated()) {
+        handleUnderReplicatedContainer(container, replicaSet);
         return;
       }
 
@@ -316,22 +321,24 @@ public class ReplicationManager implements MetricsSource {
        * Check if the container is over replicated and take appropriate
        * action.
        */
-      if (isContainerOverReplicated(container, replicas)) {
-        handleOverReplicatedContainer(container, replicas);
+      if (replicaSet.isOverReplicated()) {
+        handleOverReplicatedContainer(container, replicaSet);
         return;
       }
 
       /*
-       * The container is neither under nor over replicated and the container
-       * is not healthy. This means that the container has unhealthy/corrupted
-       * replica.
+       If we get here, the container is not over replicated or under replicated
+       but it may be "unhealthy", which means it has one or more replica which
+       are not in the same state as the container itself.
        */
-      handleUnstableContainer(container, replicas);
+      if (!isContainerHealthy(container, replicas)) {
+        handleUnstableContainer(container, replicas);
+      }
 
     } catch (ContainerNotFoundException ex) {
       LOG.warn("Missing container {}.", id);
     } finally {
-      lockManager.unlock(id);
+      lockManager.writeUnlock(id);
     }
   }
 
@@ -361,7 +368,8 @@ public class ReplicationManager implements MetricsSource {
    * Returns true if the container is healthy according to ReplicationMonitor.
    *
    * According to ReplicationMonitor container is considered healthy if
-   * it has exact number of replicas in the same state as the container.
+   * all replica which are not in a decommission or maintenance state are in
+   * the same state as the container and in QUASI_CLOSED or in CLOSED state.
    *
    * @param container Container to check
    * @param replicas Set of ContainerReplicas
@@ -369,50 +377,76 @@ public class ReplicationManager implements MetricsSource {
    */
   private boolean isContainerHealthy(final ContainerInfo container,
                                      final Set<ContainerReplica> replicas) {
-    return container.getReplicationFactor().getNumber() == replicas.size() &&
-        replicas.stream().allMatch(
-            r -> compareState(container.getState(), r.getState()));
+    return (container.getState() == LifeCycleState.CLOSED
+        || container.getState() == LifeCycleState.QUASI_CLOSED)
+        && replicas.stream()
+        .filter(r -> r.getState() != State.DECOMMISSIONED)
+        .filter(r -> r.getState() != State.MAINTENANCE)
+        .allMatch(r -> compareState(container.getState(), r.getState()));
   }
 
   /**
-   * Checks if the container is under replicated or not.
-   *
-   * @param container Container to check
-   * @param replicas Set of ContainerReplicas
-   * @return true if the container is under replicated, false otherwise
+   * Returns the number of replicas which are pending creation for the given
+   * container ID.
+   * @param id The ContainerID for which to check the pending replica
+   * @return The number of inflight additions or zero if none
    */
-  private boolean isContainerUnderReplicated(final ContainerInfo container,
-      final Set<ContainerReplica> replicas) {
-    return container.getReplicationFactor().getNumber() >
-        getReplicaCount(container.containerID(), replicas);
+  private int getInflightAdd(final ContainerID id) {
+    return inflightReplication.getOrDefault(id, Collections.emptyList()).size();
   }
 
   /**
-   * Checks if the container is over replicated or not.
-   *
-   * @param container Container to check
-   * @param replicas Set of ContainerReplicas
-   * @return true if the container if over replicated, false otherwise
+   * Returns the number of replicas which are pending delete for the given
+   * container ID.
+   * @param id The ContainerID for which to check the pending replica
+   * @return The number of inflight deletes or zero if none
    */
-  private boolean isContainerOverReplicated(final ContainerInfo container,
-      final Set<ContainerReplica> replicas) {
-    return container.getReplicationFactor().getNumber() <
-        getReplicaCount(container.containerID(), replicas);
+  private int getInflightDel(final ContainerID id) {
+    return inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
   }
 
   /**
-   * Returns the replication count of the given container. This also
-   * considers inflight replication and deletion.
+   * Given a container, obtain the set of known replica for it, and return a
+   * ContainerReplicaCount object. This object will contain the set of replica
+   * as well as all information required to determine if the container is over
+   * or under replicated, including the delta of replica required to repair the
+   * over or under replication.
    *
-   * @param id ContainerID
-   * @param replicas Set of existing replicas
-   * @return number of estimated replicas for this container
-   */
-  private int getReplicaCount(final ContainerID id,
-                              final Set<ContainerReplica> replicas) {
-    return replicas.size()
-        + inflightReplication.getOrDefault(id, Collections.emptyList()).size()
-        - inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
+   * @param container The container to create a ContainerReplicaCount for
+   * @return ContainerReplicaCount representing the replicated state of the
+   *         container.
+   * @throws ContainerNotFoundException
+   */
+  public ContainerReplicaCount getContainerReplicaCount(ContainerInfo container)
+      throws ContainerNotFoundException {
+    lockManager.readLock(container.containerID());
+    try {
+      final Set<ContainerReplica> replica = containerManager
+          .getContainerReplicas(container.containerID());
+      return getContainerReplicaCount(container, replica);
+    } finally {
+      lockManager.readUnlock(container.containerID());
+    }
+  }
+
+  /**
+   * Given a container and its set of replicas, create and return a
+   * ContainerReplicaCount representing the container.
+   *
+   * @param container The container for which to construct a
+   *                  ContainerReplicaCount
+   * @param replica The set of existing replica for this container
+   * @return ContainerReplicaCount representing the current state of the
+   *         container
+   */
+  private ContainerReplicaCount getContainerReplicaCount(
+      ContainerInfo container, Set<ContainerReplica> replica) {
+    return new ContainerReplicaCount(
+        replica,
+        getInflightAdd(container.containerID()),
+        getInflightDel(container.containerID()),
+        container.getReplicationFactor().getNumber(),
+        minHealthyForMaintenance);
   }
 
   /**
@@ -478,13 +512,29 @@ public class ReplicationManager implements MetricsSource {
    * and send replicate container command to the identified datanode(s).
    *
    * @param container ContainerInfo
-   * @param replicas Set of ContainerReplicas
+   * @param replicaSet An instance of ContainerReplicaCount, containing the
+   *                   current replica count and inflight adds and deletes
    */
   private void handleUnderReplicatedContainer(final ContainerInfo container,
-      final Set<ContainerReplica> replicas) {
-    LOG.debug("Handling underreplicated container: {}",
+      final ContainerReplicaCount replicaSet) {
+    LOG.debug("Handling under replicated container: {}",
         container.getContainerID());
+    Set<ContainerReplica> replicas = replicaSet.getReplica();
     try {
+
+      if (replicaSet.isSufficientlyReplicated()) {
+        LOG.info("The container {} with replicas {} is sufficiently "+
+            "replicated", container.getContainerID(), replicaSet);
+        return;
+      }
+      int repDelta = replicaSet.additionalReplicaNeeded();
+      if (repDelta <= 0) {
+        LOG.info("The container {} with {} is not sufficiently " +
+            "replicated but no further replicas will be scheduled until "+
+            "in-flight operations complete",
+            container.getContainerID(), replicaSet);
+        return;
+      }
       final ContainerID id = container.containerID();
       final List<DatanodeDetails> deletionInFlight = inflightDeletion
           .getOrDefault(id, Collections.emptyList())
@@ -494,15 +544,19 @@ public class ReplicationManager implements MetricsSource {
       final List<DatanodeDetails> source = replicas.stream()
           .filter(r ->
               r.getState() == State.QUASI_CLOSED ||
-              r.getState() == State.CLOSED)
+              r.getState() == State.CLOSED ||
+              r.getState() == State.DECOMMISSIONED ||
+              r.getState() == State.MAINTENANCE)
+          // Exclude stale and dead nodes. This is particularly important for
+          // maintenance nodes, as the replicas will remain present in the
+          // container manager, even when they go dead.
+          .filter(r ->
+              nodeManager.getNodeStatus(r.getDatanodeDetails()).isHealthy())
           .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
           .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
           .map(ContainerReplica::getDatanodeDetails)
           .collect(Collectors.toList());
       if (source.size() > 0) {
-        final int replicationFactor = container
-            .getReplicationFactor().getNumber();
-        final int delta = replicationFactor - getReplicaCount(id, replicas);
         final List<DatanodeDetails> excludeList = replicas.stream()
             .map(ContainerReplica::getDatanodeDetails)
             .collect(Collectors.toList());
@@ -511,13 +565,14 @@ public class ReplicationManager implements MetricsSource {
           actionList.stream().map(r -> r.datanode)
               .forEach(excludeList::add);
         }
+        // source now holds only live nodes; choose targets outside excludeList
         final List<DatanodeDetails> selectedDatanodes = containerPlacement
-            .chooseDatanodes(excludeList, null, delta,
+            .chooseDatanodes(excludeList, null, repDelta,
                 container.getUsedBytes());
 
         LOG.info("Container {} is under replicated. Expected replica count" +
-                " is {}, but found {}.", id, replicationFactor,
-            replicationFactor - delta);
+                " is {}, but found {}. An additional {} replica are needed",
+            id, replicaSet.getReplicationFactor(), replicaSet, repDelta);
 
         for (DatanodeDetails datanode : selectedDatanodes) {
           sendReplicateCommand(container, datanode, source);
@@ -538,17 +593,16 @@ public class ReplicationManager implements MetricsSource {
    * identified datanode(s).
    *
    * @param container ContainerInfo
-   * @param replicas Set of ContainerReplicas
+   * @param replicaSet An instance of ContainerReplicaCount, containing the
+   *                   current replica count and inflight adds and deletes
    */
   private void handleOverReplicatedContainer(final ContainerInfo container,
-      final Set<ContainerReplica> replicas) {
+      final ContainerReplicaCount replicaSet) {
 
+    final Set<ContainerReplica> replicas = replicaSet.getReplica();
     final ContainerID id = container.containerID();
     final int replicationFactor = container.getReplicationFactor().getNumber();
-    // Dont consider inflight replication while calculating excess here.
-    final int excess = replicas.size() - replicationFactor -
-        inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
-
+    final int excess = replicaSet.additionalReplicaNeeded() * -1;
     if (excess > 0) {
 
       LOG.info("Container {} is over replicated. Expected replica count" +
@@ -566,6 +620,11 @@ public class ReplicationManager implements MetricsSource {
       // Retain one healthy replica per origin node Id.
       final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);
       eligibleReplicas.removeAll(uniqueReplicas.values());
+      // Replica which are maintenance or decommissioned are not eligible to
+      // be removed, as they do not count toward over-replication and they also
+      // may not be available
+      eligibleReplicas.removeIf(r -> (r.getState() == State.MAINTENANCE
+          || r.getState() == State.DECOMMISSIONED));
 
       final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
           .stream()
@@ -801,6 +860,12 @@ public class ReplicationManager implements MetricsSource {
      */
     private long eventTimeout = 10 * 60 * 1000;
 
+    /**
+     * The number of container replica which must be available for a node to
+     * enter maintenance.
+     */
+    private int maintenanceReplicaMinimum = 2;
+
     @Config(key = "thread.interval",
         type = ConfigType.TIME,
         defaultValue = "300s",
@@ -825,6 +890,19 @@ public class ReplicationManager implements MetricsSource {
       this.eventTimeout = eventTimeout;
     }
 
+    @Config(key = "maintenance.replica.minimum",
+        type = ConfigType.INT,
+        defaultValue = "2",
+        tags = {SCM, OZONE},
+        description = "The minimum number of container replicas which must " +
+            " be available for a node to enter maintenance. If putting a " +
+            " node into maintenance reduces the available replicas for any " +
+            " container below this level, the node will remain in the " +
+            " entering maintenance state until a new replica is created.")
+    public void setMaintenanceReplicaMinimum(int replicaCount) {
+      this.maintenanceReplicaMinimum = replicaCount;
+    }
+
     public long getInterval() {
       return interval;
     }
@@ -832,6 +910,10 @@ public class ReplicationManager implements MetricsSource {
     public long getEventTimeout() {
       return eventTimeout;
     }
+
+    public int getMaintenanceReplicaMinimum() {
+      return maintenanceReplicaMinimum;
+    }
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
index 0776c28..5c1adf7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java
@@ -61,6 +61,75 @@ public class NodeStatus {
     return operationalState;
   }
 
+  /**
+   * Returns true if the nodeStatus indicates the node is in any decommission
+   * state.
+   *
+   * @return True if the node is in any decommission state, false otherwise
+   */
+  public boolean isDecommission() {
+    return operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONING
+        || operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONED;
+  }
+
+  /**
+   * Returns true if the node is currently decommissioning.
+   *
+   * @return True if the node is decommissioning, false otherwise
+   */
+  public boolean isDecommissioning() {
+    return operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONING;
+  }
+
+  /**
+   * Returns true if the node is decommissioned.
+   *
+   * @return True if the node is decommissioned, false otherwise
+   */
+  public boolean isDecommissioned() {
+    return operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONED;
+  }
+
+  /**
+   * Returns true if the node is in any maintenance state.
+   *
+   * @return True if the node is in any maintenance state, false otherwise
+   */
+  public boolean isMaintenance() {
+    return operationalState
+        == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE
+        || operationalState == HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+  }
+
+  /**
+   * Returns true if the node is currently entering maintenance.
+   *
+   * @return True if the node is entering maintenance, false otherwise
+   */
+  public boolean isEnteringMaintenance() {
+    return operationalState
+        == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
+  }
+
+  /**
+   * Returns true if the node is currently in maintenance.
+   *
+   * @return True if the node is in maintenance, false otherwise.
+   */
+  public boolean isInMaintenance() {
+    return operationalState == HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+  }
+
+  /**
+   * Returns true if the nodeStatus is healthy (i.e. not stale or dead) and
+   * false otherwise.
+   *
+   * @return True if the node is Healthy, false otherwise
+   */
+  public boolean isHealthy() {
+    return health == HddsProtos.NodeState.HEALTHY;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 4df38a3..d285e19 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -434,7 +434,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
           containerManager,
           containerPlacementPolicy,
           eventQueue,
-          new LockManager<>(conf));
+          new LockManager<>(conf),
+          scmNodeManager);
     }
     if(configurator.getScmSafeModeManager() != null) {
       scmSafeModeManager = configurator.getScmSafeModeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 1631447..158b1bd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -21,21 +21,36 @@ package org.apache.hadoop.hdds.scm.container;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager
+    .ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.lock.LockManager;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -69,12 +84,16 @@ public class TestReplicationManager {
   private ContainerPlacementPolicy containerPlacementPolicy;
   private EventQueue eventQueue;
   private DatanodeCommandHandler datanodeCommandHandler;
+  private SimpleNodeManager nodeManager;
+  private ContainerManager containerManager;
+  private Configuration conf;
 
   @Before
   public void setup() throws IOException, InterruptedException {
-    final Configuration conf = new OzoneConfiguration();
-    final ContainerManager containerManager =
+    conf = new OzoneConfiguration();
+    containerManager =
         Mockito.mock(ContainerManager.class);
+    nodeManager = new SimpleNodeManager();
     eventQueue = new EventQueue();
     containerStateManager = new ContainerStateManager(conf);
 
@@ -106,12 +125,23 @@ public class TestReplicationManager {
               .collect(Collectors.toList());
         });
 
-    replicationManager = new ReplicationManager(
-        new ReplicationManagerConfiguration(),
+    createReplicationManager(new ReplicationManagerConfiguration());
+  }
+
+  /**
+   * Creates and starts the ReplicationManager under test using the given
+   * configuration, then sleeps briefly after starting it. Tests that need a
+   * non-default configuration stop the running instance and call this again.
+   */
+  private void createReplicationManager(ReplicationManagerConfiguration rmConf)
+      throws InterruptedException {
+    replicationManager = new ReplicationManager(
+        rmConf,
         containerManager,
         containerPlacementPolicy,
         eventQueue,
-        new LockManager<>(conf));
+        new LockManager<>(conf),
+        nodeManager);
     replicationManager.start();
     Thread.sleep(100L);
   }
@@ -606,6 +640,213 @@ public class TestReplicationManager {
 
   }
 
+  /**
+   * ReplicationManager should replicate an additional replica if there are
+   * decommissioned replicas.
+   */
+  @Test
+  public void testUnderReplicatedDueToDecommission() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.CLOSED, State.DECOMMISSIONED, State.DECOMMISSIONED);
+    assertReplicaScheduled(2);
+  }
+
+  /**
+   * ReplicationManager should replicate an additional replica when all copies
+   * are decommissioning.
+   */
+  @Test
+  public void testUnderReplicatedDueToAllDecommission() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.DECOMMISSIONED, State.DECOMMISSIONED, State.DECOMMISSIONED);
+    assertReplicaScheduled(3);
+  }
+
+  /**
+   * ReplicationManager should not take any action when the container is
+   * correctly replicated with decommissioned replicas still present.
+   */
+  @Test
+  public void testCorrectlyReplicatedWithDecommission() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.CLOSED, State.CLOSED, State.CLOSED, State.DECOMMISSIONED);
+    assertReplicaScheduled(0);
+  }
+
+  /**
+   * ReplicationManager should replicate an additional replica when min rep
+   * is not met for maintenance.
+   */
+  @Test
+  public void testUnderReplicatedDueToMaintenance() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.CLOSED, State.MAINTENANCE, State.MAINTENANCE);
+    assertReplicaScheduled(1);
+  }
+
+  /**
+   * ReplicationManager should not replicate an additional replica when the
+   * min replica for maintenance is 1 and another replica is available.
+   */
+  @Test
+  public void testNotUnderReplicatedDueToMaintenanceMinRepOne() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    replicationManager.stop();
+    ReplicationManagerConfiguration newConf =
+        new ReplicationManagerConfiguration();
+    newConf.setMaintenanceReplicaMinimum(1);
+    createReplicationManager(newConf);
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.CLOSED, State.MAINTENANCE, State.MAINTENANCE);
+    assertReplicaScheduled(0);
+  }
+
+  /**
+   * ReplicationManager should replicate an additional replica when all copies
+   * are going off line and min rep is 1.
+   */
+  @Test
+  public void testUnderReplicatedDueToMaintenanceMinRepOne() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    replicationManager.stop();
+    ReplicationManagerConfiguration newConf =
+        new ReplicationManagerConfiguration();
+    newConf.setMaintenanceReplicaMinimum(1);
+    createReplicationManager(newConf);
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.MAINTENANCE, State.MAINTENANCE, State.MAINTENANCE);
+    assertReplicaScheduled(1);
+  }
+
+  /**
+   * ReplicationManager should replicate additional replica when all copies
+   * are going into maintenance.
+   */
+  @Test
+  public void testUnderReplicatedDueToAllMaintenance() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.MAINTENANCE, State.MAINTENANCE, State.MAINTENANCE);
+    assertReplicaScheduled(2);
+  }
+
+  /**
+   * ReplicationManager should not replicate an additional replica when
+   * sufficient replicas are available.
+   */
+  @Test
+  public void testCorrectlyReplicatedWithMaintenance() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.CLOSED, State.CLOSED, State.MAINTENANCE, State.MAINTENANCE);
+    assertReplicaScheduled(0);
+  }
+
+  /**
+   * ReplicationManager should replicate additional replica when all copies
+   * are decommissioning or maintenance.
+   */
+  @Test
+  public void testUnderReplicatedWithDecommissionAndMaintenance() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.DECOMMISSIONED, State.DECOMMISSIONED, State.MAINTENANCE,
+        State.MAINTENANCE);
+    assertReplicaScheduled(2);
+  }
+
+  /**
+   * When a CLOSED container is over replicated, ReplicationManager
+   * deletes the excess replicas. While choosing the replica for deletion
+   * ReplicationManager should not attempt to remove a DECOMMISSION or
+   * MAINTENANCE replica.
+   */
+  @Test
+  public void testOverReplicatedClosedContainerWithDecomAndMaint()
+      throws SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        State.DECOMMISSIONED, State.MAINTENANCE,
+        State.CLOSED, State.CLOSED, State.CLOSED, State.CLOSED);
+
+    final int currentDeleteCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    // Get the DECOM and Maint replica and ensure none of them are scheduled
+    // for removal
+    Set<ContainerReplica> decom =
+        containerStateManager.getContainerReplicas(container.containerID())
+        .stream()
+        .filter(r -> r.getState() != State.CLOSED)
+        .collect(Collectors.toSet());
+    for (ContainerReplica r : decom) {
+      Assert.assertFalse(datanodeCommandHandler.received(
+          SCMCommandProto.Type.deleteContainerCommand,
+          r.getDatanodeDetails()));
+    }
+  }
+
+  /**
+   * Replication Manager should not attempt to replicate from an unhealthy
+   * (stale or dead) node. To test this, setup a scenario where a replica needs
+   * to be created, but mark all nodes stale. That way, no new replica will be
+   * scheduled.
+   */
+  @Test
+  public void testUnderReplicatedNotHealthySource()
+      throws SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED,
+        NodeStatus.inServiceStale(),
+        State.CLOSED, State.DECOMMISSIONED, State.DECOMMISSIONED);
+    // A replica would normally be scheduled, but as all nodes are stale,
+    // nothing gets scheduled.
+    assertReplicaScheduled(0);
+  }
+
+  private ContainerInfo setupReplicas(
+      LifeCycleState containerState, State... states)
+      throws SCMException, ContainerNotFoundException {
+    return setupReplicas(containerState, NodeStatus.inServiceHealthy(), states);
+  }
+
+  private ContainerInfo setupReplicas(
+      LifeCycleState containerState, NodeStatus allNodesStatus, State... states)
+      throws SCMException, ContainerNotFoundException {
+    final ContainerInfo container = getContainer(containerState);
+    final ContainerID id = container.containerID();
+    containerStateManager.loadContainer(container);
+    final UUID originNodeId = UUID.randomUUID();
+
+    for (State s : states) {
+      DatanodeDetails dn = randomDatanodeDetails();
+      nodeManager.register(dn, allNodesStatus);
+      final ContainerReplica replica = getReplicas(
+          id, s, 1000L, originNodeId, dn);
+      containerStateManager.updateContainerReplica(id, replica);
+    }
+    return container;
+  }
+
+  private void assertReplicaScheduled(int delta) throws InterruptedException {
+    final int currentReplicateCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentReplicateCommandCount + delta,
+        datanodeCommandHandler.getInvocationCount(
+            SCMCommandProto.Type.replicateContainerCommand));
+  }
+
   @After
   public void teardown() throws IOException {
     containerStateManager.close();
@@ -659,4 +900,180 @@ public class TestReplicationManager {
     }
   }
 
+  private class SimpleNodeManager implements NodeManager {
+
+    private final Map<UUID, DatanodeInfo> nodeMap = new HashMap<>();
+
+    public void register(DatanodeDetails dd, NodeStatus status) {
+      nodeMap.put(dd.getUuid(), new DatanodeInfo(dd, status));
+    }
+
+    /**
+     * If the given node was registered with the nodeManager, return the
+     * NodeStatus for the node. Otherwise return a NodeStatus of "In Service
+     * and Healthy".
+     * @param datanodeDetails DatanodeDetails
+     * @return The NodeStatus of the node if it is registered, otherwise an
+     *         Inservice and Healthy NodeStatus.
+     */
+    @Override
+    public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails) {
+      DatanodeInfo dni = nodeMap.get(datanodeDetails.getUuid());
+      if (dni != null) {
+        return dni.getNodeStatus();
+      } else {
+        return NodeStatus.inServiceHealthy();
+      }
+    }
+
+    /**
+     * Below here are all auto-generated placeholder methods to implement the
+     * interface.
+     */
+    @Override
+    public List<DatanodeDetails> getNodes(NodeStatus nodeStatus) {
+      return null;
+    }
+
+    @Override
+    public List<DatanodeDetails> getNodes(
+        HddsProtos.NodeOperationalState opState, HddsProtos.NodeState health) {
+      return null;
+    }
+
+    @Override
+    public int getNodeCount(NodeStatus nodeStatus) {
+      return 0;
+    }
+
+    @Override
+    public int getNodeCount(HddsProtos.NodeOperationalState opState,
+        HddsProtos.NodeState health) {
+      return 0;
+    }
+
+    @Override
+    public List<DatanodeDetails> getAllNodes() {
+      return null;
+    }
+
+    @Override
+    public SCMNodeStat getStats() {
+      return null;
+    }
+
+    @Override
+    public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
+      return null;
+    }
+
+    @Override
+    public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
+      return null;
+    }
+
+    @Override
+    public void setNodeOperationalState(DatanodeDetails datanodeDetails,
+        HddsProtos.NodeOperationalState newState) throws NodeNotFoundException {
+    }
+
+    @Override
+    public Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails) {
+      return null;
+    }
+
+    @Override
+    public void addPipeline(Pipeline pipeline) {
+    }
+
+    @Override
+    public void removePipeline(Pipeline pipeline) {
+    }
+
+    @Override
+    public void addContainer(DatanodeDetails datanodeDetails,
+        ContainerID containerId) throws NodeNotFoundException {
+    }
+
+    @Override
+    public void setContainers(DatanodeDetails datanodeDetails,
+        Set<ContainerID> containerIds) throws NodeNotFoundException {
+    }
+
+    @Override
+    public Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
+        throws NodeNotFoundException {
+      return null;
+    }
+
+    @Override
+    public void addDatanodeCommand(UUID dnId, SCMCommand command) {
+    }
+
+    @Override
+    public void processNodeReport(DatanodeDetails datanodeDetails,
+        StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport) {
+    }
+
+    @Override
+    public List<SCMCommand> getCommandQueue(UUID dnID) {
+      return null;
+    }
+
+    @Override
+    public DatanodeDetails getNodeByUuid(String uuid) {
+      return null;
+    }
+
+    @Override
+    public List<DatanodeDetails> getNodesByAddress(String address) {
+      return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public Map<String, Integer> getNodeCount() {
+      return null;
+    }
+
+    @Override
+    public Map<String, Long> getNodeInfo() {
+      return null;
+    }
+
+    @Override
+    public void onMessage(CommandForDatanode commandForDatanode,
+        EventPublisher publisher) {
+    }
+
+    @Override
+    public VersionResponse getVersion(
+        StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto
+            versionRequest) {
+      return null;
+    }
+
+    @Override
+    public RegisteredCommand register(DatanodeDetails datanodeDetails,
+        StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport,
+        StorageContainerDatanodeProtocolProtos.PipelineReportsProto
+        pipelineReport) {
+      return null;
+    }
+
+    @Override
+    public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
+      return null;
+    }
+
+    @Override
+    public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
+      return null;
+    }
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
index 003035c..abf7f9f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
@@ -47,17 +47,11 @@ import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
 import org.hamcrest.MatcherAssert;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import org.junit.Assert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import static org.mockito.Matchers.anyObject;
-import org.mockito.Mockito;
 import static org.mockito.Mockito.when;
 
 /**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java
new file mode 100644
index 0000000..9a50232
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.states;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.*;
+import static junit.framework.TestCase.assertEquals;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State
+    .DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State
+    .MAINTENANCE;
+
+/**
+ * Class used to test the ContainerReplicaCount class.
+ */
+public class TestContainerReplicaCount {
+
+  @Before
+  public void setup() {
+  }
+
+  @Test
+  public void testThreeHealthyReplica() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, true, 0, false);
+  }
+
+  @Test
+  public void testTwoHealthyReplica() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  @Test
+  public void testOneHealthyReplica() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, false, 2, false);
+  }
+
+  @Test
+  public void testTwoHealthyAndInflightAdd() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+    validate(rcnt, false, 0, false);
+  }
+
+  @Test
+  /**
+   * This does not schedule a container to be removed, as the inFlight add may
+   * fail and then the delete would make things under-replicated. Once the add
+   * completes there will be 4 healthy and it will get taken care of then.
+   */
+  public void testThreeHealthyAndInflightAdd() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+    validate(rcnt, true, 0, false);
+  }
+
+  @Test
+  /**
+   * The inflight delete may fail, but as it would make the container
+   * under replicated, we go ahead and schedule another replica to be added.
+   */
+  public void testThreeHealthyAndInflightDelete() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 1, 3, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  @Test
+  /**
+   * This is NOT sufficiently replicated as the inflight add may fail and the
+   * inflight del could succeed, leaving only 2 healthy replicas.
+   */
+  public void testThreeHealthyAndInflightAddAndInFlightDelete() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 1, 3, 2);
+    validate(rcnt, false, 0, false);
+  }
+
+  @Test
+  public void testFourHealthyReplicas() {
+    Set<ContainerReplica> replica =
+        registerNodes(CLOSED, CLOSED, CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, true, -1, true);
+  }
+
+  @Test
+  public void testFourHealthyReplicasAndInFlightDelete() {
+    Set<ContainerReplica> replica =
+        registerNodes(CLOSED, CLOSED, CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 1, 3, 2);
+    validate(rcnt, true, 0, false);
+  }
+
+  @Test
+  public void testFourHealthyReplicasAndTwoInFlightDelete() {
+    Set<ContainerReplica> replica =
+        registerNodes(CLOSED, CLOSED, CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 2, 3, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  @Test
+  public void testOneHealthyReplicaRepFactorOne() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+    validate(rcnt, true, 0, false);
+  }
+
+  @Test
+  public void testOneHealthyReplicaRepFactorOneInFlightDelete() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 1, 1, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  @Test
+  public void testTwoHealthyReplicaTwoInflightAdd() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 2, 0, 3, 2);
+    validate(rcnt, false, 0, false);
+  }
+
+  /**
+   * From here consider decommission replicas.
+   */
+
+  @Test
+  public void testThreeHealthyAndTwoDecommission() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED,
+        DECOMMISSIONED, DECOMMISSIONED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, true, 0, false);
+  }
+
+  @Test
+  public void testOneDecommissionedReplica() {
+    Set<ContainerReplica> replica =
+        registerNodes(CLOSED, CLOSED, DECOMMISSIONED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  @Test
+  public void testTwoHealthyOneDecommissionedneInFlightAdd() {
+    Set<ContainerReplica> replica =
+        registerNodes(CLOSED, CLOSED, DECOMMISSIONED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+    validate(rcnt, false, 0, false);
+  }
+
+  @Test
+  public void testAllDecommissioned() {
+    Set<ContainerReplica> replica =
+        registerNodes(DECOMMISSIONED, DECOMMISSIONED, DECOMMISSIONED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, false, 3, false);
+  }
+
+  @Test
+  public void testAllDecommissionedRepFactorOne() {
+    Set<ContainerReplica> replica = registerNodes(DECOMMISSIONED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  @Test
+  public void testAllDecommissionedRepFactorOneInFlightAdd() {
+    Set<ContainerReplica> replica = registerNodes(DECOMMISSIONED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 1, 2);
+    validate(rcnt, false, 0, false);
+  }
+
+  @Test
+  public void testOneHealthyOneDecommissioningRepFactorOne() {
+    Set<ContainerReplica> replica = registerNodes(DECOMMISSIONED, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+    validate(rcnt, true, 0, false);
+  }
+
+  /**
+   * Maintenance tests from here.
+   */
+
+  @Test
+  public void testOneHealthyTwoMaintenanceMinRepOfTwo() {
+    Set<ContainerReplica> replica =
+        registerNodes(CLOSED, MAINTENANCE, MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  @Test
+  public void testOneHealthyThreeMaintenanceMinRepOfTwo() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED,
+        MAINTENANCE, MAINTENANCE, MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  @Test
+  public void testOneHealthyTwoMaintenanceMinRepOfOne() {
+    Set<ContainerReplica> replica =
+        registerNodes(CLOSED, MAINTENANCE, MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 1);
+    validate(rcnt, true, 0, false);
+  }
+
+  @Test
+  public void testOneHealthyThreeMaintenanceMinRepOfTwoInFlightAdd() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED,
+        MAINTENANCE, MAINTENANCE, MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+    validate(rcnt, false, 0, false);
+  }
+
+  @Test
+  public void testAllMaintenance() {
+    Set<ContainerReplica> replica =
+        registerNodes(MAINTENANCE, MAINTENANCE, MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, false, 2, false);
+  }
+
+  @Test
+  /**
+   * As we have exactly 3 healthy, but then an excess of maintenance copies
+   * we ignore the over-replication caused by the maintenance copies until they
+   * come back online, and then deal with them.
+   */
+  public void testThreeHealthyTwoInMaintenance() {
+    Set<ContainerReplica> replica = registerNodes(CLOSED, CLOSED, CLOSED,
+        MAINTENANCE, MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, true, 0, false);
+  }
+
+  @Test
+  /**
+   * This is somewhat similar to testThreeHealthyTwoInMaintenance() except now
+   * one of the maintenance copies has become healthy and we will need to remove
+   * the over-replicated healthy container.
+   */
+  public void testFourHealthyOneInMaintenance() {
+    Set<ContainerReplica> replica =
+        registerNodes(CLOSED, CLOSED, CLOSED, CLOSED, MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2);
+    validate(rcnt, true, -1, true);
+  }
+
+  @Test
+  public void testOneMaintenanceMinRepOfTwoRepFactorOne() {
+    Set<ContainerReplica> replica = registerNodes(MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  @Test
+  public void testOneMaintenanceMinRepOfTwoRepFactorOneInFlightAdd() {
+    Set<ContainerReplica> replica = registerNodes(MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 1, 2);
+    validate(rcnt, false, 0, false);
+  }
+
+  @Test
+  public void testOneHealthyOneMaintenanceRepFactorOne() {
+    Set<ContainerReplica> replica = registerNodes(MAINTENANCE, CLOSED);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2);
+    validate(rcnt, true, 0, false);
+  }
+
+  @Test
+  public void testTwoDecomTwoMaintenanceOneInflightAdd() {
+    Set<ContainerReplica> replica =
+        registerNodes(DECOMMISSIONED, DECOMMISSIONED, MAINTENANCE, MAINTENANCE);
+    ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2);
+    validate(rcnt, false, 1, false);
+  }
+
+  private void validate(ContainerReplicaCount rcnt,
+      boolean sufficientlyReplicated, int replicaDelta, boolean overRelicated) {
+    assertEquals(sufficientlyReplicated, rcnt.isSufficientlyReplicated());
+    assertEquals(replicaDelta, rcnt.additionalReplicaNeeded());
+  }
+
+  private Set<ContainerReplica> registerNodes(
+      ContainerReplicaProto.State... states) {
+    Set<ContainerReplica> replica = new HashSet<>();
+    for (ContainerReplicaProto.State s : states) {
+      DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+      replica.add(new ContainerReplica.ContainerReplicaBuilder()
+          .setContainerID(new ContainerID(1))
+          .setContainerState(s)
+          .setDatanodeDetails(dn)
+          .setOriginNodeId(dn.getUuid())
+          .setSequenceId(1)
+          .build());
+    }
+    return replica;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index 0aa0221..acd8993 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -127,7 +128,9 @@ public class TestDatanodeAdminMonitor {
 
   }
 
+
   @Test
+  @Ignore // HDDS-2631
   public void testMonitoredNodeHasPipelinesClosed()
       throws NodeNotFoundException, TimeoutException, InterruptedException {
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
index 5572e9a..a2587a7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
@@ -23,11 +23,13 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
@@ -54,6 +56,7 @@ public class TestSafeModeHandler {
   private EventQueue eventQueue;
   private SCMSafeModeManager.SafeModeStatus safeModeStatus;
   private PipelineManager scmPipelineManager;
+  private NodeManager nodeManager;
 
   public void setup(boolean enabled) {
     configuration = new OzoneConfiguration();
@@ -68,10 +71,11 @@ public class TestSafeModeHandler {
         Mockito.mock(ContainerManager.class);
     Mockito.when(containerManager.getContainerIDs())
         .thenReturn(new HashSet<>());
+    nodeManager = new MockNodeManager(false, 0);
     replicationManager = new ReplicationManager(
         new ReplicationManagerConfiguration(),
         containerManager, Mockito.mock(ContainerPlacementPolicy.class),
-        eventQueue, new LockManager(configuration));
+        eventQueue, new LockManager(configuration), nodeManager);
     scmPipelineManager = Mockito.mock(SCMPipelineManager.class);
     blockManager = Mockito.mock(BlockManagerImpl.class);
     safeModeHandler =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index 5cf0864..159683c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.scm.node;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org