Posted to commits@ozone.apache.org by so...@apache.org on 2022/10/19 08:32:33 UTC

[ozone] branch master updated: HDDS-7058. EC: ReplicationManager - Implement ratis container replication check handler (#3802)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 237a9a1594 HDDS-7058. EC: ReplicationManager - Implement ratis container replication check handler (#3802)
237a9a1594 is described below

commit 237a9a15949299c700c7352019662429b0acf242
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Oct 19 09:32:27 2022 +0100

    HDDS-7058. EC: ReplicationManager - Implement ratis container replication check handler (#3802)
---
 .../replication/ContainerHealthResult.java         |  69 +++
 .../replication/LegacyReplicationManager.java      |   1 -
 .../RatisContainerReplicaCount.java                |  87 +++-
 .../container/replication/ReplicationManager.java  |  19 +-
 .../health/RatisReplicationCheckHandler.java       | 201 ++++++++
 .../TestRatisContainerReplicaCount.java            |  58 ++-
 .../health/TestRatisReplicationCheckHandler.java   | 560 +++++++++++++++++++++
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |   2 +-
 8 files changed, 987 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
index de290294e9..b438d6de4c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
@@ -106,6 +106,9 @@ public class ContainerHealthResult {
     private final int remainingRedundancy;
     private final boolean dueToDecommission;
     private final boolean sufficientlyReplicatedAfterPending;
+    private boolean dueToMisReplication = false;
+    private boolean isMisReplicated = false;
+    private boolean isMisReplicatedAfterPending = false;
     private final boolean unrecoverable;
     private int requeueCount = 0;
 
@@ -119,6 +122,44 @@ public class ContainerHealthResult {
       this.unrecoverable = unrecoverable;
     }
 
+    /**
+     * Pass true to indicate the container is mis-replicated, i.e. it does not
+     * meet the placement policy.
+     * @param isMisRep True if the container is mis-replicated, false if not.
+     * @return this object to allow calls to be chained
+     */
+    public UnderReplicatedHealthResult
+        setMisReplicated(boolean isMisRep) {
+      this.isMisReplicated = isMisRep;
+      return this;
+    }
+
+    /**
+     * Pass true to indicate the container is mis-replicated after considering
+     * pending replicas scheduled for create or delete.
+     * @param isMisRep True if the container is mis-replicated considering
+     *                 pending replicas, or false if not.
+     * @return this object to allow calls to be chained
+     */
+    public UnderReplicatedHealthResult
+        setMisReplicatedAfterPending(boolean isMisRep) {
+      this.isMisReplicatedAfterPending = isMisRep;
+      return this;
+    }
+
+    /**
+     * If the container is ONLY under replicated due to mis-replication, pass
+     * true, otherwise pass false.
+     * @param dueToMisRep Pass true if the container has enough replicas but
+     *                    does not meet the placement policy.
+     * @return this object to allow calls to be chained
+     */
+    public UnderReplicatedHealthResult
+        setDueToMisReplication(boolean dueToMisRep) {
+      this.dueToMisReplication = dueToMisRep;
+      return this;
+    }
+
     /**
      * How many more replicas can be lost before the container is
      * unreadable. For containers which are under-replicated due to decommission
@@ -187,6 +228,34 @@ public class ContainerHealthResult {
       return sufficientlyReplicatedAfterPending;
     }
 
+    /**
+     * Returns true if the container is mis-replicated, ignoring any pending
+     * replicas scheduled to be created.
+     * @return True if mis-replicated, ignoring pending
+     */
+    public boolean isMisReplicated() {
+      return isMisReplicated;
+    }
+
+    /**
+     * Returns true if the container is mis-replicated after taking account of
+     * pending replicas, which are scheduled to be created.
+     * @return true if mis-replicated after pending.
+     */
+    public boolean isMisReplicatedAfterPending() {
+      return isMisReplicatedAfterPending;
+    }
+
+    /**
+     * Returns true if the under replication is only due to mis-replication.
+     * In other words, the container has enough replicas, but they do not meet
+     * the placement policy.
+     * @return true if the under-replication is only due to mis-replication
+     */
+    public boolean isDueToMisReplication() {
+      return dueToMisReplication;
+    }
+
     /**
      * Indicates whether a container has enough replicas to be read. For Ratis
     * at least one replica must be available. For EC, at least dataNum replicas
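
The three new flags are set fluently on an UnderReplicatedHealthResult. A minimal sketch of the intended usage, mirroring how RatisReplicationCheckHandler (further down in this patch) applies them; the variables passed to the constructor here are placeholders, not part of the patch:

    // Sketch only: flag a result that misses the placement policy now but will
    // have enough correctly placed replicas once the pending adds complete.
    ContainerHealthResult.UnderReplicatedHealthResult result =
        new ContainerHealthResult.UnderReplicatedHealthResult(
            container, remainingRedundancy, dueToDecommission,
            sufficientlyReplicatedAfterPending, unrecoverable);
    result.setMisReplicated(true)               // placement policy not met now
        .setMisReplicatedAfterPending(false)    // pending adds will satisfy it
        .setDueToMisReplication(false);         // also short of replicas
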
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 27c7a12e66..b58140ac31 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.RatisContainerReplicaCount;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/RatisContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
similarity index 76%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/RatisContainerReplicaCount.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
index f25423d4ec..577fc6004d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/RatisContainerReplicaCount.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
@@ -15,10 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdds.scm.container;
+package org.apache.hadoop.hdds.scm.container.replication;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 
 import java.util.Set;
 
@@ -243,11 +244,27 @@ public class RatisContainerReplicaCount implements ContainerReplicaCount {
    */
   @Override
   public boolean isSufficientlyReplicated() {
-    return missingReplicas() + inFlightDel <= 0;
+    return isSufficientlyReplicated(false);
   }
 
   /**
-   * Return true is the container is over replicated. Decommission and
+   * Return true if the container is sufficiently replicated. Decommissioning
+   * and Decommissioned containers are ignored in this check, assuming they will
+   * eventually be removed from the cluster.
+   * This check ignores inflight additions if includePendingAdd is false;
+   * otherwise it assumes they will complete successfully.
+   *
+   * @return True if the container is sufficiently replicated and False
+   *         otherwise.
+   */
+  public boolean isSufficientlyReplicated(boolean includePendingAdd) {
+    // Positive for under-rep, negative for over-rep
+    int delta = redundancyDelta(true, includePendingAdd);
+    return delta <= 0;
+  }
+
+  /**
+   * Return true if the container is over replicated. Decommission and
    * maintenance containers are ignored for this check.
    * The check ignores inflight additions, as they may fail, but it does
    * consider inflight deletes, as they would reduce the over replication when
@@ -257,7 +274,67 @@ public class RatisContainerReplicaCount implements ContainerReplicaCount {
    */
   @Override
   public boolean isOverReplicated() {
-    return missingReplicas() + inFlightDel < 0;
+    return isOverReplicated(true);
+  }
+
+  /**
+   * Return true if the container is over replicated. Decommission and
+   * maintenance containers are ignored for this check.
+   * The check ignores inflight additions, as they may fail, but it does
+   * consider inflight deletes if includePendingDelete is true.
+   *
+   * @return True if the container is over replicated, false otherwise.
+   */
+  public boolean isOverReplicated(boolean includePendingDelete) {
+    return getExcessRedundancy(includePendingDelete) > 0;
+  }
+
+  /**
+   * @return The number of excess redundant replicas.
+   */
+  public int getExcessRedundancy(boolean includePendingDelete) {
+    int excessRedundancy = redundancyDelta(includePendingDelete, false);
+    if (excessRedundancy >= 0) {
+      // either perfectly replicated or under replicated
+      return 0;
+    }
+    return -excessRedundancy;
+  }
+
+  /**
+   * Return the delta from the expected number of replicas, optionally
+   * considering inflight add and deletes.
+   * @param includePendingDelete If true, take inflight deletes into account.
+   * @param includePendingAdd If true, take inflight additions into account.
+   * @return zero if perfectly replicated, a negative value for over replication
+   *         and a positive value for under replication. The magnitude of the
+   *         return value indicates how many replicas the container is over or
+   *         under replicated by.
+   */
+  private int redundancyDelta(boolean includePendingDelete,
+      boolean includePendingAdd) {
+    int excessRedundancy = missingReplicas();
+    if (includePendingDelete) {
+      excessRedundancy += inFlightDel;
+    }
+    if (includePendingAdd) {
+      excessRedundancy -= inFlightAdd;
+    }
+    return excessRedundancy;
+  }
+
+  /**
+   * How many more replicas can be lost before the container is
+   * unreadable, assuming any inflight deletes will complete. For containers
+   * which are under-replicated due to decommission
+   * or maintenance only, the remaining redundancy will include those
+   * decommissioning or maintenance replicas, as they are technically still
+   * available until the datanode processes are stopped.
+   * @return Count of remaining redundant replicas.
+   */
+  public int getRemainingRedundancy() {
+    return Math.max(0,
+        healthyCount + decommissionCount + maintenanceCount - inFlightDel - 1);
   }
 
   /**
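
As a worked example of the new delta logic, consider a CLOSED factor-3 container with five healthy IN_SERVICE replicas and two inflight deletes (the same scenario as the new testOverReplicatedWithAndWithoutPending test below). The constructor argument order follows the tests: container, replicas, inflight adds, inflight deletes, replication factor, min healthy for maintenance.

    // Sketch only: missingReplicas() is -2 here (two excess replicas), so
    // redundancyDelta(false, false) = -2 and redundancyDelta(true, false) = 0.
    RatisContainerReplicaCount rcnt =
        new RatisContainerReplicaCount(container, replicas, 0, 2, 3, 2);
    rcnt.isOverReplicated(false);     // true  - ignoring the pending deletes
    rcnt.isOverReplicated(true);      // false - the deletes bring it back to 3
    rcnt.getExcessRedundancy(false);  // 2
    rcnt.getRemainingRedundancy();    // 5 - 2 pending deletes - 1 = 2
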
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 05300d6a08..77142848b6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationChec
 import org.apache.hadoop.hdds.scm.container.replication.health.HealthCheck;
 import org.apache.hadoop.hdds.scm.container.replication.health.OpenContainerHandler;
 import org.apache.hadoop.hdds.scm.container.replication.health.QuasiClosedContainerHandler;
+import org.apache.hadoop.hdds.scm.container.replication.health.RatisReplicationCheckHandler;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -74,6 +75,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
 
 /**
  * Replication Manager (RM) is the one which is responsible for making sure
@@ -144,12 +146,14 @@ public class ReplicationManager implements SCMService {
   private final Clock clock;
   private final ContainerReplicaPendingOps containerReplicaPendingOps;
   private final ECReplicationCheckHandler ecReplicationCheckHandler;
+  private final RatisReplicationCheckHandler ratisReplicationCheckHandler;
   private final EventPublisher eventPublisher;
   private final ReentrantLock lock = new ReentrantLock();
   private ReplicationQueue replicationQueue;
   private final ECUnderReplicationHandler ecUnderReplicationHandler;
   private final ECOverReplicationHandler ecOverReplicationHandler;
   private final int maintenanceRedundancy;
+  private final int ratisMaintenanceMinReplicas;
   private Thread underReplicatedProcessorThread;
   private Thread overReplicatedProcessorThread;
   private final UnderReplicatedProcessor underReplicatedProcessor;
@@ -190,9 +194,13 @@ public class ReplicationManager implements SCMService {
     this.containerReplicaPendingOps = replicaPendingOps;
     this.legacyReplicationManager = legacyReplicationManager;
     this.ecReplicationCheckHandler = new ECReplicationCheckHandler();
+    this.ratisReplicationCheckHandler =
+        new RatisReplicationCheckHandler(containerPlacement);
     this.nodeManager = nodeManager;
     this.replicationQueue = new ReplicationQueue();
     this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
+    this.ratisMaintenanceMinReplicas = rmConf.getMaintenanceReplicaMinimum();
+
     ecUnderReplicationHandler = new ECUnderReplicationHandler(
         ecReplicationCheckHandler, containerPlacement, conf, nodeManager);
     ecOverReplicationHandler =
@@ -212,7 +220,8 @@ public class ReplicationManager implements SCMService {
         .addNext(new ClosingContainerHandler(this))
         .addNext(new QuasiClosedContainerHandler(this))
         .addNext(new ClosedWithMismatchedReplicasHandler(this))
-        .addNext(ecReplicationCheckHandler);
+        .addNext(ecReplicationCheckHandler)
+        .addNext(ratisReplicationCheckHandler);
     start();
   }
 
@@ -466,10 +475,16 @@ public class ReplicationManager implements SCMService {
     List<ContainerReplicaOp> pendingOps =
         containerReplicaPendingOps.getPendingOps(containerID);
 
+    // There is a different config for EC and Ratis maintenance
+    // minimum replicas, so we must pass through the correct one.
+    int maintRedundancy = maintenanceRedundancy;
+    if (containerInfo.getReplicationType() == RATIS) {
+      maintRedundancy = ratisMaintenanceMinReplicas;
+    }
     ContainerCheckRequest checkRequest = new ContainerCheckRequest.Builder()
         .setContainerInfo(containerInfo)
         .setContainerReplicas(replicas)
-        .setMaintenanceRedundancy(maintenanceRedundancy)
+        .setMaintenanceRedundancy(maintRedundancy)
         .setReport(report)
         .setPendingOps(pendingOps)
         .setReplicationQueue(repQueue)
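
With this change the Ratis handler joins the existing container health-check chain right after the EC handler. A rough sketch of the resulting chain, based on the ReplicationManager hunk above; the start of the chain and the field it is assigned to are not shown in the hunk, so OpenContainerHandler as the head and the field name are assumptions:

    // Each handler returns false for containers it does not handle, passing
    // them on to the next handler in the chain.
    containerCheckChain = new OpenContainerHandler(this)
        .addNext(new ClosingContainerHandler(this))
        .addNext(new QuasiClosedContainerHandler(this))
        .addNext(new ClosedWithMismatchedReplicasHandler(this))
        .addNext(ecReplicationCheckHandler)
        .addNext(ratisReplicationCheckHandler);
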
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java
new file mode 100644
index 0000000000..cb53c59e08
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+
+/**
+ * Class to determine the health state of a Ratis Container. Given the container
+ * and current replica details, along with replicas pending add and delete,
+ * this class will return a ContainerHealthResult indicating if the container
+ * is healthy, or under / over replicated etc.
+ */
+public class RatisReplicationCheckHandler extends AbstractCheck {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RatisReplicationCheckHandler.class);
+
+  /**
+   * PlacementPolicy which is used to identify where a container
+   * should be replicated.
+   */
+  private final PlacementPolicy ratisContainerPlacement;
+
+  public RatisReplicationCheckHandler(PlacementPolicy containerPlacement) {
+    this.ratisContainerPlacement = containerPlacement;
+  }
+
+  @Override
+  public boolean handle(ContainerCheckRequest request) {
+    if (request.getContainerInfo().getReplicationType() != RATIS) {
+      // This handler is only for Ratis containers.
+      return false;
+    }
+    ReplicationManagerReport report = request.getReport();
+    ContainerInfo container = request.getContainerInfo();
+    ContainerHealthResult health = checkHealth(request);
+    if (health.getHealthState() == ContainerHealthResult.HealthState.HEALTHY) {
+      // If the container is healthy, there is nothing else to do in this
+      // handler, so return unhandled to let any further handlers be tried.
+      return false;
+    }
+    if (health.getHealthState()
+        == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
+      report.incrementAndSample(
+          ReplicationManagerReport.HealthState.UNDER_REPLICATED,
+          container.containerID());
+      ContainerHealthResult.UnderReplicatedHealthResult underHealth
+          = ((ContainerHealthResult.UnderReplicatedHealthResult) health);
+      if (underHealth.isUnrecoverable()) {
+        report.incrementAndSample(ReplicationManagerReport.HealthState.MISSING,
+            container.containerID());
+      }
+      if (underHealth.isMisReplicated()) {
+        report.incrementAndSample(
+            ReplicationManagerReport.HealthState.MIS_REPLICATED,
+            container.containerID());
+      }
+      // TODO - if it is unrecoverable, should we return false so other
+      //        handlers can be tried?
+      if (!underHealth.isUnrecoverable() &&
+          (underHealth.isMisReplicatedAfterPending() ||
+              !underHealth.isSufficientlyReplicatedAfterPending())) {
+        request.getReplicationQueue().enqueue(underHealth);
+      }
+      return true;
+    }
+
+    if (health.getHealthState()
+        == ContainerHealthResult.HealthState.OVER_REPLICATED) {
+      report.incrementAndSample(
+          ReplicationManagerReport.HealthState.OVER_REPLICATED,
+          container.containerID());
+      ContainerHealthResult.OverReplicatedHealthResult overHealth
+          = ((ContainerHealthResult.OverReplicatedHealthResult) health);
+      if (!overHealth.isSufficientlyReplicatedAfterPending()) {
+        request.getReplicationQueue().enqueue(overHealth);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public ContainerHealthResult checkHealth(ContainerCheckRequest request) {
+    ContainerInfo container = request.getContainerInfo();
+    Set<ContainerReplica> replicas = request.getContainerReplicas();
+    List<ContainerReplicaOp> replicaPendingOps = request.getPendingOps();
+    // Note that this setting is minReplicasForMaintenance. For EC the variable
+    // is defined as remainingRedundancy which is subtly different.
+    int minReplicasForMaintenance = request.getMaintenanceRedundancy();
+    int pendingAdd = 0;
+    int pendingDelete = 0;
+    for (ContainerReplicaOp op : replicaPendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        pendingAdd++;
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        pendingDelete++;
+      }
+    }
+    int requiredNodes = container.getReplicationConfig().getRequiredNodes();
+
+    // RatisContainerReplicaCount uses the minReplicasForMaintenance rather
+    // than remainingRedundancy which ECContainerReplicaCount uses.
+    RatisContainerReplicaCount replicaCount =
+        new RatisContainerReplicaCount(container, replicas, pendingAdd,
+            pendingDelete, requiredNodes, minReplicasForMaintenance);
+
+    ContainerPlacementStatus placementStatus =
+        getPlacementStatus(replicas, requiredNodes, Collections.emptyList());
+
+    ContainerPlacementStatus placementStatusWithPending = placementStatus;
+    if (replicaPendingOps.size() > 0) {
+      placementStatusWithPending =
+          getPlacementStatus(replicas, requiredNodes, replicaPendingOps);
+    }
+    boolean sufficientlyReplicated
+        = replicaCount.isSufficientlyReplicated(false);
+    boolean isPolicySatisfied = placementStatus.isPolicySatisfied();
+    if (!sufficientlyReplicated || !isPolicySatisfied) {
+      ContainerHealthResult.UnderReplicatedHealthResult result =
+          new ContainerHealthResult.UnderReplicatedHealthResult(
+          container, replicaCount.getRemainingRedundancy(),
+          isPolicySatisfied
+              && replicas.size() - pendingDelete >= requiredNodes,
+          replicaCount.isSufficientlyReplicated(true),
+          replicaCount.isUnrecoverable());
+      result.setMisReplicated(!isPolicySatisfied)
+          .setMisReplicatedAfterPending(
+              !placementStatusWithPending.isPolicySatisfied())
+          .setDueToMisReplication(
+              !isPolicySatisfied && replicaCount.isSufficientlyReplicated());
+      return result;
+    }
+
+    boolean isOverReplicated = replicaCount.isOverReplicated(false);
+    if (isOverReplicated) {
+      boolean repOkWithPending = !replicaCount.isOverReplicated(true);
+      return new ContainerHealthResult.OverReplicatedHealthResult(
+          container, replicaCount.getExcessRedundancy(false), repOkWithPending);
+    }
+    // No issues detected, just return healthy.
+    return new ContainerHealthResult.HealthyResult(container);
+  }
+
+  /**
+   * Given a set of ContainerReplica, transform it to a list of DatanodeDetails
+   * and then check if the list meets the container placement policy.
+   * @param replicas Set of ContainerReplica
+   * @param replicationFactor Expected replication factor of the container
+   * @return ContainerPlacementStatus indicating if the policy is met or not
+   */
+  private ContainerPlacementStatus getPlacementStatus(
+      Set<ContainerReplica> replicas, int replicationFactor,
+      List<ContainerReplicaOp> pendingOps) {
+
+    Set<DatanodeDetails> replicaDns = replicas.stream()
+        .map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toSet());
+    for (ContainerReplicaOp op : pendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        replicaDns.add(op.getTarget());
+      }
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        replicaDns.remove(op.getTarget());
+      }
+    }
+    return ratisContainerPlacement.validateContainerPlacement(
+        new ArrayList<>(replicaDns), replicationFactor);
+  }
+}
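
A minimal sketch of driving the new handler directly, following the pattern of the new unit test further down; placementPolicy, container and replicas are placeholders (in the test, createContainerInfo and createReplicas come from ReplicationTestUtil):

    // Sketch only: build a check request and run it through the handler.
    RatisReplicationCheckHandler handler =
        new RatisReplicationCheckHandler(placementPolicy);
    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
        .setContainerInfo(container)
        .setContainerReplicas(replicas)
        .setMaintenanceRedundancy(2)
        .setPendingOps(Collections.emptyList())
        .setReport(new ReplicationManagerReport())
        .setReplicationQueue(new ReplicationQueue())
        .build();
    ContainerHealthResult health = handler.checkHealth(request);
    boolean handled = handler.handle(request);  // true when under/over replicated
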
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisContainerReplicaCount.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisContainerReplicaCount.java
index 5ceaec39bf..7d4947dd64 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisContainerReplicaCount.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisContainerReplicaCount.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.RatisContainerReplicaCount;
+import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashSet;
@@ -433,6 +433,62 @@ class TestRatisContainerReplicaCount {
     assertFalse(rcnt.isSufficientlyReplicated());
   }
 
+  @Test
+  void testOverReplicatedWithAndWithoutPending() {
+    Set<ContainerReplica> replica = registerNodes(IN_SERVICE, IN_SERVICE,
+        IN_SERVICE, IN_SERVICE, IN_SERVICE);
+    ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED);
+    RatisContainerReplicaCount rcnt =
+        new RatisContainerReplicaCount(container, replica, 0, 2, 3, 2);
+    assertTrue(rcnt.isOverReplicated(false));
+    assertFalse(rcnt.isOverReplicated(true));
+    assertEquals(2, rcnt.getExcessRedundancy(false));
+    assertEquals(0, rcnt.getExcessRedundancy(true));
+  }
+
+  @Test
+  void testRemainingRedundancy() {
+    Set<ContainerReplica> replica = registerNodes(IN_SERVICE, IN_SERVICE,
+        IN_SERVICE, IN_SERVICE);
+    ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED);
+    RatisContainerReplicaCount rcnt =
+        new RatisContainerReplicaCount(container, replica, 0, 1, 3, 2);
+    Assert.assertEquals(2, rcnt.getRemainingRedundancy());
+    replica = registerNodes(IN_SERVICE);
+    rcnt =
+        new RatisContainerReplicaCount(container, replica, 0, 0, 3, 2);
+    Assert.assertEquals(0, rcnt.getRemainingRedundancy());
+    rcnt =
+        new RatisContainerReplicaCount(container, replica, 0, 1, 3, 2);
+    Assert.assertEquals(0, rcnt.getRemainingRedundancy());
+  }
+
+  @Test
+  void testSufficientlyReplicatedWithAndWithoutPending() {
+    Set<ContainerReplica> replica = registerNodes(IN_SERVICE, IN_SERVICE);
+    ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED);
+    RatisContainerReplicaCount rcnt =
+        new RatisContainerReplicaCount(container, replica, 0, 0, 3, 2);
+    assertFalse(rcnt.isSufficientlyReplicated(true));
+    assertFalse(rcnt.isSufficientlyReplicated(false));
+
+    rcnt =
+        new RatisContainerReplicaCount(container, replica, 1, 0, 3, 2);
+    assertTrue(rcnt.isSufficientlyReplicated(true));
+    assertFalse(rcnt.isSufficientlyReplicated(false));
+
+    replica = registerNodes(IN_SERVICE, IN_SERVICE, IN_SERVICE);
+    rcnt =
+        new RatisContainerReplicaCount(container, replica, 0, 1, 3, 2);
+    assertFalse(rcnt.isSufficientlyReplicated(false));
+    assertFalse(rcnt.isSufficientlyReplicated(true));
+    rcnt =
+        new RatisContainerReplicaCount(container, replica, 1, 1, 3, 2);
+    assertFalse(rcnt.isSufficientlyReplicated(false));
+    assertTrue(rcnt.isSufficientlyReplicated(true));
+
+  }
+
   private void validate(RatisContainerReplicaCount rcnt,
       boolean sufficientlyReplicated, int replicaDelta,
       boolean overReplicated) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestRatisReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestRatisReplicationCheckHandler.java
new file mode 100644
index 0000000000..d8dd69284b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestRatisReplicationCheckHandler.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+
+/**
+ * Tests for the RatisReplicationCheckHandler class.
+ */
+public class TestRatisReplicationCheckHandler {
+
+  private RatisReplicationCheckHandler healthCheck;
+  private ReplicationConfig repConfig;
+  private PlacementPolicy containerPlacementPolicy;
+  private ReplicationQueue repQueue;
+  private ContainerCheckRequest.Builder requestBuilder;
+  private ReplicationManagerReport report;
+  private int maintenanceRedundancy = 2;
+
+  @Before
+  public void setup() throws IOException {
+    containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
+    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+        Mockito.any(),
+        Mockito.anyInt()
+    )).thenAnswer(invocation ->
+        new ContainerPlacementStatusDefault(2, 2, 3));
+    healthCheck = new RatisReplicationCheckHandler(containerPlacementPolicy);
+    repConfig = RatisReplicationConfig.getInstance(THREE);
+    repQueue = new ReplicationQueue();
+    report = new ReplicationManagerReport();
+    requestBuilder = new ContainerCheckRequest.Builder()
+        .setReplicationQueue(repQueue)
+        .setMaintenanceRedundancy(maintenanceRedundancy)
+        .setPendingOps(Collections.emptyList())
+        .setReport(report);
+  }
+
+  @Test
+  public void testReturnFalseForNonRatis() {
+    ContainerInfo container =
+        createContainerInfo(new ECReplicationConfig(3, 2));
+    Set<ContainerReplica> replicas =
+        createReplicas(container.containerID(), 1, 2, 3, 4);
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container);
+    Assert.assertFalse(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void testHealthyContainerIsHealthy() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 0, 0, 0);
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container);
+    ContainerHealthResult result =
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
+
+    Assert.assertFalse(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+  }
+
+  @Test
+  public void testUnderReplicatedContainerIsUnderReplicated() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 0, 0);
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedContainerDueToPendingDelete() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 0, 0, 0);
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setPendingOps(pending);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedContainerFixedWithPending() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 0, 0);
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+    requestBuilder.setContainerReplicas(replicas)
+        .setPendingOps(pending)
+        .setContainerInfo(container);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getRemainingRedundancy());
+    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    // Fixed with pending, so nothing added to the queue
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    // Still under replicated until the pending complete
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedDueToDecommission() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 0), Pair.of(DECOMMISSIONING, 0),
+        Pair.of(DECOMMISSIONED, 0));
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(2, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertTrue(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedDueToDecommissionFixedWithPending() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+        Pair.of(DECOMMISSIONED, 0));
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setPendingOps(pending)
+        .setContainerInfo(container);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(2, result.getRemainingRedundancy());
+    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertTrue(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    // Nothing queued as inflight replicas will fix it.
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    // Still under replicated in the report until pending complete
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedDueToDecommissionAndMissing() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 0), Pair.of(DECOMMISSIONED, 0));
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setPendingOps(pending)
+        .setContainerInfo(container);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedAndUnrecoverable() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas = Collections.EMPTY_SET;
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(0, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+    Assert.assertTrue(result.isUnrecoverable());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    // Unrecoverable, so not added to the queue.
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.MISSING));
+  }
+
+  @Test
+  public void testOverReplicatedContainer() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+        Pair.of(IN_SERVICE, 0),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0));
+
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));
+    pending.add(ContainerReplicaOp.create(
+        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setPendingOps(pending)
+        .setContainerInfo(container);
+    OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(4, result.getExcessRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+  }
+
+  @Test
+  public void testOverReplicatedContainerFixedByPending() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0));
+
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setPendingOps(pending)
+        .setContainerInfo(container);
+    OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getExcessRedundancy());
+    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    // Fixed by pending so nothing queued.
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    // Still over replicated, so the report should contain it
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+  }
+
+  @Test
+  public void testOverReplicatedContainerWithMaintenance() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas =  createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+        Pair.of(IN_MAINTENANCE, 0), Pair.of(DECOMMISSIONED, 0));
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container);
+    OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getExcessRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+  }
+
+  @Test
+  public void testOverReplicatedContainerDueToMaintenanceIsHealthy() {
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_SERVICE, 0),
+        Pair.of(IN_SERVICE, 0), Pair.of(IN_MAINTENANCE, 0),
+        Pair.of(IN_MAINTENANCE, 0));
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container);
+    ContainerHealthResult result =
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
+
+    Assert.assertFalse(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
+    Assert.assertEquals(0, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedWithMisReplication() {
+    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+        Mockito.any(),
+        Mockito.anyInt()
+    )).thenAnswer(invocation ->
+        new ContainerPlacementStatusDefault(1, 2, 3));
+
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 0, 0);
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getRemainingRedundancy());
+    Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+    Assert.assertTrue(result.isMisReplicated());
+    Assert.assertTrue(result.isMisReplicatedAfterPending());
+    Assert.assertFalse(result.isDueToMisReplication());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedWithMisReplicationFixedByPending() {
+    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+        Mockito.any(),
+        Mockito.anyInt()
+    )).thenAnswer(invocation -> {
+      List<DatanodeDetails> dns = invocation.getArgument(0);
+      // If the number of DNs is 3 or less, report it as mis-replicated
+      if (dns.size() <= 3) {
+        return new ContainerPlacementStatusDefault(1, 2, 3);
+      } else {
+        return new ContainerPlacementStatusDefault(2, 2, 3);
+      }
+    });
+
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 0, 0);
+
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setPendingOps(pending);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(1, result.getRemainingRedundancy());
+    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+    Assert.assertTrue(result.isMisReplicated());
+    Assert.assertFalse(result.isMisReplicatedAfterPending());
+    Assert.assertFalse(result.isDueToMisReplication());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedOnlyDueToMisReplication() {
+    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+        Mockito.any(),
+        Mockito.anyInt()
+    )).thenAnswer(invocation ->
+        new ContainerPlacementStatusDefault(1, 2, 3));
+
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 0, 0, 0);
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(2, result.getRemainingRedundancy());
+    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+    Assert.assertTrue(result.isMisReplicated());
+    Assert.assertTrue(result.isMisReplicatedAfterPending());
+    Assert.assertTrue(result.isDueToMisReplication());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+  }
+
+  @Test
+  public void testUnderReplicatedOnlyDueToMisReplicationFixByPending() {
+    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
+        Mockito.any(),
+        Mockito.anyInt()
+    )).thenAnswer(invocation -> {
+      List<DatanodeDetails> dns = invocation.getArgument(0);
+      // If the number of DNs is 3 or less, report it as mis-replicated
+      if (dns.size() <= 3) {
+        return new ContainerPlacementStatusDefault(1, 2, 3);
+      } else {
+        return new ContainerPlacementStatusDefault(2, 2, 3);
+      }
+    });
+
+    ContainerInfo container = createContainerInfo(repConfig);
+    Set<ContainerReplica> replicas
+        = createReplicas(container.containerID(), 0, 0, 0);
+
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+    requestBuilder.setContainerReplicas(replicas)
+        .setContainerInfo(container)
+        .setPendingOps(pending);
+    UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+        healthCheck.checkHealth(requestBuilder.build());
+    Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+    Assert.assertEquals(2, result.getRemainingRedundancy());
+    Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+    Assert.assertFalse(result.underReplicatedDueToDecommission());
+    Assert.assertTrue(result.isMisReplicated());
+    Assert.assertFalse(result.isMisReplicatedAfterPending());
+    Assert.assertTrue(result.isDueToMisReplication());
+
+    Assert.assertTrue(healthCheck.handle(requestBuilder.build()));
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    Assert.assertEquals(1, report.getStat(
+        ReplicationManagerReport.HealthState.MIS_REPLICATED));
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index 8afe2a1810..095d137a5a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.RatisContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.replication.RatisContainerReplicaCount;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org