You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/06/27 16:33:47 UTC

[ozone] branch master updated: HDDS-6699. EC: ReplicationManager - collect under and over replicated containers (#3545)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cf5c97aee HDDS-6699. EC: ReplicationManager - collect under and over replicated containers (#3545)
5cf5c97aee is described below

commit 5cf5c97aee67ea4f88658cc02e7fb80e8fd5e60f
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Mon Jun 27 17:33:42 2022 +0100

    HDDS-6699. EC: ReplicationManager - collect under and over replicated containers (#3545)
---
 .../replication/ContainerHealthCheck.java          |    5 +-
 .../container/replication/ContainerReplicaOp.java  |    6 +
 .../replication/ECContainerHealthCheck.java        |   31 +-
 .../replication/LegacyReplicationManager.java      |    1 -
 .../container/replication/ReplicationManager.java  |   86 +-
 .../hdds/scm/server/StorageContainerManager.java   |   10 +-
 .../container/replication/ReplicationTestUtil.java |   99 +
 .../replication/TestECContainerHealthCheck.java    |  108 +-
 ...ager.java => TestLegacyReplicationManager.java} |   12 +-
 .../replication/TestReplicationManager.java        | 2229 ++------------------
 10 files changed, 395 insertions(+), 2192 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
index fed85b8069..5d14d04e45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
@@ -16,8 +16,6 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 
@@ -32,7 +30,6 @@ public interface ContainerHealthCheck {
 
   ContainerHealthResult checkHealth(
       ContainerInfo container, Set<ContainerReplica> replicas,
-      List<Pair<Integer, DatanodeDetails>> indexesPendingAdd,
-      List<Pair<Integer, DatanodeDetails>> indexesPendingDelete,
+      List<ContainerReplicaOp> replicaPendingOps,
       int remainingRedundancyForMaintenance);
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
index 869b0f0cf2..ae3e8f4530 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
@@ -36,6 +36,12 @@ public class ContainerReplicaOp {
   private int replicaIndex;
   private long scheduledEpochMillis;
 
+  public static ContainerReplicaOp create(PendingOpType opType,
+      DatanodeDetails target, int replicaIndex) {
+    return new ContainerReplicaOp(opType, target, replicaIndex,
+        System.currentTimeMillis());
+  }
+
   public ContainerReplicaOp(PendingOpType opType,
       DatanodeDetails target, int replicaIndex, long scheduledTime) {
     this.opType = opType;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
index b879867d85..68761b649d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
@@ -16,16 +16,14 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ECContainerReplicaCount;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Class to determine the health state of an EC Container. Given the container
@@ -49,11 +47,10 @@ public class ECContainerHealthCheck implements ContainerHealthCheck {
   @Override
   public ContainerHealthResult checkHealth(ContainerInfo container,
       Set<ContainerReplica> replicas,
-      List<Pair<Integer, DatanodeDetails>> replicasPendingAdd,
-      List<Pair<Integer, DatanodeDetails>> replicasPendingDelete,
+      List<ContainerReplicaOp> replicaPendingOps,
       int remainingRedundancyForMaintenance) {
     ECContainerReplicaCount replicaCount = getReplicaCountWithPending(container,
-          replicas, replicasPendingAdd, replicasPendingDelete,
+          replicas, replicaPendingOps,
           remainingRedundancyForMaintenance);
 
     ECReplicationConfig repConfig =
@@ -91,17 +88,19 @@ public class ECContainerHealthCheck implements ContainerHealthCheck {
 
   private ECContainerReplicaCount getReplicaCountWithPending(
       ContainerInfo container, Set<ContainerReplica> replicas,
-      List<Pair<Integer, DatanodeDetails>> replicasPendingAdd,
-      List<Pair<Integer, DatanodeDetails>> replicasPendingDelete,
+      List<ContainerReplicaOp> replicaPendingOps,
       int remainingRedundancyForMaintenance) {
-    List<Integer> indexesPendingAdd = replicasPendingAdd.stream()
-        .map(i -> i.getLeft()).collect(Collectors.toList());
-    List<Integer> indexesPendingDelete = replicasPendingDelete.stream()
-        .map(i -> i.getLeft()).collect(Collectors.toList());
-
-    return new ECContainerReplicaCount(container, replicas, indexesPendingAdd,
-        indexesPendingDelete, remainingRedundancyForMaintenance);
-
+    List<Integer> pendingAdd = new ArrayList<>();
+    List<Integer> pendingDelete = new ArrayList<>();
+    for (ContainerReplicaOp op : replicaPendingOps) {
+      if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
+        pendingAdd.add(op.getReplicaIndex());
+      } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
+        pendingDelete.add(op.getReplicaIndex());
+      }
+    }
+    return new ECContainerReplicaCount(container, replicas, pendingAdd,
+        pendingDelete, remainingRedundancyForMaintenance);
   }
 
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 37d01736b9..79e11bab5a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -404,7 +404,6 @@ public class LegacyReplicationManager {
         final Set<ContainerReplica> replicas = containerManager
             .getContainerReplicas(id);
         final LifeCycleState state = container.getState();
-        report.increment(state);
 
         /*
          * We don't take any action if the container is in OPEN state and
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index d0811ce2e2..b4e88cc60d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -30,17 +30,15 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
-import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.util.ExitUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,8 +46,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -57,6 +57,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 
 /**
  * Replication Manager (RM) is the one which is responsible for making sure
@@ -125,6 +126,8 @@ public class ReplicationManager implements SCMService {
   private long lastTimeToBeReadyInMillis = 0;
   private final Clock clock;
   private final ContainerReplicaPendingOps containerReplicaPendingOps;
+  private final ContainerHealthCheck ecContainerHealthCheck;
+  private final EventPublisher eventPublisher;
 
   /**
    * Constructs ReplicationManager instance with the given configuration.
@@ -140,11 +143,9 @@ public class ReplicationManager implements SCMService {
              final PlacementPolicy containerPlacement,
              final EventPublisher eventPublisher,
              final SCMContext scmContext,
-             final SCMServiceManager serviceManager,
              final NodeManager nodeManager,
              final Clock clock,
-             final SCMHAManager scmhaManager,
-             final Table<ContainerID, MoveDataNodePair> moveTable,
+             final LegacyReplicationManager legacyReplicationManager,
              final ContainerReplicaPendingOps replicaPendingOps)
              throws IOException {
     this.containerManager = containerManager;
@@ -154,19 +155,14 @@ public class ReplicationManager implements SCMService {
     this.clock = clock;
     this.containerReport = new ReplicationManagerReport();
     this.metrics = null;
+    this.eventPublisher = eventPublisher;
     this.waitTimeInMillis = conf.getTimeDuration(
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
         TimeUnit.MILLISECONDS);
     this.containerReplicaPendingOps = replicaPendingOps;
-    this.legacyReplicationManager = new LegacyReplicationManager(
-        conf, containerManager, containerPlacement, eventPublisher,
-        scmContext, nodeManager, scmhaManager, clock, moveTable);
-
-    // register ReplicationManager to SCMServiceManager.
-    serviceManager.register(this);
-
-    // start ReplicationManager.
+    this.legacyReplicationManager = legacyReplicationManager;
+    this.ecContainerHealthCheck = new ECContainerHealthCheck();
     start();
   }
 
@@ -233,24 +229,76 @@ public class ReplicationManager implements SCMService {
     final List<ContainerInfo> containers =
         containerManager.getContainers();
     ReplicationManagerReport report = new ReplicationManagerReport();
+    List<ContainerHealthResult.UnderReplicatedHealthResult> underReplicated =
+        new ArrayList<>();
+    List<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
+        new ArrayList<>();
+
     for (ContainerInfo c : containers) {
       if (!shouldRun()) {
         break;
       }
-      switch (c.getReplicationType()) {
-      case EC:
-        break;
-      default:
+      report.increment(c.getState());
+      if (c.getReplicationType() != EC) {
         legacyReplicationManager.processContainer(c, report);
+        continue;
+      }
+      try {
+        processContainer(c, underReplicated, overReplicated, report);
+        // TODO - send any commands contained in the health result
+      } catch (ContainerNotFoundException e) {
+        LOG.error("Container {} not found", c.getContainerID(), e);
       }
     }
     report.setComplete();
+    // TODO - Sort the pending lists by priority and assign to the main queue,
+    //        which is yet to be defined.
     this.containerReport = report;
     LOG.info("Replication Monitor Thread took {} milliseconds for" +
             " processing {} containers.", clock.millis() - start,
         containers.size());
   }
 
+  protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
+      List<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
+      List<ContainerHealthResult.OverReplicatedHealthResult> overRep,
+      ReplicationManagerReport report) throws ContainerNotFoundException {
+    Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
+        containerInfo.containerID());
+    List<ContainerReplicaOp> pendingOps =
+        containerReplicaPendingOps.getPendingOps(containerInfo.containerID());
+    ContainerHealthResult health = ecContainerHealthCheck
+        .checkHealth(containerInfo, replicas, pendingOps, 0);
+    // TODO - should the report have a HEALTHY state, rather than just bad
+    //        states? It would need to be added to legacy RM too.
+    if (health.getHealthState()
+        == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
+      report.incrementAndSample(
+          HealthState.UNDER_REPLICATED, containerInfo.containerID());
+      ContainerHealthResult.UnderReplicatedHealthResult underHealth
+          = ((ContainerHealthResult.UnderReplicatedHealthResult) health);
+      if (underHealth.isUnrecoverable()) {
+        // TODO - do we need a new health state for unrecoverable EC?
+        report.incrementAndSample(
+            HealthState.MISSING, containerInfo.containerID());
+      }
+      if (!underHealth.isSufficientlyReplicatedAfterPending() &&
+          !underHealth.isUnrecoverable()) {
+        underRep.add(underHealth);
+      }
+    } else if (health.getHealthState()
+        == ContainerHealthResult.HealthState.OVER_REPLICATED) {
+      report.incrementAndSample(HealthState.OVER_REPLICATED,
+          containerInfo.containerID());
+      ContainerHealthResult.OverReplicatedHealthResult overHealth
+          = ((ContainerHealthResult.OverReplicatedHealthResult) health);
+      if (!overHealth.isSufficientlyReplicatedAfterPending()) {
+        overRep.add(overHealth);
+      }
+    }
+    return health;
+  }
+
   public ReplicationManagerReport getContainerReport() {
     return containerReport;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7ea21a2627..3da1d74c81 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
 import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
 import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
 import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
@@ -708,19 +709,22 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     if (configurator.getReplicationManager() != null) {
       replicationManager = configurator.getReplicationManager();
     }  else {
+      LegacyReplicationManager legacyRM = new LegacyReplicationManager(
+          conf, containerManager, containerPlacementPolicy, eventQueue,
+          scmContext, scmNodeManager, scmHAManager, clock,
+          getScmMetadataStore().getMoveTable());
       replicationManager = new ReplicationManager(
           conf,
           containerManager,
           containerPlacementPolicy,
           eventQueue,
           scmContext,
-          serviceManager,
           scmNodeManager,
           clock,
-          scmHAManager,
-          getScmMetadataStore().getMoveTable(),
+          legacyRM,
           containerReplicaPendingOps);
     }
+    serviceManager.register(replicationManager);
     if (configurator.getScmSafeModeManager() != null) {
       scmSafeModeManager = configurator.getScmSafeModeManager();
     } else {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
new file mode 100644
index 0000000000..3fe8ddddf3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+
+/**
+ * Helper class to provide common methods used to test ReplicationManager.
+ */
+public final class ReplicationTestUtil {
+
+  private ReplicationTestUtil() {
+  }
+
+  public static Set<ContainerReplica> createReplicas(ContainerID containerID,
+      Pair<HddsProtos.NodeOperationalState, Integer>... nodes) {
+    Set<ContainerReplica> replicas = new HashSet<>();
+    for (Pair<HddsProtos.NodeOperationalState, Integer> p : nodes) {
+      replicas.add(
+          createContainerReplica(containerID, p.getRight(), p.getLeft()));
+    }
+    return replicas;
+  }
+
+  public static Set<ContainerReplica> createReplicas(ContainerID containerID,
+      int... indexes) {
+    Set<ContainerReplica> replicas = new HashSet<>();
+    for (int i : indexes) {
+      replicas.add(createContainerReplica(
+          containerID, i, IN_SERVICE));
+    }
+    return replicas;
+  }
+
+  public static ContainerReplica createContainerReplica(ContainerID containerID,
+      int replicaIndex, HddsProtos.NodeOperationalState opState) {
+    ContainerReplica.ContainerReplicaBuilder builder
+        = ContainerReplica.newBuilder();
+    DatanodeDetails datanodeDetails
+        = MockDatanodeDetails.randomDatanodeDetails();
+    datanodeDetails.setPersistedOpState(opState);
+    builder.setContainerID(containerID);
+    builder.setReplicaIndex(replicaIndex);
+    builder.setKeyCount(123);
+    builder.setBytesUsed(1234);
+    builder.setContainerState(StorageContainerDatanodeProtocolProtos
+        .ContainerReplicaProto.State.CLOSED);
+    builder.setDatanodeDetails(datanodeDetails);
+    builder.setSequenceId(0);
+    builder.setOriginNodeId(datanodeDetails.getUuid());
+    return builder.build();
+  }
+
+  public static ContainerInfo createContainerInfo(ReplicationConfig repConfig) {
+    return createContainerInfo(repConfig, 1, HddsProtos.LifeCycleState.CLOSED);
+  }
+
+  public static ContainerInfo createContainerInfo(
+      ReplicationConfig replicationConfig, long containerID,
+      HddsProtos.LifeCycleState containerState) {
+    ContainerInfo.Builder builder = new ContainerInfo.Builder();
+    builder.setContainerID(containerID);
+    builder.setOwner("Ozone");
+    builder.setPipelineID(PipelineID.randomId());
+    builder.setReplicationConfig(replicationConfig);
+    builder.setState(containerState);
+    return builder.build();
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
index b22999a2f8..d8060dae16 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
@@ -18,18 +18,12 @@ package org.apache.hadoop.hdds.scm.container.replication;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
 import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
 import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,7 +31,6 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -45,6 +38,10 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalSt
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
 
 /**
  * Tests for the ECContainerHealthCheck class.
@@ -66,7 +63,7 @@ public class TestECContainerHealthCheck {
     Set<ContainerReplica> replicas
         = createReplicas(container.containerID(), 1, 2, 3, 4, 5);
     ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
-        Collections.emptyList(), Collections.emptyList(), 2);
+        Collections.emptyList(), 2);
     Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
   }
 
@@ -77,7 +74,7 @@ public class TestECContainerHealthCheck {
         = createReplicas(container.containerID(), 1, 2, 4, 5);
     UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
         healthCheck.checkHealth(container, replicas,
-            Collections.emptyList(), Collections.emptyList(), 2);
+            Collections.emptyList(), 2);
     Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
     Assert.assertEquals(1, result.getRemainingRedundancy());
     Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
@@ -89,11 +86,11 @@ public class TestECContainerHealthCheck {
     ContainerInfo container = createContainerInfo(repConfig);
     Set<ContainerReplica> replicas
         = createReplicas(container.containerID(), 1, 2, 4, 5);
-    List<Pair<Integer, DatanodeDetails>> pending = new ArrayList<>();
-    pending.add(Pair.of(3, MockDatanodeDetails.randomDatanodeDetails()));
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
     UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, pending,
-            Collections.emptyList(), 2);
+        healthCheck.checkHealth(container, replicas, pending, 2);
     Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
     Assert.assertEquals(1, result.getRemainingRedundancy());
     Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
@@ -110,7 +107,7 @@ public class TestECContainerHealthCheck {
 
     UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
         healthCheck.checkHealth(container, replicas, Collections.emptyList(),
-            Collections.emptyList(), 2);
+            2);
     Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
     Assert.assertEquals(2, result.getRemainingRedundancy());
     Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
@@ -124,12 +121,12 @@ public class TestECContainerHealthCheck {
         Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
         Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
         Pair.of(IN_SERVICE, 4), Pair.of(DECOMMISSIONED, 5));
-    List<Pair<Integer, DatanodeDetails>> pending = new ArrayList<>();
-    pending.add(Pair.of(5, MockDatanodeDetails.randomDatanodeDetails()));
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 5));
 
     UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, pending,
-            Collections.emptyList(), 2);
+        healthCheck.checkHealth(container, replicas, pending, 2);
     Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
     Assert.assertEquals(2, result.getRemainingRedundancy());
     Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
@@ -142,12 +139,12 @@ public class TestECContainerHealthCheck {
     Set<ContainerReplica> replicas = createReplicas(container.containerID(),
         Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
         Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONED, 5));
-    List<Pair<Integer, DatanodeDetails>> pending = new ArrayList<>();
-    pending.add(Pair.of(3, MockDatanodeDetails.randomDatanodeDetails()));
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
 
     UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, pending,
-            Collections.emptyList(), 2);
+        healthCheck.checkHealth(container, replicas, pending, 2);
     Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
     Assert.assertEquals(1, result.getRemainingRedundancy());
     Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
@@ -161,7 +158,7 @@ public class TestECContainerHealthCheck {
         Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
 
     UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, Collections.emptyList(),
+        healthCheck.checkHealth(container, replicas,
             Collections.emptyList(), 2);
     Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
     Assert.assertEquals(-1, result.getRemainingRedundancy());
@@ -179,13 +176,14 @@ public class TestECContainerHealthCheck {
         Pair.of(IN_SERVICE, 5),
         Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
 
-    List<Pair<Integer, DatanodeDetails>> pending = new ArrayList<>();
-    pending.add(Pair.of(1, MockDatanodeDetails.randomDatanodeDetails()));
-    pending.add(Pair.of(2, MockDatanodeDetails.randomDatanodeDetails()));
+    List<ContainerReplicaOp> pending = new ArrayList<>();
+    pending.add(ContainerReplicaOp.create(
+        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 1));
+    pending.add(ContainerReplicaOp.create(
+        DELETE, MockDatanodeDetails.randomDatanodeDetails(), 2));
 
     OverReplicatedHealthResult result = (OverReplicatedHealthResult)
-        healthCheck.checkHealth(container, replicas, Collections.emptyList(),
-            pending, 2);
+        healthCheck.checkHealth(container, replicas, pending, 2);
     Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
     Assert.assertEquals(2, result.getExcessRedundancy());
     Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
@@ -201,7 +199,7 @@ public class TestECContainerHealthCheck {
         Pair.of(IN_MAINTENANCE, 1), Pair.of(IN_MAINTENANCE, 2));
 
     ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
-        Collections.emptyList(), Collections.emptyList(), 2);
+        Collections.emptyList(), 2);
     Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
   }
 
@@ -214,60 +212,10 @@ public class TestECContainerHealthCheck {
         Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
 
     ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
-        Collections.emptyList(), Collections.emptyList(), 2);
+        Collections.emptyList(), 2);
     Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
     Assert.assertEquals(1,
         ((UnderReplicatedHealthResult)result).getRemainingRedundancy());
   }
 
-
-  private Set<ContainerReplica> createReplicas(ContainerID containerID,
-      Pair<HddsProtos.NodeOperationalState, Integer>... nodes) {
-    Set<ContainerReplica> replicas = new HashSet<>();
-    for (Pair<HddsProtos.NodeOperationalState, Integer> p : nodes) {
-      replicas.add(
-          createContainerReplica(containerID, p.getRight(), p.getLeft()));
-    }
-    return replicas;
-  }
-
-  private Set<ContainerReplica> createReplicas(ContainerID containerID,
-      int... indexes) {
-    Set<ContainerReplica> replicas = new HashSet<>();
-    for (int i : indexes) {
-      replicas.add(createContainerReplica(
-          containerID, i, IN_SERVICE));
-    }
-    return replicas;
-  }
-
-  private ContainerReplica createContainerReplica(ContainerID containerID,
-      int replicaIndex, HddsProtos.NodeOperationalState opState) {
-    ContainerReplica.ContainerReplicaBuilder builder
-        = ContainerReplica.newBuilder();
-    DatanodeDetails datanodeDetails
-        = MockDatanodeDetails.randomDatanodeDetails();
-    datanodeDetails.setPersistedOpState(opState);
-    builder.setContainerID(containerID);
-    builder.setReplicaIndex(replicaIndex);
-    builder.setKeyCount(123);
-    builder.setBytesUsed(1234);
-    builder.setContainerState(StorageContainerDatanodeProtocolProtos
-        .ContainerReplicaProto.State.CLOSED);
-    builder.setDatanodeDetails(datanodeDetails);
-    builder.setSequenceId(0);
-    builder.setOriginNodeId(datanodeDetails.getUuid());
-    return builder.build();
-  }
-
-  private ContainerInfo createContainerInfo(
-      ReplicationConfig replicationConfig) {
-    ContainerInfo.Builder builder = new ContainerInfo.Builder();
-    builder.setContainerID(1);
-    builder.setOwner("Ozone");
-    builder.setPipelineID(PipelineID.randomId());
-    builder.setReplicationConfig(replicationConfig);
-    builder.setState(HddsProtos.LifeCycleState.CLOSED);
-    return builder.build();
-  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
similarity index 99%
copy from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
copy to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 38ece4992d..66863ce553 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -113,7 +113,7 @@ import static org.mockito.Mockito.when;
 /**
  * Test cases to verify the functionality of ReplicationManager.
  */
-public class TestReplicationManager {
+public class TestLegacyReplicationManager {
 
   private ReplicationManager replicationManager;
   private ContainerStateManager containerStateManager;
@@ -256,19 +256,23 @@ public class TestReplicationManager {
     dbStore = DBStoreBuilder.createDBStore(
       config, new SCMDBDefinition());
 
+    LegacyReplicationManager legacyRM = new LegacyReplicationManager(
+        config, containerManager, containerPlacementPolicy, eventQueue,
+        SCMContext.emptyContext(), nodeManager, scmHAManager, clock,
+        SCMDBDefinition.MOVE.getTable(dbStore));
+
     replicationManager = new ReplicationManager(
         config,
         containerManager,
         containerPlacementPolicy,
         eventQueue,
         SCMContext.emptyContext(),
-        serviceManager,
         nodeManager,
         clock,
-        scmHAManager,
-        SCMDBDefinition.MOVE.getTable(dbStore),
+        legacyRM,
         containerReplicaPendingOps);
 
+    serviceManager.register(replicationManager);
     serviceManager.notifyStatusChanged();
     scmLogs.clearOutput();
     Thread.sleep(100L);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 38ece4992d..fdce87f8ba 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -15,66 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hdds.scm.container.replication;
 
-import com.google.common.primitives.Longs;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManagerImpl;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
-import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
-import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
-    .ReplicationManagerConfiguration;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
-import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
-import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
-import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.utils.db.DBStore;
-import org.apache.hadoop.hdds.scm.node.NodeStatus;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
-import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.TestClock;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -82,2076 +45,212 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
-import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.createDatanodeDetails;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
-import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_NUM_KEYS_DEFAULT;
-import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_USED_BYTES_DEFAULT;
-import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
-import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
-import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
-import static org.mockito.Mockito.when;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
 
 /**
- * Test cases to verify the functionality of ReplicationManager.
+ * Tests for the ReplicationManager.
  */
 public class TestReplicationManager {
 
+  private OzoneConfiguration configuration;
   private ReplicationManager replicationManager;
-  private ContainerStateManager containerStateManager;
-  private PlacementPolicy containerPlacementPolicy;
-  private EventQueue eventQueue;
-  private DatanodeCommandHandler datanodeCommandHandler;
-  private SimpleMockNodeManager nodeManager;
+  private LegacyReplicationManager legacyReplicationManager;
   private ContainerManager containerManager;
-  private GenericTestUtils.LogCapturer scmLogs;
-  private SCMServiceManager serviceManager;
+  private PlacementPolicy placementPolicy;
+  private EventPublisher eventPublisher;
+  private SCMContext scmContext;
+  private NodeManager nodeManager;
   private TestClock clock;
-  private File testDir;
-  private DBStore dbStore;
-  private PipelineManager pipelineManager;
-  private SCMHAManager scmhaManager;
   private ContainerReplicaPendingOps containerReplicaPendingOps;
 
-  int getInflightCount(InflightType type) {
-    return replicationManager.getLegacyReplicationManager()
-        .getInflightCount(type);
-  }
-
-  @BeforeEach
-  public void setup()
-      throws IOException, InterruptedException,
-      NodeNotFoundException, InvalidStateTransitionException {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(
-        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
-        0, TimeUnit.SECONDS);
+  private Map<ContainerID, Set<ContainerReplica>> containerReplicaMap;
+  private ReplicationConfig repConfig;
+  private ReplicationManagerReport repReport;
+  private List<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
+  private List<ContainerHealthResult.OverReplicatedHealthResult> overRep;
 
-    scmLogs = GenericTestUtils.LogCapturer.
-        captureLogs(LegacyReplicationManager.LOG);
+  @Before
+  public void setup() throws IOException {
+    configuration = new OzoneConfiguration();
+    configuration.set(HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "0s");
     containerManager = Mockito.mock(ContainerManager.class);
-    nodeManager = new SimpleMockNodeManager();
-    eventQueue = new EventQueue();
-    scmhaManager = SCMHAManagerStub.getInstance(true);
-    testDir = GenericTestUtils.getTestDir(
-        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
-    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
-    dbStore = DBStoreBuilder.createDBStore(
-        conf, new SCMDBDefinition());
-    pipelineManager = Mockito.mock(PipelineManager.class);
-    when(pipelineManager.containsPipeline(Mockito.any(PipelineID.class)))
-        .thenReturn(true);
-    containerStateManager = ContainerStateManagerImpl.newBuilder()
-        .setConfiguration(conf)
-        .setPipelineManager(pipelineManager)
-        .setRatisServer(scmhaManager.getRatisServer())
-        .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
-        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
-        .build();
-    serviceManager = new SCMServiceManager();
-
-    datanodeCommandHandler = new DatanodeCommandHandler();
-    eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);
-
-    Mockito.when(containerManager.getContainers())
-        .thenAnswer(invocation -> {
-          Set<ContainerID> ids = containerStateManager.getContainerIDs();
-          List<ContainerInfo> containers = new ArrayList<>();
-          for (ContainerID id : ids) {
-            containers.add(containerStateManager.getContainer(
-                id));
-          }
-          return containers;
-        });
-
-    Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
-        .thenAnswer(invocation -> containerStateManager
-            .getContainer(((ContainerID)invocation
-                .getArguments()[0])));
-
-    Mockito.when(containerManager.getContainerReplicas(
-        Mockito.any(ContainerID.class)))
-        .thenAnswer(invocation -> containerStateManager
-            .getContainerReplicas(((ContainerID)invocation
-                .getArguments()[0])));
-
-    containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
-
-    Mockito.when(containerPlacementPolicy.chooseDatanodes(
-        Mockito.any(), Mockito.any(), Mockito.anyInt(),
-            Mockito.anyLong(), Mockito.anyLong()))
-        .thenAnswer(invocation -> {
-          int count = (int) invocation.getArguments()[2];
-          return IntStream.range(0, count)
-              .mapToObj(i -> randomDatanodeDetails())
-              .collect(Collectors.toList());
-        });
-
-    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
-        Mockito.any(),
-        Mockito.anyInt()
-        )).thenAnswer(invocation ->
-        new ContainerPlacementStatusDefault(2, 2, 3));
-    clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
-    containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock);
-    createReplicationManager(new ReplicationManagerConfiguration());
-  }
-
-  void createReplicationManager(int replicationLimit, int deletionLimit)
-      throws Exception {
-    replicationManager.stop();
-    dbStore.close();
-    final LegacyReplicationManager.ReplicationManagerConfiguration conf
-        = new LegacyReplicationManager.ReplicationManagerConfiguration();
-    conf.setContainerInflightReplicationLimit(replicationLimit);
-    conf.setContainerInflightDeletionLimit(deletionLimit);
-    createReplicationManager(conf);
-  }
-
-  void createReplicationManager(
-      LegacyReplicationManager.ReplicationManagerConfiguration conf)
-      throws Exception {
-    createReplicationManager(null, conf);
-  }
-
-  private void createReplicationManager(ReplicationManagerConfiguration rmConf)
-      throws InterruptedException, IOException {
-    createReplicationManager(rmConf, null);
-  }
-
-  void createReplicationManager(ReplicationManagerConfiguration rmConf,
-      LegacyReplicationManager.ReplicationManagerConfiguration lrmConf)
-      throws InterruptedException, IOException {
-    OzoneConfiguration config = new OzoneConfiguration();
-    testDir = GenericTestUtils
-      .getTestDir(TestContainerManagerImpl.class.getSimpleName());
-    config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
-        testDir.getAbsolutePath());
-    config.setTimeDuration(
-        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
-        0, TimeUnit.SECONDS);
-    Optional.ofNullable(rmConf).ifPresent(config::setFromObject);
-    Optional.ofNullable(lrmConf).ifPresent(config::setFromObject);
-
-    SCMHAManager scmHAManager = SCMHAManagerStub
-        .getInstance(true, new SCMDBTransactionBufferImpl());
-    dbStore = DBStoreBuilder.createDBStore(
-      config, new SCMDBDefinition());
+    placementPolicy = Mockito.mock(PlacementPolicy.class);
+    eventPublisher = Mockito.mock(EventPublisher.class);
+    scmContext = Mockito.mock(SCMContext.class);
+    nodeManager = Mockito.mock(NodeManager.class);
+    legacyReplicationManager = Mockito.mock(LegacyReplicationManager.class);
+    clock = new TestClock(Instant.now(), ZoneId.systemDefault());
+    containerReplicaPendingOps =
+        new ContainerReplicaPendingOps(configuration, clock);
+
+    Mockito.when(containerManager
+        .getContainerReplicas(Mockito.any(ContainerID.class))).thenAnswer(
+          invocation -> {
+            ContainerID cid = invocation.getArgument(0);
+            return containerReplicaMap.get(cid);
+          });
 
     replicationManager = new ReplicationManager(
-        config,
+        configuration,
         containerManager,
-        containerPlacementPolicy,
-        eventQueue,
-        SCMContext.emptyContext(),
-        serviceManager,
+        placementPolicy,
+        eventPublisher,
+        scmContext,
         nodeManager,
         clock,
-        scmHAManager,
-        SCMDBDefinition.MOVE.getTable(dbStore),
+        legacyReplicationManager,
         containerReplicaPendingOps);
-
-    serviceManager.notifyStatusChanged();
-    scmLogs.clearOutput();
-    Thread.sleep(100L);
+    containerReplicaMap = new HashMap<>();
+    repConfig = new ECReplicationConfig(3, 2);
+    repReport = new ReplicationManagerReport();
+    underRep = new ArrayList<>();
+    overRep = new ArrayList<>();
   }
 
-  @AfterEach
-  public void tearDown() throws Exception {
-    containerStateManager.close();
-    if (dbStore != null) {
-      dbStore.close();
-    }
-
-    FileUtil.fullyDelete(testDir);
-  }
-
-  /**
-   * Checks if restarting of replication manager works.
-   */
   @Test
-  public void testReplicationManagerRestart() throws InterruptedException {
-    Assertions.assertTrue(replicationManager.isRunning());
-    replicationManager.stop();
-    // Stop is a non-blocking call, it might take sometime for the
-    // ReplicationManager to shutdown
-    Thread.sleep(500);
-    Assertions.assertFalse(replicationManager.isRunning());
-    replicationManager.start();
-    Assertions.assertTrue(replicationManager.isRunning());
-  }
+  public void testHealthyContainer() throws ContainerNotFoundException {
+    ContainerInfo container = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(container, 1, 2, 3, 4, 5);
 
-  /**
-   * Open containers are not handled by ReplicationManager.
-   * This test-case makes sure that ReplicationManages doesn't take
-   * any action on OPEN containers.
-   */
-  @Test
-  public void testOpenContainer() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.OPEN);
-    containerStateManager.addContainer(container.getProtobuf());
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.OPEN));
-    Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
+    ContainerHealthResult result = replicationManager.processContainer(
+        container, underRep, overRep, repReport);
+    Assert.assertEquals(ContainerHealthResult.HealthState.HEALTHY,
+        result.getHealthState());
+    Assert.assertEquals(0, underRep.size());
+    Assert.assertEquals(0, overRep.size());
   }
 
-  /**
-   * If the container is in CLOSING state we resend close container command
-   * to all the datanodes.
-   */
   @Test
-  public void testClosingContainer() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
-    final ContainerID id = container.containerID();
-
-    containerStateManager.addContainer(container.getProtobuf());
-
-    // Two replicas in CLOSING state
-    final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSING,
-        randomDatanodeDetails(),
-        randomDatanodeDetails());
-
-    // One replica in OPEN state
-    final DatanodeDetails datanode = randomDatanodeDetails();
-    replicas.addAll(getReplicas(id, State.OPEN, datanode));
-
-    for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
-    }
-
-    final int currentCloseCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+  public void testUnderReplicatedContainer() throws ContainerNotFoundException {
+    ContainerInfo container = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(container, 1, 2, 3, 4);
 
-    // Update the OPEN to CLOSING
-    for (ContainerReplica replica : getReplicas(id, State.CLOSING, datanode)) {
-      containerStateManager.updateContainerReplica(id, replica);
-    }
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.CLOSING));
-  }
-
-
-  /**
-   * The container is QUASI_CLOSED but two of the replica is still in
-   * open state. ReplicationManager should resend close command to those
-   * datanodes.
-   */
-  @Test
-  public void testQuasiClosedContainerWithTwoOpenReplica() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
-    final ContainerID id = container.containerID();
-    final UUID originNodeId = UUID.randomUUID();
-    final ContainerReplica replicaOne = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaTwo = getReplicas(
-        id, State.OPEN, 1000L, originNodeId, randomDatanodeDetails());
-    final DatanodeDetails datanodeDetails = randomDatanodeDetails();
-    final ContainerReplica replicaThree = getReplicas(
-        id, State.OPEN, 1000L, datanodeDetails.getUuid(), datanodeDetails);
-
-    containerStateManager.addContainer(container.getProtobuf());
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(
-        id, replicaThree);
-
-    final int currentCloseCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
-    // Two of the replicas are in OPEN state
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.closeContainerCommand,
-        replicaTwo.getDatanodeDetails()));
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.closeContainerCommand,
-        replicaThree.getDatanodeDetails()));
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-  }
-
-  /**
-   * When the container is in QUASI_CLOSED state and all the replicas are
-   * also in QUASI_CLOSED state and doesn't have a quorum to force close
-   * the container, ReplicationManager will not do anything.
-   */
-  @Test
-  public void testHealthyQuasiClosedContainer() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
-    final ContainerID id = container.containerID();
-    final UUID originNodeId = UUID.randomUUID();
-    final ContainerReplica replicaOne = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaTwo = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaThree = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
-    containerStateManager.addContainer(container.getProtobuf());
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(
-        id, replicaThree);
-
-    // All the QUASI_CLOSED replicas have same originNodeId, so the
-    // container will not be closed. ReplicationManager should take no action.
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-  }
-
-  /**
-   * When a container is QUASI_CLOSED and we don't have quorum to force close
-   * the container, the container should have all the replicas in QUASI_CLOSED
-   * state, else ReplicationManager will take action.
-   *
-   * In this test case we make one of the replica unhealthy, replication manager
-   * will send delete container command to the datanode which has the unhealthy
-   * replica.
-   */
-  @Test
-  public void testQuasiClosedContainerWithUnhealthyReplica()
-      throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
-    container.setUsedBytes(100);
-    final ContainerID id = container.containerID();
-    final UUID originNodeId = UUID.randomUUID();
-    final ContainerReplica replicaOne = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaTwo = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaThree = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
-    containerStateManager.addContainer(container.getProtobuf());
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(
-        id, replicaThree);
-
-    final int currentDeleteCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-    final int currentReplicateCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
-
-    // All the QUASI_CLOSED replicas have same originNodeId, so the
-    // container will not be closed. ReplicationManager should take no action.
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
-
-    // Make the first replica unhealthy
-    final ContainerReplica unhealthyReplica = getReplicas(
-        id, State.UNHEALTHY, 1000L, originNodeId,
-        replicaOne.getDatanodeDetails());
-    containerStateManager.updateContainerReplica(
-        id, unhealthyReplica);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentDeleteCommandCount + 1,
-        datanodeCommandHandler
-            .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.deleteContainerCommand,
-        replicaOne.getDatanodeDetails()));
-    Assertions.assertEquals(currentDeleteCommandCount + 1,
-        replicationManager.getMetrics().getNumDeletionCmdsSent());
-
-    // Now we will delete the unhealthy replica from in-memory.
-    containerStateManager.removeContainerReplica(id, replicaOne);
-
-    final long currentBytesToReplicate = replicationManager.getMetrics()
-        .getNumReplicationBytesTotal();
-
-    // The container is under replicated as unhealthy replica is removed
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    // We should get replicate command
-    Assertions.assertEquals(currentReplicateCommandCount + 1,
-        datanodeCommandHandler.getInvocationCount(
-            SCMCommandProto.Type.replicateContainerCommand));
-    Assertions.assertEquals(currentReplicateCommandCount + 1,
-        replicationManager.getMetrics().getNumReplicationCmdsSent());
-    Assertions.assertEquals(currentBytesToReplicate + 100L,
-        replicationManager.getMetrics().getNumReplicationBytesTotal());
-    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getInflightReplication());
-
-    // We should have one under replicated and one quasi_closed_stuck
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-
-    // Now we add the missing replica back
-    DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
-        .getFirstDatanode(InflightType.REPLICATION, id);
-    final ContainerReplica replicatedReplicaOne = getReplicas(
-        id, State.CLOSED, 1000L, originNodeId, targetDn);
-    containerStateManager.updateContainerReplica(
-        id, replicatedReplicaOne);
-
-    final long currentReplicationCommandCompleted = replicationManager
-        .getMetrics().getNumReplicationCmdsCompleted();
-    final long currentBytesCompleted = replicationManager.getMetrics()
-        .getNumReplicationBytesCompleted();
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
-    Assertions.assertEquals(0, replicationManager.getMetrics()
-        .getInflightReplication());
-    Assertions.assertEquals(currentReplicationCommandCompleted + 1,
-        replicationManager.getMetrics().getNumReplicationCmdsCompleted());
-    Assertions.assertEquals(currentBytesCompleted + 100L,
-        replicationManager.getMetrics().getNumReplicationBytesCompleted());
-
-    report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(0, report.getStat(
+    ContainerHealthResult result = replicationManager.processContainer(
+        container, underRep, overRep, repReport);
+    Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
+        result.getHealthState());
+    Assert.assertEquals(1, underRep.size());
+    Assert.assertEquals(0, overRep.size());
+    Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
   }
 
-  /**
-   * When a QUASI_CLOSED container is over replicated, ReplicationManager
-   * deletes the excess replicas.
-   */
-  @Test
-  public void testOverReplicatedQuasiClosedContainer() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
-    container.setUsedBytes(101);
-    final ContainerID id = container.containerID();
-    final UUID originNodeId = UUID.randomUUID();
-    final ContainerReplica replicaOne = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaTwo = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaThree = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaFour = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
-    containerStateManager.addContainer(container.getProtobuf());
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(
-        id, replicaThree);
-    containerStateManager.updateContainerReplica(id, replicaFour);
-
-    final int currentDeleteCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentDeleteCommandCount + 1,
-        datanodeCommandHandler
-            .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
-    Assertions.assertEquals(currentDeleteCommandCount + 1,
-        replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getInflightDeletion());
-
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.OVER_REPLICATED));
-
-    // Now we remove the replica according to inflight
-    DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
-        .getFirstDatanode(InflightType.DELETION, id);
-    if (targetDn.equals(replicaOne.getDatanodeDetails())) {
-      containerStateManager.removeContainerReplica(
-          id, replicaOne);
-    } else if (targetDn.equals(replicaTwo.getDatanodeDetails())) {
-      containerStateManager.removeContainerReplica(
-          id, replicaTwo);
-    } else if (targetDn.equals(replicaThree.getDatanodeDetails())) {
-      containerStateManager.removeContainerReplica(
-          id, replicaThree);
-    } else if (targetDn.equals(replicaFour.getDatanodeDetails())) {
-      containerStateManager.removeContainerReplica(
-          id, replicaFour);
-    }
-
-    final long currentDeleteCommandCompleted = replicationManager.getMetrics()
-        .getNumDeletionCmdsCompleted();
-    final long deleteBytesCompleted =
-        replicationManager.getMetrics().getNumDeletionBytesCompleted();
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(0, replicationManager.getMetrics()
-        .getInflightDeletion());
-    Assertions.assertEquals(currentDeleteCommandCompleted + 1,
-        replicationManager.getMetrics().getNumDeletionCmdsCompleted());
-    Assertions.assertEquals(deleteBytesCompleted + 101,
-        replicationManager.getMetrics().getNumDeletionBytesCompleted());
-
-    report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.OVER_REPLICATED));
-  }
-
-  /**
-   * When a QUASI_CLOSED container is over replicated, ReplicationManager
-   * deletes the excess replicas. While choosing the replica for deletion
-   * ReplicationManager should prioritize unhealthy replica over QUASI_CLOSED
-   * replica.
-   */
-  @Test
-  public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
-      throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
-    final ContainerID id = container.containerID();
-    final UUID originNodeId = UUID.randomUUID();
-    final ContainerReplica replicaOne = getReplicas(
-        id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaTwo = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaThree = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaFour = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
-    containerStateManager.addContainer(container.getProtobuf());
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(
-        id, replicaThree);
-    containerStateManager.updateContainerReplica(id, replicaFour);
-
-    final int currentDeleteCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentDeleteCommandCount + 1,
-        datanodeCommandHandler
-            .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.deleteContainerCommand,
-        replicaOne.getDatanodeDetails()));
-    Assertions.assertEquals(currentDeleteCommandCount + 1,
-        replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getInflightDeletion());
-
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.OVER_REPLICATED));
-
-    final long currentDeleteCommandCompleted = replicationManager.getMetrics()
-        .getNumDeletionCmdsCompleted();
-    // Now we remove the replica to simulate deletion complete
-    containerStateManager.removeContainerReplica(id, replicaOne);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertEquals(currentDeleteCommandCompleted + 1,
-        replicationManager.getMetrics().getNumDeletionCmdsCompleted());
-    Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(0, replicationManager.getMetrics()
-        .getInflightDeletion());
-
-    report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.OVER_REPLICATED));
-  }
-
-  /**
-   * ReplicationManager should replicate an QUASI_CLOSED replica if it is
-   * under replicated.
-   */
   @Test
-  public void testUnderReplicatedQuasiClosedContainer() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
-    container.setUsedBytes(100);
-    final ContainerID id = container.containerID();
-    final UUID originNodeId = UUID.randomUUID();
-    final ContainerReplica replicaOne = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaTwo = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
-    containerStateManager.addContainer(container.getProtobuf());
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-
-    final int currentReplicateCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
-    final long currentBytesToReplicate = replicationManager.getMetrics()
-        .getNumReplicationBytesTotal();
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentReplicateCommandCount + 1,
-        datanodeCommandHandler.getInvocationCount(
-            SCMCommandProto.Type.replicateContainerCommand));
-    Assertions.assertEquals(currentReplicateCommandCount + 1,
-        replicationManager.getMetrics().getNumReplicationCmdsSent());
-    Assertions.assertEquals(currentBytesToReplicate + 100,
-        replicationManager.getMetrics().getNumReplicationBytesTotal());
-    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getInflightReplication());
-
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-
-    final long currentReplicateCommandCompleted = replicationManager
-        .getMetrics().getNumReplicationCmdsCompleted();
-    final long currentReplicateBytesCompleted = replicationManager
-        .getMetrics().getNumReplicationBytesCompleted();
-
-    // Now we add the replicated new replica
-    DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
-        .getFirstDatanode(InflightType.REPLICATION, id);
-    final ContainerReplica replicatedReplicaThree = getReplicas(
-        id, State.CLOSED, 1000L, originNodeId, targetDn);
-    containerStateManager.updateContainerReplica(
-        id, replicatedReplicaThree);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertEquals(currentReplicateCommandCompleted + 1,
-        replicationManager.getMetrics().getNumReplicationCmdsCompleted());
-    Assertions.assertEquals(currentReplicateBytesCompleted + 100,
-        replicationManager.getMetrics().getNumReplicationBytesCompleted());
-    Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
-    Assertions.assertEquals(0, replicationManager.getMetrics()
-        .getInflightReplication());
-
-    report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(0, report.getStat(
+  public void testUnderReplicatedContainerFixedByPending()
+      throws ContainerNotFoundException {
+    ContainerInfo container = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(container, 1, 2, 3, 4);
+    containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
+        MockDatanodeDetails.randomDatanodeDetails(), 5);
+
+    ContainerHealthResult result = replicationManager.processContainer(
+        container, underRep, overRep, repReport);
+    Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
+        result.getHealthState());
+    // As the pending replication fixes the under replication, nothing is added
+    // to the under replication list.
+    Assert.assertEquals(0, underRep.size());
+    Assert.assertEquals(0, overRep.size());
+    Assert.assertTrue(((ContainerHealthResult.UnderReplicatedHealthResult)
+        result).isSufficientlyReplicatedAfterPending());
+    // As the pending replication has not completed yet, the container is
+    // still under replicated, so it is still marked as under-replicated in
+    // the report.
+    Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
   }
 
-  /**
-   * When a QUASI_CLOSED container is under replicated, ReplicationManager
-   * should re-replicate it. If there are any unhealthy replica, it has to
-   * be deleted.
-   *
-   * In this test case, the container is QUASI_CLOSED and is under replicated
-   * and also has an unhealthy replica.
-   *
-   * In the first iteration of ReplicationManager, it should re-replicate
-   * the container so that it has enough replicas.
-   *
-   * In the second iteration, ReplicationManager should delete the unhealthy
-   * replica.
-   *
-   * In the third iteration, ReplicationManager will re-replicate as the
-   * container has again become under replicated after the unhealthy
-   * replica has been deleted.
-   *
-   */
   @Test
-  public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
-      throws IOException, InterruptedException,
-      TimeoutException {
-    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
-    container.setUsedBytes(99);
-    final ContainerID id = container.containerID();
-    final UUID originNodeId = UUID.randomUUID();
-    final ContainerReplica replicaOne = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaTwo = getReplicas(
-        id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
-
-    containerStateManager.addContainer(container.getProtobuf());
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-
-    final int currentReplicateCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
-    final int currentDeleteCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-    final long currentBytesToDelete = replicationManager.getMetrics()
-        .getNumDeletionBytesTotal();
-
-    replicationManager.processAll();
-    GenericTestUtils.waitFor(
-        () -> (currentReplicateCommandCount + 1) == datanodeCommandHandler
-            .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand),
-        50, 5000);
-
-    Optional<CommandForDatanode> replicateCommand = datanodeCommandHandler
-        .getReceivedCommands().stream()
-        .filter(c -> c.getCommand().getType()
-            .equals(SCMCommandProto.Type.replicateContainerCommand))
-        .findFirst();
-
-    Assertions.assertTrue(replicateCommand.isPresent());
-
-    DatanodeDetails newNode = createDatanodeDetails(
-        replicateCommand.get().getDatanodeId());
-    ContainerReplica newReplica = getReplicas(
-        id, State.QUASI_CLOSED, 1000L, originNodeId, newNode);
-    containerStateManager.updateContainerReplica(id, newReplica);
-
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-    Assertions.assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.UNHEALTHY));
-
-    /*
-     * We have report the replica to SCM, in the next ReplicationManager
-     * iteration it should delete the unhealthy replica.
-     */
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentDeleteCommandCount + 1,
-        datanodeCommandHandler
-            .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
-    // ReplicaTwo should be deleted, that is the unhealthy one
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.deleteContainerCommand,
-        replicaTwo.getDatanodeDetails()));
-    Assertions.assertEquals(currentDeleteCommandCount + 1,
-        replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(currentBytesToDelete + 99,
-        replicationManager.getMetrics().getNumDeletionBytesTotal());
-    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getInflightDeletion());
-
-    containerStateManager.removeContainerReplica(id, replicaTwo);
-
-    final long currentDeleteCommandCompleted = replicationManager.getMetrics()
-        .getNumDeletionCmdsCompleted();
-
-    report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.UNHEALTHY));
-    /*
-     * We have now removed unhealthy replica, next iteration of
-     * ReplicationManager should re-replicate the container as it
-     * is under replicated now
-     */
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(0, replicationManager.getMetrics()
-        .getInflightDeletion());
-    Assertions.assertEquals(currentDeleteCommandCompleted + 1,
-        replicationManager.getMetrics().getNumDeletionCmdsCompleted());
-
-    Assertions.assertEquals(currentReplicateCommandCount + 2,
-        datanodeCommandHandler.getInvocationCount(
-            SCMCommandProto.Type.replicateContainerCommand));
-    Assertions.assertEquals(currentReplicateCommandCount + 2,
-        replicationManager.getMetrics().getNumReplicationCmdsSent());
-    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getInflightReplication());
-
-    report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-    Assertions.assertEquals(1, report.getStat(
+  public void testUnderReplicatedAndUnrecoverable()
+      throws ContainerNotFoundException {
+    ContainerInfo container = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(container, 1, 2);
+
+    ContainerHealthResult result = replicationManager.processContainer(
+        container, underRep, overRep, repReport);
+    Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
+        result.getHealthState());
+    // If it is unrecoverable, there is no point in putting it into the under
+    // replication list. It will be checked again on the next RM run.
+    Assert.assertEquals(0, underRep.size());
+    Assert.assertEquals(0, overRep.size());
+    Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-    Assertions.assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.UNHEALTHY));
-  }
-
-
-  /**
-   * When a container is QUASI_CLOSED and it has >50% of its replica
-   * in QUASI_CLOSED state with unique origin node id,
-   * ReplicationManager should force close the replica(s) with
-   * highest BCSID.
-   */
-  @Test
-  public void testQuasiClosedToClosed() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
-    final ContainerID id = container.containerID();
-    final Set<ContainerReplica> replicas = getReplicas(id, State.QUASI_CLOSED,
-        randomDatanodeDetails(),
-        randomDatanodeDetails(),
-        randomDatanodeDetails());
-    containerStateManager.addContainer(container.getProtobuf());
-    for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
-    }
-
-    final int currentCloseCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    // All the replicas have same BCSID, so all of them will be closed.
-    Assertions.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
-
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
-    Assertions.assertEquals(0, report.getStat(
-        ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
-  }
-
-
-  /**
-   * ReplicationManager should not take any action if the container is
-   * CLOSED and healthy.
-   */
-  @Test
-  public void testHealthyClosedContainer() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
-    final ContainerID id = container.containerID();
-    final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
-        randomDatanodeDetails(),
-        randomDatanodeDetails(),
-        randomDatanodeDetails());
-
-    containerStateManager.addContainer(container.getProtobuf());
-    for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
-    }
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
-
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.CLOSED));
-    for (ReplicationManagerReport.HealthState s :
-        ReplicationManagerReport.HealthState.values()) {
-      Assertions.assertEquals(0, report.getStat(s));
-    }
-  }
-
-  /**
-   * ReplicationManager should close the unhealthy OPEN container.
-   */
-  @Test
-  public void testUnhealthyOpenContainer() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.OPEN);
-    final ContainerID id = container.containerID();
-    final Set<ContainerReplica> replicas = getReplicas(id, State.OPEN,
-        randomDatanodeDetails(),
-        randomDatanodeDetails());
-    replicas.addAll(getReplicas(id, State.UNHEALTHY, randomDatanodeDetails()));
-
-    containerStateManager.addContainer(container.getProtobuf());
-    for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
-    }
-
-    final CloseContainerEventHandler closeContainerHandler =
-        Mockito.mock(CloseContainerEventHandler.class);
-    eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Mockito.verify(closeContainerHandler, Mockito.times(1))
-        .onMessage(id, eventQueue);
-
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.OPEN));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.OPEN_UNHEALTHY));
-  }
-
-  /**
-   * ReplicationManager should skip send close command to unhealthy replica.
-   */
-  @Test
-  public void testCloseUnhealthyReplica() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
-    final ContainerID id = container.containerID();
-    final Set<ContainerReplica> replicas = getReplicas(id, State.UNHEALTHY,
-        randomDatanodeDetails());
-    replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
-    replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
-
-    containerStateManager.addContainer(container.getProtobuf());
-    for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
-    }
-
-    replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(2, datanodeCommandHandler.getInvocation());
-  }
-
-  @Test
-  public void testGeneratedConfig() {
-    ReplicationManagerConfiguration rmc =
-        OzoneConfiguration.newInstanceOf(ReplicationManagerConfiguration.class);
-
-    //default is not included in ozone-site.xml but generated from annotation
-    //to the ozone-site-generated.xml which should be loaded by the
-    // OzoneConfiguration.
-    Assertions.assertEquals(1800000, rmc.getEventTimeout());
-
-  }
-
-  @Test
-  public void additionalReplicaScheduledWhenMisReplicated() throws IOException {
-    final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
-    container.setUsedBytes(100);
-    final ContainerID id = container.containerID();
-    final UUID originNodeId = UUID.randomUUID();
-    final ContainerReplica replicaOne = getReplicas(
-        id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaTwo = getReplicas(
-        id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-    final ContainerReplica replicaThree = getReplicas(
-        id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
-
-    containerStateManager.addContainer(container.getProtobuf());
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(
-        id, replicaThree);
-
-    // Ensure a mis-replicated status is returned for any containers in this
-    // test where there are 3 replicas. When there are 2 or 4 replicas
-    // the status returned will be healthy.
-    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
-        Mockito.argThat(list -> list.size() == 3),
-        Mockito.anyInt()
-    )).thenAnswer(invocation ->  {
-      return new ContainerPlacementStatusDefault(1, 2, 3);
-    });
-
-    int currentReplicateCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
-    final long currentBytesToReplicate = replicationManager.getMetrics()
-        .getNumReplicationBytesTotal();
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    // At this stage, due to the mocked calls to validateContainerPlacement
-    // the policy will not be satisfied, and replication will be triggered.
-
-    Assertions.assertEquals(currentReplicateCommandCount + 1,
-        datanodeCommandHandler.getInvocationCount(
-            SCMCommandProto.Type.replicateContainerCommand));
-    Assertions.assertEquals(currentReplicateCommandCount + 1,
-        replicationManager.getMetrics().getNumReplicationCmdsSent());
-    Assertions.assertEquals(currentBytesToReplicate + 100,
-        replicationManager.getMetrics().getNumReplicationBytesTotal());
-    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getInflightReplication());
-
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(1, report.getStat(LifeCycleState.CLOSED));
-    Assertions.assertEquals(1, report.getStat(
-        ReplicationManagerReport.HealthState.MIS_REPLICATED));
-
-    // Now make it so that all containers seem mis-replicated no matter how
-    // many replicas. This will test replicas are not scheduled if the new
-    // replica does not fix the mis-replication.
-    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
-        Mockito.anyList(),
-        Mockito.anyInt()
-    )).thenAnswer(invocation ->  {
-      return new ContainerPlacementStatusDefault(1, 2, 3);
-    });
-
-    currentReplicateCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    // At this stage, due to the mocked calls to validateContainerPlacement
-    // the mis-replicated racks will not have improved, so expect to see nothing
-    // scheduled.
-    Assertions.assertEquals(currentReplicateCommandCount, datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
-    Assertions.assertEquals(currentReplicateCommandCount,
-        replicationManager.getMetrics().getNumReplicationCmdsSent());
-    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getInflightReplication());
-  }
-
-  /**
-   * A CLOSED container is given four CLOSED replicas and one UNHEALTHY
-   * replica, while the placement policy is mocked to report mis-replication
-   * whenever exactly three replicas are validated. Exactly one delete command
-   * is expected, and it must target the datanode of the UNHEALTHY replica.
-   */
-  @Test
-  public void overReplicatedButRemovingMakesMisReplicated() throws IOException {
-    final ContainerInfo info = getContainer(LifeCycleState.CLOSED);
-    final ContainerID containerId = info.containerID();
-    final UUID origin = UUID.randomUUID();
-
-    // Four CLOSED replicas followed by one UNHEALTHY replica, each created
-    // with its own random datanode.
-    final ContainerReplica[] closedReplicas = new ContainerReplica[4];
-    for (int i = 0; i < closedReplicas.length; i++) {
-      closedReplicas[i] = getReplicas(containerId, State.CLOSED, 1000L, origin,
-          randomDatanodeDetails());
-    }
-    final ContainerReplica unhealthyReplica = getReplicas(containerId,
-        State.UNHEALTHY, 1000L, origin, randomDatanodeDetails());
-
-    containerStateManager.addContainer(info.getProtobuf());
-    for (ContainerReplica closed : closedReplicas) {
-      containerStateManager.updateContainerReplica(containerId, closed);
-    }
-    containerStateManager.updateContainerReplica(containerId,
-        unhealthyReplica);
-
-    // Any validation that is handed exactly three replicas answers with a
-    // ContainerPlacementStatusDefault(1, 2, 3), i.e. mis-replicated.
-    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
-        Mockito.argThat(replicas -> replicas.size() == 3), Mockito.anyInt()))
-        .thenAnswer(unused -> new ContainerPlacementStatusDefault(1, 2, 3));
-
-    final int expectedDeletes = 1 + datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    // Only one delete is expected: the unhealthy replica goes, the excess
-    // CLOSED replica stays because the mock reports mis-replication each
-    // time three replicas are checked.
-    Assertions.assertEquals(expectedDeletes, datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
-    Assertions.assertEquals(expectedDeletes,
-        replicationManager.getMetrics().getNumDeletionCmdsSent());
-
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.deleteContainerCommand,
-        unhealthyReplica.getDatanodeDetails()));
-    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(1,
-        replicationManager.getMetrics().getInflightDeletion());
-    assertOverReplicatedCount(1);
-  }
-
-  /**
-   * A CLOSED container with four CLOSED replicas, where the mocked placement
-   * policy answers ContainerPlacementStatusDefault(2, 2, 3) for any
-   * three-replica validation. One delete command is expected and the
-   * container is counted as over replicated.
-   */
-  @Test
-  public void testOverReplicatedAndPolicySatisfied() throws IOException {
-    final ContainerInfo info = getContainer(LifeCycleState.CLOSED);
-    final ContainerID containerId = info.containerID();
-    final UUID origin = UUID.randomUUID();
-
-    final ContainerReplica[] closedReplicas = new ContainerReplica[4];
-    for (int i = 0; i < closedReplicas.length; i++) {
-      closedReplicas[i] = getReplicas(containerId, State.CLOSED, 1000L, origin,
-          randomDatanodeDetails());
-    }
-
-    containerStateManager.addContainer(info.getProtobuf());
-    for (ContainerReplica closed : closedReplicas) {
-      containerStateManager.updateContainerReplica(containerId, closed);
-    }
-
-    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
-        Mockito.argThat(replicas -> replicas.size() == 3), Mockito.anyInt()))
-        .thenAnswer(unused -> new ContainerPlacementStatusDefault(2, 2, 3));
-
-    final int expectedDeletes = 1 + datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertEquals(expectedDeletes, datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
-    Assertions.assertEquals(expectedDeletes,
-        replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(1,
-        replicationManager.getMetrics().getInflightDeletion());
-
-    assertOverReplicatedCount(1);
-  }
-
-  /**
-   * A CLOSED container with four CLOSED replicas and one QUASI_CLOSED
-   * replica, where the mocked placement policy answers
-   * ContainerPlacementStatusDefault(1, 2, 3) for any non-null list of at most
-   * four replicas. Two delete commands and one inflight deletion entry are
-   * expected.
-   */
-  @Test
-  public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
-      IOException {
-    final ContainerInfo info = getContainer(LifeCycleState.CLOSED);
-    final ContainerID containerId = info.containerID();
-    final UUID origin = UUID.randomUUID();
-
-    final ContainerReplica[] closedReplicas = new ContainerReplica[4];
-    for (int i = 0; i < closedReplicas.length; i++) {
-      closedReplicas[i] = getReplicas(containerId, State.CLOSED, 1000L, origin,
-          randomDatanodeDetails());
-    }
-    final ContainerReplica quasiClosedReplica = getReplicas(containerId,
-        State.QUASI_CLOSED, 1000L, origin, randomDatanodeDetails());
-
-    containerStateManager.addContainer(info.getProtobuf());
-    for (ContainerReplica closed : closedReplicas) {
-      containerStateManager.updateContainerReplica(containerId, closed);
-    }
-    containerStateManager.updateContainerReplica(containerId,
-        quasiClosedReplica);
-
-    Mockito.when(containerPlacementPolicy.validateContainerPlacement(
-        Mockito.argThat(
-            replicas -> replicas != null && replicas.size() <= 4),
-        Mockito.anyInt()))
-        .thenAnswer(unused -> new ContainerPlacementStatusDefault(1, 2, 3));
-
-    final int expectedDeletes = 2 + datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertEquals(expectedDeletes, datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
-    Assertions.assertEquals(expectedDeletes,
-        replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(1,
-        replicationManager.getMetrics().getInflightDeletion());
-  }
-
-  /**
-   * With one CLOSED replica on an IN_SERVICE node and two on DECOMMISSIONING
-   * nodes, two new replicas are expected to be scheduled and the container
-   * is counted as under replicated.
-   */
-  @Test
-  public void testUnderReplicatedDueToDecommission() throws IOException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
-    }
-    assertReplicaScheduled(2);
-    assertUnderReplicatedCount(1);
-  }
-
-  /**
-   * Runs the scenario in which every replica is on a DECOMMISSIONING node
-   * and expects three replicas to be scheduled.
-   */
-  @Test
-  public void testUnderReplicatedDueToAllDecommission() throws IOException {
-    final int expectedScheduled = 3;
-    runTestUnderReplicatedDueToAllDecommission(expectedScheduled);
-  }
-
-  /**
-   * Shared scenario: a CLOSED container whose three CLOSED replicas all sit
-   * on DECOMMISSIONING nodes.
-   *
-   * @param expectedReplication value passed on to assertReplicaScheduled
-   * @return always null, so the method fits a {@code Callable<Void>}
-   */
-  Void runTestUnderReplicatedDueToAllDecommission(int expectedReplication)
-      throws IOException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    for (int i = 0; i < 3; i++) {
-      addReplica(info, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
-    }
-    assertReplicaScheduled(expectedReplication);
-    assertUnderReplicatedCount(1);
-    return null;
-  }
-
-  /**
-   * Runs the all-decommissioning scenario with a replication limit of 1 and
-   * a deletion limit of 0, expecting one scheduled replica, two skipped
-   * replications and no skipped deletions.
-   */
-  @Test
-  public void testReplicationLimit() throws Exception {
-    final Callable<Void> scenario =
-        () -> runTestUnderReplicatedDueToAllDecommission(1);
-    runTestLimit(1, 0, 2, 0, scenario);
-  }
-
-  /**
-   * Recreates the ReplicationManager with the given limits, runs the supplied
-   * scenario and verifies by how much the "inflight skipped" metrics grew.
-   * Afterwards the manager is recreated with both limits set to 0.
-   *
-   * @param replicationLimit first argument to createReplicationManager
-   * @param deletionLimit second argument to createReplicationManager
-   * @param expectedReplicationSkipped expected growth of the
-   *     inflight-replication-skipped metric
-   * @param expectedDeletionSkipped expected growth of the
-   *     inflight-deletion-skipped metric
-   * @param testcase scenario to execute between the two metric readings
-   * @throws Exception whatever the scenario or the setup throws
-   */
-  void runTestLimit(int replicationLimit, int deletionLimit,
-      int expectedReplicationSkipped, int expectedDeletionSkipped,
-      Callable<Void> testcase) throws Exception {
-    createReplicationManager(replicationLimit, deletionLimit);
-
-    // Capture the baselines before the scenario runs.
-    final ReplicationManagerMetrics rmMetrics = replicationManager.getMetrics();
-    final long expectedReplicationTotal =
-        rmMetrics.getInflightReplicationSkipped() + expectedReplicationSkipped;
-    final long expectedDeletionTotal =
-        rmMetrics.getInflightDeletionSkipped() + expectedDeletionSkipped;
-
-    testcase.call();
-
-    Assertions.assertEquals(expectedReplicationTotal,
-        rmMetrics.getInflightReplicationSkipped());
-    Assertions.assertEquals(expectedDeletionTotal,
-        rmMetrics.getInflightDeletionSkipped());
-
-    //reset limits for other tests.
-    createReplicationManager(0, 0);
-  }
-
-  /**
-   * With three CLOSED replicas on IN_SERVICE nodes plus one on a
-   * DECOMMISSIONING node, nothing is expected to be scheduled and the
-   * under-replicated count stays at zero.
-   */
-  @Test
-  public void testCorrectlyReplicatedWithDecommission() throws IOException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    for (int i = 0; i < 3; i++) {
-      addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    }
-    addReplica(info, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
-    assertReplicaScheduled(0);
-    assertUnderReplicatedCount(0);
-  }
-
-  /**
-   * With one CLOSED replica on an IN_SERVICE node and two on IN_MAINTENANCE
-   * nodes, one additional replica is expected to be scheduled and the
-   * container is counted as under replicated.
-   */
-  @Test
-  public void testUnderReplicatedDueToMaintenance() throws IOException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
-    }
-    assertReplicaScheduled(1);
-    assertUnderReplicatedCount(1);
-  }
-
-  /**
-   * Recreates the ReplicationManager with the maintenance replica minimum
-   * set to 1. With one replica on an IN_SERVICE node and two on
-   * IN_MAINTENANCE nodes, no replica is expected to be scheduled.
-   */
-  @Test
-  public void testNotUnderReplicatedDueToMaintenanceMinRepOne()
-      throws Exception {
-    // Swap the running manager for one configured with min replica = 1.
-    replicationManager.stop();
-    final ReplicationManagerConfiguration minRepOneConf =
-        new ReplicationManagerConfiguration();
-    minRepOneConf.setMaintenanceReplicaMinimum(1);
-    dbStore.close();
-    createReplicationManager(minRepOneConf);
-
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
-    }
-    assertReplicaScheduled(0);
-    assertUnderReplicatedCount(0);
-  }
-
-  /**
-   * Recreates the ReplicationManager with the maintenance replica minimum
-   * set to 1. With all three replicas on IN_MAINTENANCE nodes, one replica
-   * is expected to be scheduled and the container is counted as under
-   * replicated.
-   */
-  @Test
-  public void testUnderReplicatedDueToMaintenanceMinRepOne()
-      throws Exception {
-    // Swap the running manager for one configured with min replica = 1.
-    replicationManager.stop();
-    final ReplicationManagerConfiguration minRepOneConf =
-        new ReplicationManagerConfiguration();
-    minRepOneConf.setMaintenanceReplicaMinimum(1);
-    dbStore.close();
-    createReplicationManager(minRepOneConf);
-
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    for (int i = 0; i < 3; i++) {
-      addReplica(info, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
-    }
-    assertReplicaScheduled(1);
-    assertUnderReplicatedCount(1);
-  }
-
-  /**
-   * With all three CLOSED replicas on IN_MAINTENANCE nodes, two replicas are
-   * expected to be scheduled and the container is counted as under
-   * replicated.
-   */
-  @Test
-  public void testUnderReplicatedDueToAllMaintenance() throws IOException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    for (int i = 0; i < 3; i++) {
-      addReplica(info, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
-    }
-    assertReplicaScheduled(2);
-    assertUnderReplicatedCount(1);
-  }
-
-  /**
-   * With two CLOSED replicas on IN_SERVICE nodes and two on IN_MAINTENANCE
-   * nodes, nothing is expected to be scheduled and the under-replicated
-   * count stays at zero.
-   */
-  @Test
-  public void testCorrectlyReplicatedWithMaintenance() throws IOException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    }
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
-    }
-    assertReplicaScheduled(0);
-    assertUnderReplicatedCount(0);
-  }
-
-  /**
-   * With two CLOSED replicas on DECOMMISSIONED nodes and two on
-   * IN_MAINTENANCE nodes, two replicas are expected to be scheduled and the
-   * container is counted as under replicated.
-   */
-  @Test
-  public void testUnderReplicatedWithDecommissionAndMaintenance()
-      throws IOException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
-    }
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
-    }
-    assertReplicaScheduled(2);
-    assertUnderReplicatedCount(1);
-  }
-
-  /**
-   * A CLOSED container that has no replicas at all: no replication is
-   * expected to be scheduled, and the container is expected to be counted
-   * both as under replicated and as missing.
-   */
-  @Test
-  public void testContainerWithMissingReplicas()
-      throws IOException {
-    // The container is created but no replica is ever added to it.
-    createContainer(LifeCycleState.CLOSED);
-    assertReplicaScheduled(0);
-    assertUnderReplicatedCount(1);
-    assertMissingCount(1);
-  }
-  /**
-   * An over-replicated CLOSED container with five replicas on in-service
-   * healthy nodes, one on a DECOMMISSIONED node and one on an IN_MAINTENANCE
-   * node. Two delete commands are expected, and none of them may be
-   * addressed to a datanode whose persisted operational state is not
-   * IN_SERVICE.
-   */
-  @Test
-  public void testOverReplicatedClosedContainerWithDecomAndMaint()
-      throws IOException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    addReplica(info, NodeStatus.inServiceHealthy(), CLOSED);
-    addReplica(info, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
-    addReplica(info, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
-    for (int i = 0; i < 4; i++) {
-      addReplica(info, NodeStatus.inServiceHealthy(), CLOSED);
-    }
-
-    final int expectedDeletes = 2 + datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertEquals(expectedDeletes, datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
-    Assertions.assertEquals(expectedDeletes,
-        replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
-    Assertions.assertEquals(1,
-        replicationManager.getMetrics().getInflightDeletion());
-
-    // Replicas on nodes that are not IN_SERVICE (the DECOM and Maint ones)
-    // must not have received a delete command.
-    for (ContainerReplica replica
-        : containerStateManager.getContainerReplicas(info.containerID())) {
-      if (replica.getDatanodeDetails().getPersistedOpState() == IN_SERVICE) {
-        continue;
-      }
-      Assertions.assertFalse(datanodeCommandHandler.received(
-          SCMCommandProto.Type.deleteContainerCommand,
-          replica.getDatanodeDetails()));
-    }
-    assertOverReplicatedCount(1);
-  }
-
-  /**
-   * Sets up a container that needs a new replica while every replica sits on
-   * a STALE node: one in-service stale node and two DECOMMISSIONED stale
-   * nodes. No replica is expected to be scheduled, yet the container is
-   * still counted as under replicated.
-   */
-  @Test
-  public void testUnderReplicatedNotHealthySource() throws IOException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    addReplica(info, NodeStatus.inServiceStale(), CLOSED);
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(DECOMMISSIONED, STALE), CLOSED);
-    }
-    // A replica would normally be scheduled here, but with all nodes stale
-    // nothing gets scheduled.
-    assertReplicaScheduled(0);
-    assertUnderReplicatedCount(1);
-  }
-
-  /**
-   * Happy path of a move: with three healthy in-service replicas and a
-   * fourth healthy in-service node as target, a replicate command goes to
-   * the target, then a delete command goes to the source, and once the
-   * source replica is removed the move future completes with COMPLETED.
-   */
-  @Test
-  public void testMove() throws IOException, NodeNotFoundException,
-      InterruptedException, ExecutionException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    final ContainerID containerId = info.containerID();
-    final ContainerReplica sourceReplica =
-        addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    }
-    final DatanodeDetails target =
-        addNode(new NodeStatus(IN_SERVICE, HEALTHY));
-
-    final CompletableFuture<MoveResult> moveFuture = replicationManager.move(
-        containerId, sourceReplica.getDatanodeDetails(), target);
-    Assertions.assertTrue(scmLogs.getOutput().contains(
-        "receive a move request about container"));
-    Thread.sleep(100L);
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.replicateContainerCommand, target));
-    Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
-        SCMCommandProto.Type.replicateContainerCommand));
-
-    // Pretend the replication to the target finished.
-    addReplicaToDn(info, target, CLOSED);
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.deleteContainerCommand,
-        sourceReplica.getDatanodeDetails()));
-    Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
-        SCMCommandProto.Type.deleteContainerCommand));
-
-    // Pretend the source replica was deleted.
-    containerStateManager.removeContainerReplica(containerId, sourceReplica);
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertTrue(moveFuture.isDone());
-    Assertions.assertSame(MoveResult.COMPLETED, moveFuture.get());
-  }
-
-  /**
-   * Simulates SCM crashes in the middle of a move. The ReplicationManager is
-   * reset and its move scheduler reinitialized from the MOVE table of the DB
-   * store at three points: after the replicate command was sent, after the
-   * delete command was sent, and after the source replica was removed. The
-   * first two restarts must still find the inflight move (same source and
-   * target) and re-send the pending command; the last one must no longer
-   * find it.
-   */
-  @Test
-  public void testMoveCrashAndRestart() throws IOException,
-      NodeNotFoundException, InterruptedException {
-    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
-    ContainerID id = container.containerID();
-    ContainerReplica dn1 = addReplica(container,
-        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    addReplica(container,
-        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    addReplica(container,
-        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
-    replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
-    Assertions.assertTrue(scmLogs.getOutput().contains(
-        "receive a move request about container"));
-    Thread.sleep(100L);
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.replicateContainerCommand, dn3));
-    Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
-        SCMCommandProto.Type.replicateContainerCommand));
-
-    //crash happens, restart scm.
-    //clear current inflight actions and reload inflightMove from DBStore.
-    resetReplicationManager();
-    replicationManager.getMoveScheduler()
-        .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
-    Assertions.assertTrue(replicationManager.getMoveScheduler()
-        .getInflightMove().containsKey(id));
-    MoveDataNodePair kv = replicationManager.getMoveScheduler()
-        .getInflightMove().get(id);
-    Assertions.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
-    Assertions.assertEquals(kv.getTgt(), dn3);
-    serviceManager.notifyStatusChanged();
-
-    Thread.sleep(100L);
-    // now, the container is not over-replicated,
-    // so no deleteContainerCommand will be sent
-    Assertions.assertFalse(datanodeCommandHandler.received(
-        SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
-    //replica does not exist in target datanode, so a replicateContainerCommand
-    //will be sent again at notifyStatusChanged#onLeaderReadyAndOutOfSafeMode
-    Assertions.assertEquals(2, datanodeCommandHandler.getInvocationCount(
-        SCMCommandProto.Type.replicateContainerCommand));
-
-
-    //replicate container to dn3, now, over-replicated
-    addReplicaToDn(container, dn3, CLOSED);
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    //deleteContainerCommand is sent, but the src replica is not deleted now
-    Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
-        SCMCommandProto.Type.deleteContainerCommand));
-
-    //crash happens, restart scm.
-    //clear current inflight actions and reload inflightMove from DBStore.
-    resetReplicationManager();
-    replicationManager.getMoveScheduler()
-        .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
-    Assertions.assertTrue(replicationManager.getMoveScheduler()
-        .getInflightMove().containsKey(id));
-    kv = replicationManager.getMoveScheduler()
-        .getInflightMove().get(id);
-    Assertions.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
-    Assertions.assertEquals(kv.getTgt(), dn3);
-    serviceManager.notifyStatusChanged();
-
-    //after restart and the container is over-replicated now,
-    //deleteContainerCommand will be sent again
-    Assertions.assertEquals(2, datanodeCommandHandler.getInvocationCount(
-        SCMCommandProto.Type.deleteContainerCommand));
-
-    //replica in src datanode is deleted now (removed exactly once; the
-    //replica was previously removed twice in a row, which was redundant)
-    containerStateManager.removeContainerReplica(id, dn1);
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    //since the move is complete, after another scm crash and restart
-    //inflightMove should not contain the container again
-    resetReplicationManager();
-    replicationManager.getMoveScheduler()
-        .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
-    Assertions.assertFalse(replicationManager.getMoveScheduler()
-        .getInflightMove().containsKey(id));
-
-    //the CompletableFuture is not stored in DB, so after scm crash and
-    //restart the CompletableFuture is missing and cannot be asserted on
-  }
-
-  /**
-   * Verifies that the source replica of a move is not deleted when another
-   * replica is lost after the replication step, leaving only three replicas
-   * in total. The move future is expected to complete with
-   * DELETE_FAIL_POLICY.
-   */
-  @Test
-  public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
-      throws IOException, NodeNotFoundException,
-      InterruptedException, ExecutionException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    final ContainerID containerId = info.containerID();
-    final ContainerReplica sourceReplica =
-        addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    final ContainerReplica lostReplica =
-        addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    final DatanodeDetails target =
-        addNode(new NodeStatus(IN_SERVICE, HEALTHY));
-
-    final CompletableFuture<MoveResult> moveFuture = replicationManager.move(
-        containerId, sourceReplica.getDatanodeDetails(), target);
-    Assertions.assertTrue(scmLogs.getOutput().contains(
-        "receive a move request about container"));
-    Thread.sleep(100L);
-    Assertions.assertTrue(datanodeCommandHandler.received(
-        SCMCommandProto.Type.replicateContainerCommand, target));
-    Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
-        SCMCommandProto.Type.replicateContainerCommand));
-
-    // The replication to the target succeeds ...
-    addReplicaToDn(info, target, CLOSED);
-    // ... but a different replica is lost, so only three replicas remain in
-    // total and the replica on the source node must not be deleted.
-    containerStateManager.removeContainerReplica(containerId, lostReplica);
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertFalse(datanodeCommandHandler.received(
-        SCMCommandProto.Type.deleteContainerCommand,
-        sourceReplica.getDatanodeDetails()));
-
-    Assertions.assertTrue(moveFuture.isDone());
-    Assertions.assertSame(MoveResult.DELETE_FAIL_POLICY, moveFuture.get());
-  }
-
-
-
-  /**
-   * Covers datanodes turning STALE during a move. First the target becomes
-   * stale right after the move request, which is expected to end the move
-   * with REPLICATION_FAIL_NODE_UNHEALTHY. Then, in a second move, the source
-   * becomes stale after the replica reached the target, which is expected
-   * to end the move with DELETION_FAIL_NODE_UNHEALTHY.
-   */
-  @Test
-  public void testDnBecameUnhealthyWhenMoving() throws IOException,
-      NodeNotFoundException, InterruptedException, ExecutionException {
-    final ContainerInfo info = createContainer(LifeCycleState.CLOSED);
-    final ContainerID containerId = info.containerID();
-    final ContainerReplica sourceReplica =
-        addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    for (int i = 0; i < 2; i++) {
-      addReplica(info, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    }
-    final DatanodeDetails target =
-        addNode(new NodeStatus(IN_SERVICE, HEALTHY));
-
-    // First move: the target turns stale before anything is processed.
-    CompletableFuture<MoveResult> moveFuture = replicationManager.move(
-        containerId, sourceReplica.getDatanodeDetails(), target);
-    Assertions.assertTrue(scmLogs.getOutput().contains(
-        "receive a move request about container"));
-
-    nodeManager.setNodeStatus(target, new NodeStatus(IN_SERVICE, STALE));
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertTrue(moveFuture.isDone());
-    Assertions.assertSame(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY,
-        moveFuture.get());
-
-    // Second move: the target is healthy again and receives the replica,
-    // then the source turns stale.
-    nodeManager.setNodeStatus(target, new NodeStatus(IN_SERVICE, HEALTHY));
-    moveFuture = replicationManager.move(
-        containerId, sourceReplica.getDatanodeDetails(), target);
-    addReplicaToDn(info, target, CLOSED);
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    nodeManager.setNodeStatus(sourceReplica.getDatanodeDetails(),
-        new NodeStatus(IN_SERVICE, STALE));
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-
-    Assertions.assertTrue(moveFuture.isDone());
-    Assertions.assertSame(MoveResult.DELETION_FAIL_NODE_UNHEALTHY,
-        moveFuture.get());
-  }
-
-  /**
-   * Before ReplicationManager accepts a move it checks a number of
-   * prerequisites. Each section below violates one prerequisite and expects
-   * the returned CompletableFuture to be already done, carrying the matching
-   * MoveResult failure code. The sections depend on the state left behind by
-   * the previous ones, so their order matters.
-   */
-  @Test
-  public void testMovePrerequisites() throws IOException, NodeNotFoundException,
-      InterruptedException, ExecutionException,
-      InvalidStateTransitionException {
-    //set up an OPEN container with CLOSED replicas on dn1, dn2 and dn4;
-    //dn3 is an extra healthy in-service node that holds no replica
-    final ContainerInfo container = createContainer(LifeCycleState.OPEN);
-    ContainerID id = container.containerID();
-    ContainerReplica dn1 = addReplica(container,
-        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    ContainerReplica dn2 = addReplica(container,
-        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
-    ContainerReplica dn4 = addReplica(container,
-        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    
-    CompletableFuture<MoveResult> cf;
-    //replication manager is not running: stop it and request a move, which
-    //is expected to be rejected with FAIL_NOT_RUNNING.
-    //NOTE(review): the previous comment here referred to an earlier
-    //successful move and to clearing inflightReplication/inflightDeletion by
-    //stopping; no move precedes this point in the method - confirm whether
-    //stop() is also relied on to clear inflight state.
-    replicationManager.stop();
-    Thread.sleep(100L);
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
-    Assertions.assertTrue(cf.isDone() && cf.get() ==
-        MoveResult.FAIL_NOT_RUNNING);
-    replicationManager.start();
-    Thread.sleep(100L);
-
-    //container is not in CLOSED state (it is still OPEN)
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
-    Assertions.assertTrue(cf.isDone() && cf.get() ==
-        MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
-    //open -> closing
-    containerStateManager.updateContainerState(id.getProtobuf(),
-        LifeCycleEvent.FINALIZE);
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
-    Assertions.assertTrue(cf.isDone() && cf.get() ==
-        MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
-    //closing -> quasi_closed
-    containerStateManager.updateContainerState(id.getProtobuf(),
-        LifeCycleEvent.QUASI_CLOSE);
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
-    Assertions.assertTrue(cf.isDone() && cf.get() ==
-        MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
-
-    //quasi_closed -> closed
-    containerStateManager.updateContainerState(id.getProtobuf(),
-        LifeCycleEvent.FORCE_CLOSE);
-    Assertions.assertSame(LifeCycleState.CLOSED,
-        containerStateManager.getContainer(id).getState());
-
-    //Node is not in healthy state: dn3 is put into every non-HEALTHY node
-    //state and tried both as target and as source of the move
-    for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) {
-      if (state != HEALTHY) {
-        nodeManager.setNodeStatus(dn3,
-            new NodeStatus(IN_SERVICE, state));
-        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
-        Assertions.assertTrue(cf.isDone() && cf.get() ==
-            MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
-        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
-        Assertions.assertTrue(cf.isDone() && cf.get() ==
-            MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
-      }
-    }
-    //restore dn3 to healthy and in service for the following checks
-    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
-
-    //Node is not in IN_SERVICE state: dn3 is put into every other
-    //operational state and tried both as target and as source of the move
-    for (HddsProtos.NodeOperationalState state :
-        HddsProtos.NodeOperationalState.values()) {
-      if (state != IN_SERVICE) {
-        nodeManager.setNodeStatus(dn3,
-            new NodeStatus(state, HEALTHY));
-        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
-        Assertions.assertTrue(cf.isDone() && cf.get() ==
-            MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
-        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
-        Assertions.assertTrue(cf.isDone() && cf.get() ==
-            MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
-      }
-    }
-    //restore dn3 to healthy and in service for the following checks
-    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
-
-    //container exists in target datanode (dn2 already holds a replica)
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(),
-        dn2.getDatanodeDetails());
-    Assertions.assertTrue(cf.isDone() && cf.get() ==
-        MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
-
-    //container does not exist in source datanode (dn3 holds no replica;
-    //it is passed as both source and target here)
-    cf = replicationManager.move(id, dn3, dn3);
-    Assertions.assertTrue(cf.isDone() && cf.get() ==
-        MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
-
-    //make container over replicated to test the
-    // case that container is in inflightDeletion
-    ContainerReplica dn5 = addReplica(container,
-        new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED);
-    replicationManager.processAll();
-    //waiting for inflightDeletion generation
-    eventQueue.processAll(1000);
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
-    Assertions.assertTrue(cf.isDone() && cf.get() ==
-        MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
-    resetReplicationManager();
-
-    //make the replica num be 2 to test the case
-    //that container is in inflightReplication
-    containerStateManager.removeContainerReplica(id, dn5);
-    containerStateManager.removeContainerReplica(id, dn4);
-    //replication manager should generate inflightReplication
-    replicationManager.processAll();
-    //waiting for inflightReplication generation
-    eventQueue.processAll(1000);
-    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
-    Assertions.assertTrue(cf.isDone() && cf.get() ==
-        MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
-  }
-
-  @Test
-  public void testReplicateCommandTimeout() throws IOException {
-    long timeout = new ReplicationManagerConfiguration().getEventTimeout();
-
-    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    assertReplicaScheduled(1);
-
-    // Already a pending replica, so nothing scheduled
-    assertReplicaScheduled(0);
-
-    // Advance the clock past the timeout, and there should be a replica
-    // scheduled
-    clock.fastForward(timeout + 1000);
-    assertReplicaScheduled(1);
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getNumReplicationCmdsTimeout());
-  }
-
-  @Test
-  public void testDeleteCommandTimeout() throws
-      IOException, InterruptedException {
-    long timeout = new ReplicationManagerConfiguration().getEventTimeout();
-
-    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    assertDeleteScheduled(1);
-
-    // Already a pending replica, so nothing scheduled
-    assertReplicaScheduled(0);
-
-    // Advance the clock past the timeout, and there should be a replica
-    // scheduled
-    clock.fastForward(timeout + 1000);
-    assertDeleteScheduled(1);
-    Assertions.assertEquals(1, replicationManager.getMetrics()
-        .getNumDeletionCmdsTimeout());
+    Assert.assertEquals(1, repReport.getStat(
+        ReplicationManagerReport.HealthState.MISSING));
   }
 
-  /**
-   * A closed empty container with all the replicas also closed and empty
-   * should be deleted.
-   * A container/ replica should be deemed empty when it has 0 keyCount even
-   * if the usedBytes is not 0 (usedBytes should not be used to determine if
-   * the container or replica is empty).
-   */
   @Test
-  public void testDeleteEmptyContainer() throws Exception {
-    runTestDeleteEmptyContainer(3);
-  }
-
-  Void runTestDeleteEmptyContainer(int expectedDelete) throws Exception {
-    // Create container with usedBytes = 1000 and keyCount = 0
-    final ContainerInfo container = createContainer(LifeCycleState.CLOSED, 1000,
-        0);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    // Create a replica with usedBytes != 0 and keyCount = 0
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED, 100, 0);
-
-    assertDeleteScheduled(expectedDelete);
-    return null;
+  public void testUnderAndOverReplicated()
+      throws ContainerNotFoundException {
+    ContainerInfo container = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(container, 1, 2, 3, 5, 5);
+
+    ContainerHealthResult result = replicationManager.processContainer(
+        container, underRep, overRep, repReport);
+    // If it is both under and over replicated, we set it to the most important
+    // state, which is under-replicated. When that is fixed, over replication
+    // will be handled.
+    Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
+        result.getHealthState());
+    Assert.assertEquals(1, underRep.size());
+    Assert.assertEquals(0, overRep.size());
+    Assert.assertEquals(1, repReport.getStat(
+        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+    Assert.assertEquals(0, repReport.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
 
   @Test
-  public void testDeletionLimit() throws Exception {
-    runTestLimit(0, 2, 0, 1,
-        () -> runTestDeleteEmptyContainer(2));
+  public void testOverReplicated() throws ContainerNotFoundException {
+    ContainerInfo container = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(container, 1, 2, 3, 4, 5, 5);
+    ContainerHealthResult result = replicationManager.processContainer(
+        container, underRep, overRep, repReport);
+    Assert.assertEquals(ContainerHealthResult.HealthState.OVER_REPLICATED,
+        result.getHealthState());
+    Assert.assertEquals(0, underRep.size());
+    Assert.assertEquals(1, overRep.size());
+    Assert.assertEquals(1, repReport.getStat(
+        ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
 
-  /**
-   * A closed empty container with a non-empty replica should not be deleted.
-   */
   @Test
-  public void testDeleteEmptyContainerNonEmptyReplica() throws Exception {
-    final ContainerInfo container = createContainer(LifeCycleState.CLOSED, 0,
-        0);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-    // Create the 3rd replica with non-zero key count and used bytes
-    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED, 100, 1);
-    assertDeleteScheduled(0);
-  }
-
-  private ContainerInfo createContainer(LifeCycleState containerState)
-      throws IOException {
-    return createContainer(containerState, CONTAINER_USED_BYTES_DEFAULT,
-        CONTAINER_NUM_KEYS_DEFAULT);
-  }
-
-  private ContainerInfo createContainer(LifeCycleState containerState,
-      long usedBytes, long numKeys) throws IOException {
-    final ContainerInfo container = getContainer(containerState);
-    container.setUsedBytes(usedBytes);
-    container.setNumberOfKeys(numKeys);
-    containerStateManager.addContainer(container.getProtobuf());
-    return container;
-  }
-
-  private DatanodeDetails addNode(NodeStatus nodeStatus) {
-    DatanodeDetails dn = randomDatanodeDetails();
-    dn.setPersistedOpState(nodeStatus.getOperationalState());
-    dn.setPersistedOpStateExpiryEpochSec(
-        nodeStatus.getOpStateExpiryEpochSeconds());
-    nodeManager.register(dn, nodeStatus);
-    return dn;
-  }
-
-  private void resetReplicationManager() throws InterruptedException {
-    replicationManager.stop();
-    Thread.sleep(100L);
-    replicationManager.start();
-    Thread.sleep(100L);
-  }
-
-  private ContainerReplica addReplica(ContainerInfo container,
-      NodeStatus nodeStatus, State replicaState)
-      throws ContainerNotFoundException {
-    DatanodeDetails dn = addNode(nodeStatus);
-    return addReplicaToDn(container, dn, replicaState);
-  }
-
-  private ContainerReplica addReplica(ContainerInfo container,
-      NodeStatus nodeStatus, State replicaState, long usedBytes, long numOfKeys)
-      throws ContainerNotFoundException {
-    DatanodeDetails dn = addNode(nodeStatus);
-    return addReplicaToDn(container, dn, replicaState, usedBytes, numOfKeys);
-  }
-
-  private ContainerReplica addReplicaToDn(ContainerInfo container,
-                               DatanodeDetails dn, State replicaState)
-      throws ContainerNotFoundException {
-    // Using the same originID for all replica in the container set. If each
-    // replica has a unique originID, it causes problems in ReplicationManager
-    // when processing over-replicated containers.
-    final UUID originNodeId =
-        UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
-    final ContainerReplica replica = getReplicas(container.containerID(),
-        replicaState, container.getUsedBytes(), container.getNumberOfKeys(),
-        1000L, originNodeId, dn);
-    containerStateManager
-        .updateContainerReplica(container.containerID(), replica);
-    return replica;
-  }
-
-  private ContainerReplica addReplicaToDn(ContainerInfo container,
-      DatanodeDetails dn, State replicaState, long usedBytes, long numOfKeys)
+  public void testOverReplicatedFixByPending()
       throws ContainerNotFoundException {
-    // Using the same originID for all replica in the container set. If each
-    // replica has a unique originID, it causes problems in ReplicationManager
-    // when processing over-replicated containers.
-    final UUID originNodeId =
-        UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
-    final ContainerReplica replica = getReplicas(container.containerID(),
-        replicaState, usedBytes, numOfKeys, 1000L, originNodeId, dn);
-    containerStateManager
-        .updateContainerReplica(container.containerID(), replica);
-    return replica;
-  }
-
-  private void assertReplicaScheduled(int delta) {
-    final int currentReplicateCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentReplicateCommandCount + delta,
-        datanodeCommandHandler.getInvocationCount(
-            SCMCommandProto.Type.replicateContainerCommand));
-    Assertions.assertEquals(currentReplicateCommandCount + delta,
-        replicationManager.getMetrics().getNumReplicationCmdsSent());
-  }
-
-  private void assertDeleteScheduled(int delta) {
-    final int currentDeleteCommandCount = datanodeCommandHandler
-        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
-
-    replicationManager.processAll();
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(currentDeleteCommandCount + delta,
-        datanodeCommandHandler.getInvocationCount(
-            SCMCommandProto.Type.deleteContainerCommand));
-    Assertions.assertEquals(currentDeleteCommandCount + delta,
-        replicationManager.getMetrics().getNumDeletionCmdsSent());
-  }
-
-  private void assertUnderReplicatedCount(int count) {
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(count, report.getStat(
-        ReplicationManagerReport.HealthState.UNDER_REPLICATED));
-  }
-
-  private void assertMissingCount(int count) {
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(count, report.getStat(
-        ReplicationManagerReport.HealthState.MISSING));
-  }
-
-  private void assertOverReplicatedCount(int count) {
-    ReplicationManagerReport report = replicationManager.getContainerReport();
-    Assertions.assertEquals(count, report.getStat(
+    ContainerInfo container = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(container, 1, 2, 3, 4, 5, 5);
+    containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
+        MockDatanodeDetails.randomDatanodeDetails(), 5);
+    ContainerHealthResult result = replicationManager.processContainer(
+        container, underRep, overRep, repReport);
+    Assert.assertEquals(ContainerHealthResult.HealthState.OVER_REPLICATED,
+        result.getHealthState());
+    Assert.assertEquals(0, underRep.size());
+    // If the pending delete fixes the over-replication, nothing is added
+    // to the over replication list.
+    Assert.assertEquals(0, overRep.size());
+    Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
 
-  @AfterEach
-  public void teardown() throws Exception {
-    containerStateManager.close();
-    replicationManager.stop();
-    if (dbStore != null) {
-      dbStore.close();
-    }
-    FileUtils.deleteDirectory(testDir);
+  private Set<ContainerReplica> addReplicas(ContainerInfo container,
+      int... indexes) {
+    final Set<ContainerReplica> replicas =
+        createReplicas(container.containerID(), indexes);
+    containerReplicaMap.put(container.containerID(), replicas);
+    return replicas;
   }
 
-  private static class DatanodeCommandHandler implements
-      EventHandler<CommandForDatanode> {
-
-    private AtomicInteger invocation = new AtomicInteger(0);
-    private Map<SCMCommandProto.Type, AtomicInteger> commandInvocation =
-        new HashMap<>();
-    private List<CommandForDatanode> commands = new ArrayList<>();
-
-    @Override
-    public void onMessage(final CommandForDatanode command,
-                          final EventPublisher publisher) {
-      final SCMCommandProto.Type type = command.getCommand().getType();
-      commandInvocation.computeIfAbsent(type, k -> new AtomicInteger(0));
-      commandInvocation.get(type).incrementAndGet();
-      invocation.incrementAndGet();
-      commands.add(command);
-    }
-
-    private int getInvocation() {
-      return invocation.get();
-    }
-
-    private int getInvocationCount(SCMCommandProto.Type type) {
-      return commandInvocation.containsKey(type) ?
-          commandInvocation.get(type).get() : 0;
-    }
-
-    private List<CommandForDatanode> getReceivedCommands() {
-      return commands;
-    }
-
-    /**
-     * Returns true if the command handler has received the given
-     * command type for the provided datanode.
-     *
-     * @param type Command Type
-     * @param datanode DatanodeDetails
-     * @return True if command was received, false otherwise
-     */
-    private boolean received(final SCMCommandProto.Type type,
-                             final DatanodeDetails datanode) {
-      return commands.stream().anyMatch(dc ->
-          dc.getCommand().getType().equals(type) &&
-              dc.getDatanodeId().equals(datanode.getUuid()));
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org