You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/04/04 11:03:05 UTC

[hadoop] branch trunk updated: HDDS-1207. Refactor Container Report Processing logic and plugin new Replication Manager. (#662)

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 48a58bc  HDDS-1207. Refactor Container Report Processing logic and plugin new Replication Manager. (#662)
48a58bc is described below

commit 48a58bce37dfddf37a4a6888228f7c7fc80bccdd
Author: Nanda kumar <na...@gmail.com>
AuthorDate: Thu Apr 4 16:32:59 2019 +0530

    HDDS-1207. Refactor Container Report Processing logic and plugin new Replication Manager. (#662)
---
 .../hadoop/hdds/scm/container/ContainerInfo.java   |   7 -
 .../hdds/scm/chillmode/ChillModeHandler.java       |  36 +-
 .../container/AbstractContainerReportHandler.java  | 236 +++++++++
 .../hdds/scm/container/ContainerReportHandler.java | 234 ++++-----
 .../hdds/scm/container/ContainerStateManager.java  |  19 +-
 .../IncrementalContainerReportHandler.java         |  47 +-
 .../hdds/scm/container/ReplicationManager.java     |   9 +
 .../hdds/scm/container/ReportHandlerHelper.java    | 365 -------------
 .../hdds/scm/container/SCMContainerManager.java    |  23 +-
 .../scm/container/states/ContainerStateMap.java    |   8 +-
 .../hadoop/hdds/scm/server/SCMConfigurator.java    |   2 +-
 .../hdds/scm/server/StorageContainerManager.java   |  37 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |   1 +
 .../hdds/scm/chillmode/TestChillModeHandler.java   |  28 +-
 .../scm/container/TestContainerReportHandler.java  | 585 ++++++++-------------
 .../scm/container/TestContainerReportHelper.java   |  73 ---
 .../TestIncrementalContainerReportHandler.java     | 158 +++---
 .../replication/TestReplicationActivityStatus.java |  85 ---
 .../scm/server/TestSCMClientProtocolServer.java    |   8 +-
 19 files changed, 723 insertions(+), 1238 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
index 05d4e77..7b5c467 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@ -108,13 +108,6 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
     this.replicationType = repType;
   }
 
-  public ContainerInfo(ContainerInfo info) {
-    this(info.getContainerID(), info.getState(), info.getPipelineID(),
-        info.getUsedBytes(), info.getNumberOfKeys(),
-        info.getStateEnterTime(), info.getOwner(),
-        info.getDeleteTransactionId(), info.getSequenceId(),
-        info.getReplicationFactor(), info.getReplicationType());
-  }
   /**
    * Needed for serialization findbugs.
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java
index 95e0d93..fff1fb2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java
@@ -20,8 +20,7 @@ package org.apache.hadoop.hdds.scm.chillmode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
-import org.apache.hadoop.hdds.scm.container.replication.
-    ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
 import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -41,7 +40,7 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
   private final BlockManager scmBlockManager;
   private final long waitTime;
   private final AtomicBoolean isInChillMode = new AtomicBoolean(true);
-  private final ReplicationActivityStatus replicationActivityStatus;
+  private final ReplicationManager replicationManager;
 
 
   /**
@@ -49,27 +48,27 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
    * @param configuration
    * @param clientProtocolServer
    * @param blockManager
-   * @param replicationStatus
+   * @param replicationManager
    */
   public ChillModeHandler(Configuration configuration,
       SCMClientProtocolServer clientProtocolServer,
       BlockManager blockManager,
-      ReplicationActivityStatus replicationStatus) {
+      ReplicationManager replicationManager) {
     Objects.requireNonNull(configuration, "Configuration cannot be null");
     Objects.requireNonNull(clientProtocolServer, "SCMClientProtocolServer " +
         "object cannot be null");
     Objects.requireNonNull(blockManager, "BlockManager object cannot be null");
-    Objects.requireNonNull(replicationStatus, "ReplicationActivityStatus " +
+    Objects.requireNonNull(replicationManager, "ReplicationManager " +
         "object cannot be null");
     this.waitTime = configuration.getTimeDuration(
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT,
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT,
         TimeUnit.MILLISECONDS);
-    scmClientProtocolServer = clientProtocolServer;
-    scmBlockManager = blockManager;
-    replicationActivityStatus = replicationStatus;
+    this.scmClientProtocolServer = clientProtocolServer;
+    this.scmBlockManager = blockManager;
+    this.replicationManager = replicationManager;
 
-    boolean chillModeEnabled = configuration.getBoolean(
+    final boolean chillModeEnabled = configuration.getBoolean(
         HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
         HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT);
     isInChillMode.set(chillModeEnabled);
@@ -89,13 +88,16 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
   @Override
   public void onMessage(ChillModeStatus chillModeStatus,
       EventPublisher publisher) {
-    isInChillMode.set(chillModeStatus.getChillModeStatus());
-
-    replicationActivityStatus.fireReplicationStart(isInChillMode.get(),
-        waitTime);
-    scmClientProtocolServer.setChillModeStatus(isInChillMode.get());
-    scmBlockManager.setChillModeStatus(isInChillMode.get());
-
+    try {
+      isInChillMode.set(chillModeStatus.getChillModeStatus());
+      scmClientProtocolServer.setChillModeStatus(isInChillMode.get());
+      scmBlockManager.setChillModeStatus(isInChillMode.get());
+      Thread.sleep(waitTime);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } finally {
+      replicationManager.start();
+    }
   }
 
   public boolean getChillModeStatus() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
new file mode 100644
index 0000000..f660442
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+/**
+ * Base class for all the container report handlers.
+ */
+public class AbstractContainerReportHandler {
+
+  private final ContainerManager containerManager;
+  private final Logger logger;
+
+  /**
+   * Constructs AbstractContainerReportHandler instance with the
+   * given ContainerManager instance.
+   *
+   * @param containerManager ContainerManager
+   * @param logger Logger to be used for logging
+   */
+  AbstractContainerReportHandler(final ContainerManager containerManager,
+                                 final Logger logger) {
+    Preconditions.checkNotNull(containerManager);
+    Preconditions.checkNotNull(logger);
+    this.containerManager = containerManager;
+    this.logger = logger;
+  }
+
+  /**
+   * Process the given ContainerReplica received from specified datanode.
+   *
+   * @param datanodeDetails DatanodeDetails of the node which reported
+   *                        this replica
+   * @param replicaProto ContainerReplica
+   *
+   * @throws IOException In case of any Exception while processing the report
+   */
+  void processContainerReplica(final DatanodeDetails datanodeDetails,
+                               final ContainerReplicaProto replicaProto)
+      throws IOException {
+    final ContainerID containerId = ContainerID
+        .valueof(replicaProto.getContainerID());
+    final ContainerReplica replica = ContainerReplica.newBuilder()
+        .setContainerID(containerId)
+        .setContainerState(replicaProto.getState())
+        .setDatanodeDetails(datanodeDetails)
+        .setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
+        .setSequenceId(replicaProto.getBlockCommitSequenceId())
+        .build();
+
+    logger.debug("Processing replica of container {} from datanode {}",
+        containerId, datanodeDetails);
+    // Synchronized block should be replaced by container lock,
+    // once we have introduced lock inside ContainerInfo.
+    synchronized (containerManager.getContainer(containerId)) {
+      updateContainerStats(containerId, replicaProto);
+      updateContainerState(datanodeDetails, containerId, replica);
+      containerManager.updateContainerReplica(containerId, replica);
+    }
+  }
+
+  /**
+   * Update the container stats if it's lagging behind the stats in reported
+   * replica.
+   *
+   * @param containerId ID of the container
+   * @param replicaProto Container Replica information
+   * @throws ContainerNotFoundException If the container is not present
+   */
+  private void updateContainerStats(final ContainerID containerId,
+                                    final ContainerReplicaProto replicaProto)
+      throws ContainerNotFoundException {
+
+    if (!isUnhealthy(replicaProto::getState)) {
+      final ContainerInfo containerInfo = containerManager
+          .getContainer(containerId);
+
+      if (containerInfo.getSequenceId() <
+          replicaProto.getBlockCommitSequenceId()) {
+        containerInfo.updateSequenceId(
+            replicaProto.getBlockCommitSequenceId());
+      }
+      if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
+        containerInfo.setUsedBytes(replicaProto.getUsed());
+      }
+      if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) {
+        containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
+      }
+    }
+  }
+
+  /**
+   * Updates the container state based on the given replica state.
+   *
+   * @param datanode Datanode from which the report is received
+   * @param containerId ID of the container
+   * @param replica ContainerReplica
+   * @throws IOException In case of Exception
+   */
+  private void updateContainerState(final DatanodeDetails datanode,
+                                    final ContainerID containerId,
+                                    final ContainerReplica replica)
+      throws IOException {
+
+    final ContainerInfo container = containerManager
+        .getContainer(containerId);
+
+    switch (container.getState()) {
+    case OPEN:
+      /*
+       * If the state of a container is OPEN, datanodes cannot report
+       * any other state.
+       */
+      if (replica.getState() != State.OPEN) {
+        logger.warn("Container {} is in OPEN state, but the datanode {} " +
+            "reports an {} replica.", containerId,
+            datanode, replica.getState());
+        // Should we take some action?
+      }
+      break;
+    case CLOSING:
+      /*
+       * When the container is in CLOSING state the replicas can be in any
+       * of the following states:
+       *
+       * - OPEN
+       * - CLOSING
+       * - QUASI_CLOSED
+       * - CLOSED
+       *
+       * If all the replicas are either in OPEN or CLOSING state, do nothing.
+       *
+       * If the replica is in QUASI_CLOSED state, move the container to
+       * QUASI_CLOSED state.
+       *
+       * If the replica is in CLOSED state, mark the container as CLOSED.
+       *
+       */
+
+      if (replica.getState() == State.QUASI_CLOSED) {
+        logger.info("Moving container {} to QUASI_CLOSED state, datanode {} " +
+                "reported QUASI_CLOSED replica.", containerId, datanode);
+        containerManager.updateContainerState(containerId,
+            LifeCycleEvent.QUASI_CLOSE);
+      }
+
+      if (replica.getState() == State.CLOSED) {
+        logger.info("Moving container {} to CLOSED state, datanode {} " +
+            "reported CLOSED replica.", containerId, datanode);
+        Preconditions.checkArgument(replica.getSequenceId()
+            == container.getSequenceId());
+        containerManager.updateContainerState(containerId,
+            LifeCycleEvent.CLOSE);
+      }
+
+      break;
+    case QUASI_CLOSED:
+      /*
+       * The container is in QUASI_CLOSED state, this means that at least
+       * one of the replicas was QUASI_CLOSED.
+       *
+       * Now replicas can be in any of the following states.
+       *
+       * 1. OPEN
+       * 2. CLOSING
+       * 3. QUASI_CLOSED
+       * 4. CLOSED
+       *
+       * If at least one of the replicas is in CLOSED state, mark the
+       * container as CLOSED.
+       *
+       */
+      if (replica.getState() == State.CLOSED) {
+        logger.info("Moving container {} to CLOSED state, datanode {} " +
+            "reported CLOSED replica.", containerId, datanode);
+        Preconditions.checkArgument(replica.getSequenceId()
+            == container.getSequenceId());
+        containerManager.updateContainerState(containerId,
+            LifeCycleEvent.FORCE_CLOSE);
+      }
+      break;
+    case CLOSED:
+      /*
+       * The container is already in closed state. do nothing.
+       */
+      break;
+    case DELETING:
+      throw new UnsupportedOperationException(
+          "Unsupported container state 'DELETING'.");
+    case DELETED:
+      throw new UnsupportedOperationException(
+          "Unsupported container state 'DELETED'.");
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Returns true if the container replica is marked as UNHEALTHY.
+   *
+   * @param replicaState State of the container replica.
+   * @return true if unhealthy, false otherwise
+   */
+  private boolean isUnhealthy(final Supplier<State> replicaState) {
+    return replicaState.get() == ContainerReplicaProto.State.UNHEALTHY;
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 4500786..934b244 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -15,13 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdds.scm.container;
 
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+package org.apache.hadoop.hdds.scm.container;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
@@ -29,115 +24,85 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
-import org.apache.hadoop.hdds.scm.container.replication
-    .ReplicationActivityStatus;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.server
-    .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-
-import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
  * Handles container reports from datanode.
  */
-public class ContainerReportHandler implements
-    EventHandler<ContainerReportFromDatanode> {
+public class ContainerReportHandler extends AbstractContainerReportHandler
+    implements EventHandler<ContainerReportFromDatanode> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerReportHandler.class);
 
   private final NodeManager nodeManager;
-  private final PipelineManager pipelineManager;
   private final ContainerManager containerManager;
-  private final ReplicationActivityStatus replicationStatus;
 
+  /**
+   * Constructs ContainerReportHandler instance with the
+   * given NodeManager and ContainerManager instance.
+   *
+   * @param nodeManager NodeManager instance
+   * @param containerManager ContainerManager instance
+   */
   public ContainerReportHandler(final NodeManager nodeManager,
-      final PipelineManager pipelineManager,
-      final ContainerManager containerManager,
-      final ReplicationActivityStatus replicationActivityStatus) {
-    Preconditions.checkNotNull(nodeManager);
-    Preconditions.checkNotNull(pipelineManager);
-    Preconditions.checkNotNull(containerManager);
-    Preconditions.checkNotNull(replicationActivityStatus);
+                                final ContainerManager containerManager) {
+    super(containerManager, LOG);
     this.nodeManager = nodeManager;
-    this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
-    this.replicationStatus = replicationActivityStatus;
   }
 
+  /**
+   * Process the container reports from datanodes.
+   *
+   * @param reportFromDatanode Container Report
+   * @param publisher EventPublisher reference
+   */
   @Override
   public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
-      final EventPublisher publisher) {
+                        final EventPublisher publisher) {
 
     final DatanodeDetails datanodeDetails =
         reportFromDatanode.getDatanodeDetails();
-
     final ContainerReportsProto containerReport =
         reportFromDatanode.getReport();
 
     try {
+      final List<ContainerReplicaProto> replicas =
+          containerReport.getReportsList();
+      final Set<ContainerID> containersInSCM =
+          nodeManager.getContainers(datanodeDetails);
 
-      final List<ContainerReplicaProto> replicas = containerReport
-          .getReportsList();
-
-      // ContainerIDs which SCM expects this datanode to have.
-      final Set<ContainerID> expectedContainerIDs = nodeManager
-          .getContainers(datanodeDetails);
-
-      // ContainerIDs that this datanode actually has.
-      final Set<ContainerID> actualContainerIDs = replicas.parallelStream()
+      final Set<ContainerID> containersInDn = replicas.parallelStream()
           .map(ContainerReplicaProto::getContainerID)
           .map(ContainerID::valueof).collect(Collectors.toSet());
 
-      // Container replicas which SCM is not aware of.
-      final  Set<ContainerID> newReplicas =
-          new HashSet<>(actualContainerIDs);
-      newReplicas.removeAll(expectedContainerIDs);
-
-      // Container replicas which are missing from datanode.
-      final Set<ContainerID> missingReplicas =
-          new HashSet<>(expectedContainerIDs);
-      missingReplicas.removeAll(actualContainerIDs);
-
-      processContainerReplicas(datanodeDetails, replicas, publisher);
-
-      // Remove missing replica from ContainerManager
-      for (ContainerID id : missingReplicas) {
-        try {
-          containerManager.getContainerReplicas(id)
-              .stream()
-              .filter(replica ->
-                  replica.getDatanodeDetails().equals(datanodeDetails))
-              .findFirst()
-              .ifPresent(replica -> {
-                try {
-                  containerManager.removeContainerReplica(id, replica);
-                } catch (ContainerNotFoundException |
-                    ContainerReplicaNotFoundException e) {
-                  // This should not happen, but even if it happens, not an
-                  // issue
-                }
-              });
-        } catch (ContainerNotFoundException e) {
-          LOG.warn("Cannot remove container replica, container {} not found {}",
-              id, e);
-        }
-      }
+      final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM);
+      missingReplicas.removeAll(containersInDn);
 
-      // Update the latest set of containers for this datanode in NodeManager.
-      nodeManager.setContainers(datanodeDetails, actualContainerIDs);
+      processContainerReplicas(datanodeDetails, replicas);
+      processMissingReplicas(datanodeDetails, missingReplicas);
+      updateDeleteTransaction(datanodeDetails, replicas, publisher);
 
-      // Replicate if needed.
-      newReplicas.forEach(id -> checkReplicationState(id, publisher));
-      missingReplicas.forEach(id -> checkReplicationState(id, publisher));
+      /*
+       * Update the latest set of containers for this datanode in
+       * NodeManager
+       */
+      nodeManager.setContainers(datanodeDetails, containersInDn);
 
     } catch (NodeNotFoundException ex) {
       LOG.error("Received container report from unknown datanode {} {}",
@@ -146,68 +111,89 @@ public class ContainerReportHandler implements
 
   }
 
+  /**
+   * Processes the ContainerReport.
+   *
+   * @param datanodeDetails Datanode from which this report was received
+   * @param replicas list of ContainerReplicaProto
+   */
   private void processContainerReplicas(final DatanodeDetails datanodeDetails,
-      final List<ContainerReplicaProto> replicas,
-      final EventPublisher publisher) {
-    final PendingDeleteStatusList pendingDeleteStatusList =
-        new PendingDeleteStatusList(datanodeDetails);
+      final List<ContainerReplicaProto> replicas) {
     for (ContainerReplicaProto replicaProto : replicas) {
       try {
-        final ContainerID containerID = ContainerID.valueof(
-            replicaProto.getContainerID());
-
-        ReportHandlerHelper.processContainerReplica(containerManager,
-            containerID, replicaProto, datanodeDetails, publisher, LOG);
-
-        final ContainerInfo containerInfo = containerManager
-            .getContainer(containerID);
-
-        if (containerInfo.getDeleteTransactionId() >
-            replicaProto.getDeleteTransactionId()) {
-          pendingDeleteStatusList
-              .addPendingDeleteStatus(replicaProto.getDeleteTransactionId(),
-                  containerInfo.getDeleteTransactionId(),
-                  containerInfo.getContainerID());
-        }
+        processContainerReplica(datanodeDetails, replicaProto);
       } catch (ContainerNotFoundException e) {
-        LOG.error("Received container report for an unknown container {} from"
-                + " datanode {} {}", replicaProto.getContainerID(),
+        LOG.error("Received container report for an unknown container" +
+                " {} from datanode {}.", replicaProto.getContainerID(),
             datanodeDetails, e);
       } catch (IOException e) {
-        LOG.error("Exception while processing container report for container"
-                + " {} from datanode {} {}", replicaProto.getContainerID(),
+        LOG.error("Exception while processing container report for container" +
+                " {} from datanode {}.", replicaProto.getContainerID(),
             datanodeDetails, e);
       }
     }
-    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
-      publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
-          pendingDeleteStatusList);
-    }
   }
 
-  private void checkReplicationState(ContainerID containerID,
-      EventPublisher publisher) {
-    try {
-      ContainerInfo container = containerManager.getContainer(containerID);
-      replicateIfNeeded(container, publisher);
-    } catch (ContainerNotFoundException ex) {
-      LOG.warn("Container is missing from containerStateManager. Can't request "
-          + "replication. {} {}", containerID, ex);
+  /**
+   * Process the missing replica on the given datanode.
+   *
+   * @param datanodeDetails DatanodeDetails
+   * @param missingReplicas ContainerID which are missing on the given datanode
+   */
+  private void processMissingReplicas(final DatanodeDetails datanodeDetails,
+                                      final Set<ContainerID> missingReplicas) {
+    for (ContainerID id : missingReplicas) {
+      try {
+        containerManager.getContainerReplicas(id).stream()
+            .filter(replica -> replica.getDatanodeDetails()
+                .equals(datanodeDetails)).findFirst()
+            .ifPresent(replica -> {
+              try {
+                containerManager.removeContainerReplica(id, replica);
+              } catch (ContainerNotFoundException |
+                  ContainerReplicaNotFoundException ignored) {
+                // This should not happen, but even if it happens, not an issue
+              }
+            });
+      } catch (ContainerNotFoundException e) {
+        LOG.warn("Cannot remove container replica, container {} not found.",
+            id, e);
+      }
     }
-
   }
 
-  private void replicateIfNeeded(ContainerInfo container,
-      EventPublisher publisher) throws ContainerNotFoundException {
-    if (!container.isOpen() && replicationStatus.isReplicationEnabled()) {
-      final int existingReplicas = containerManager
-          .getContainerReplicas(container.containerID()).size();
-      final int expectedReplicas = container.getReplicationFactor().getNumber();
-      if (existingReplicas != expectedReplicas) {
-        publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-            new ReplicationRequest(container.getContainerID(),
-                existingReplicas, expectedReplicas));
+  /**
+   * Fires PENDING_DELETE_STATUS for replicas lagging in delete transactions.
+   *
+   * @param datanodeDetails DatanodeDetails
+   * @param replicas List of ContainerReplicaProto
+   * @param publisher EventPublisher reference
+   */
+  private void updateDeleteTransaction(final DatanodeDetails datanodeDetails,
+      final List<ContainerReplicaProto> replicas,
+      final EventPublisher publisher) {
+    final PendingDeleteStatusList pendingDeleteStatusList =
+        new PendingDeleteStatusList(datanodeDetails);
+    for (ContainerReplicaProto replica : replicas) {
+      try {
+        final ContainerInfo containerInfo = containerManager.getContainer(
+            ContainerID.valueof(replica.getContainerID()));
+        if (containerInfo.getDeleteTransactionId() >
+            replica.getDeleteTransactionId()) {
+          pendingDeleteStatusList.addPendingDeleteStatus(
+              replica.getDeleteTransactionId(),
+              containerInfo.getDeleteTransactionId(),
+              containerInfo.getContainerID());
+        }
+      } catch (ContainerNotFoundException cnfe) {
+        LOG.warn("Cannot update pending delete transaction for " +
+            "container #{}. Reason: container missing.",
+            replica.getContainerID());
       }
     }
+    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
+      publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
+          pendingDeleteStatusList);
+    }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 4af8678..a37bf33 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -310,20 +310,19 @@ public class ContainerStateManager {
    *
    * @param containerID - ContainerID
    * @param event - LifeCycle Event
-   * @return Updated ContainerInfo.
    * @throws SCMException  on Failure.
    */
-  ContainerInfo updateContainerState(final ContainerID containerID,
+  void updateContainerState(final ContainerID containerID,
       final HddsProtos.LifeCycleEvent event)
       throws SCMException, ContainerNotFoundException {
     final ContainerInfo info = containers.getContainerInfo(containerID);
     try {
+      final LifeCycleState oldState = info.getState();
       final LifeCycleState newState = stateMachine.getNextState(
           info.getState(), event);
       containers.updateState(containerID, info.getState(), newState);
       containerStateCount.incrementAndGet(newState);
-      containerStateCount.decrementAndGet(info.getState());
-      return containers.getContainerInfo(containerID);
+      containerStateCount.decrementAndGet(oldState);
     } catch (InvalidStateTransitionException ex) {
       String error = String.format("Failed to update container state %s, " +
               "reason: invalid state transition from state: %s upon " +
@@ -335,18 +334,6 @@ public class ContainerStateManager {
   }
 
   /**
-   * Update the container State.
-   * @param info - Container Info
-   * @return  ContainerInfo
-   * @throws SCMException - on Error.
-   */
-  ContainerInfo updateContainerInfo(final ContainerInfo info)
-      throws ContainerNotFoundException {
-    containers.updateContainerInfo(info);
-    return containers.getContainerInfo(info.containerID());
-  }
-
-  /**
    * Update deleteTransactionId for a container.
    *
    * @param deleteTransactionMap maps containerId to its new
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
index d70edfb..042fd56 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
@@ -18,55 +18,40 @@
 
 package org.apache.hadoop.hdds.scm.container;
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.server
-    .SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos
+    .ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .IncrementalContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * Handles incremental container reports from datanode.
  */
-public class IncrementalContainerReportHandler implements
-    EventHandler<IncrementalContainerReportFromDatanode> {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(IncrementalContainerReportHandler.class);
+public class IncrementalContainerReportHandler extends
+    AbstractContainerReportHandler
+    implements EventHandler<IncrementalContainerReportFromDatanode> {
 
-  private final PipelineManager pipelineManager;
-  private final ContainerManager containerManager;
+  private static final Logger LOG = LoggerFactory.getLogger(
+      IncrementalContainerReportHandler.class);
 
   public IncrementalContainerReportHandler(
-      final PipelineManager pipelineManager,
       final ContainerManager containerManager)  {
-    Preconditions.checkNotNull(pipelineManager);
-    Preconditions.checkNotNull(containerManager);
-    this.pipelineManager = pipelineManager;
-    this.containerManager = containerManager;
+    super(containerManager, LOG);
   }
 
   @Override
-  public void onMessage(
-      final IncrementalContainerReportFromDatanode containerReportFromDatanode,
-      final EventPublisher publisher) {
+  public void onMessage(final IncrementalContainerReportFromDatanode report,
+                        final EventPublisher publisher) {
 
     for (ContainerReplicaProto replicaProto :
-         containerReportFromDatanode.getReport().getReportList()) {
+        report.getReport().getReportList()) {
       try {
-        final DatanodeDetails datanodeDetails = containerReportFromDatanode
-            .getDatanodeDetails();
-        final ContainerID containerID = ContainerID
-            .valueof(replicaProto.getContainerID());
-        ReportHandlerHelper.processContainerReplica(containerManager,
-            containerID, replicaProto, datanodeDetails, publisher, LOG);
+        processContainerReplica(report.getDatanodeDetails(), replicaProto);
       } catch (ContainerNotFoundException e) {
         LOG.warn("Container {} not found!", replicaProto.getContainerID());
       } catch (IOException e) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 97c600b..1dce81b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -170,6 +170,15 @@ public class ReplicationManager {
   }
 
   /**
+   * Returns true if the Replication Monitor Thread is running.
+   *
+   * @return true if running, false otherwise
+   */
+  public boolean isRunning() {
+    return replicationMonitor.isAlive();
+  }
+
+  /**
    * Process all the containers immediately.
    */
   @VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java
deleted file mode 100644
index c566ca9..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
-
-/**
- * Helper functions to handler container reports.
- */
-public final class ReportHandlerHelper {
-
-  private ReportHandlerHelper() {}
-
-  /**
-   * Processes the container replica and updates the container state in SCM.
-   * If needed, sends command to datanode to update the replica state.
-   *
-   * @param containerManager ContainerManager instance
-   * @param containerId Id of the container
-   * @param replicaProto replica of the container
-   * @param datanodeDetails datanode where the replica resides
-   * @param publisher event publisher
-   * @param logger for logging
-   * @throws IOException
-   */
-  static void processContainerReplica(final ContainerManager containerManager,
-      final ContainerID containerId, final ContainerReplicaProto replicaProto,
-      final DatanodeDetails datanodeDetails, final EventPublisher publisher,
-      final Logger logger) throws IOException {
-
-    final ContainerReplica replica = ContainerReplica.newBuilder()
-        .setContainerID(containerId)
-        .setContainerState(replicaProto.getState())
-        .setDatanodeDetails(datanodeDetails)
-        .setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
-        .setSequenceId(replicaProto.getBlockCommitSequenceId())
-        .build();
-
-    // This is an in-memory update.
-    containerManager.updateContainerReplica(containerId, replica);
-    ReportHandlerHelper.reconcileContainerState(containerManager,
-        containerId, publisher, logger);
-
-    final ContainerInfo containerInfo = containerManager
-        .getContainer(containerId);
-    if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
-      containerInfo.setUsedBytes(replicaProto.getUsed());
-    }
-
-    if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) {
-      containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
-    }
-
-    // Now we have reconciled the container state. If the container state and
-    // the replica state doesn't match, then take appropriate action.
-    ReportHandlerHelper.sendReplicaCommands(
-        datanodeDetails, containerInfo, replica, publisher, logger);
-  }
-
-
-  /**
-   * Reconcile the container state based on the ContainerReplica states.
-   * ContainerState is updated after the reconciliation.
-   *
-   * @param manager ContainerManager
-   * @param containerId container id
-   * @throws ContainerNotFoundException
-   */
-  private static void reconcileContainerState(final ContainerManager manager,
-      final ContainerID containerId, final EventPublisher publisher,
-      final Logger logger) throws IOException {
-    // TODO: handle unhealthy replica.
-    synchronized (manager.getContainer(containerId)) {
-      final ContainerInfo container = manager.getContainer(containerId);
-      final Set<ContainerReplica> replicas = manager.getContainerReplicas(
-          containerId);
-      final LifeCycleState containerState = container.getState();
-      switch (containerState) {
-      case OPEN:
-        /*
-         * If the container state is OPEN.
-         * None of the replica should be in any other state.
-         *
-         */
-        List<ContainerReplica> invalidReplicas = replicas.stream()
-            .filter(replica -> replica.getState() != State.OPEN)
-            .collect(Collectors.toList());
-        if (!invalidReplicas.isEmpty()) {
-          logger.warn("Container {} has invalid replica state." +
-              "Invalid Replicas: {}", containerId, invalidReplicas);
-        }
-        // A container cannot be over replicated when in OPEN state.
-        break;
-      case CLOSING:
-        /*
-         * SCM has asked DataNodes to close the container. Now the replicas
-         * can be in any of the following states.
-         *
-         * 1. OPEN
-         * 2. CLOSING
-         * 3. QUASI_CLOSED
-         * 4. CLOSED
-         *
-         * If all the replica are either in OPEN or CLOSING state, do nothing.
-         *
-         * If any one of the replica is in QUASI_CLOSED state, move the
-         * container to QUASI_CLOSED state.
-         *
-         * If any one of the replica is in CLOSED state, mark the container as
-         * CLOSED. The close has happened via Ratis.
-         *
-         */
-        Optional<ContainerReplica> closedReplica = replicas.stream()
-            .filter(replica -> replica.getState() == State.CLOSED)
-            .findFirst();
-        if (closedReplica.isPresent()) {
-          container.updateSequenceId(closedReplica.get().getSequenceId());
-          manager.updateContainerState(
-              containerId, HddsProtos.LifeCycleEvent.CLOSE);
-
-          // TODO: remove container from OPEN pipeline, since the container is
-          // closed we can go ahead and remove it from Ratis pipeline.
-        } else if (replicas.stream()
-            .anyMatch(replica -> replica.getState() == State.QUASI_CLOSED)) {
-          manager.updateContainerState(
-              containerId, HddsProtos.LifeCycleEvent.QUASI_CLOSE);
-        }
-        break;
-      case QUASI_CLOSED:
-        /*
-         * The container is in QUASI_CLOSED state, this means that at least
-         * one of the replica is in QUASI_CLOSED/CLOSED state.
-         * Other replicas can be in any of the following state.
-         *
-         * 1. OPEN
-         * 2. CLOSING
-         * 3. QUASI_CLOSED
-         * 4. CLOSED
-         *
-         * If <50% of container replicas are in QUASI_CLOSED state and all
-         * the other replica are either in OPEN or CLOSING state, do nothing.
-         * We cannot identify the correct replica since we don't have quorum
-         * yet.
-         *
-         * If >50% (quorum) of replicas are in QUASI_CLOSED state and other
-         * replicas are either in OPEN or CLOSING state, try to identify
-         * the latest container replica using originNodeId and sequenceId.
-         * Force close those replica(s) which have the latest sequenceId.
-         *
-         * If at least one of the replica is in CLOSED state, mark the
-         * container as CLOSED. Force close the replicas which matches the
-         * sequenceId of the CLOSED replica.
-         *
-         */
-        if (replicas.stream()
-            .anyMatch(replica -> replica.getState() == State.CLOSED)) {
-          manager.updateContainerState(
-              containerId, HddsProtos.LifeCycleEvent.FORCE_CLOSE);
-          // TODO: remove container from OPEN pipeline, since the container is
-          // closed we can go ahead and remove it from Ratis pipeline.
-        } else {
-          final int replicationFactor = container
-              .getReplicationFactor().getNumber();
-          final List<ContainerReplica> quasiClosedReplicas = replicas.stream()
-              .filter(replica -> replica.getState() == State.QUASI_CLOSED)
-              .collect(Collectors.toList());
-          final long uniqueQuasiClosedReplicaCount = quasiClosedReplicas
-              .stream()
-              .map(ContainerReplica::getOriginDatanodeId)
-              .distinct()
-              .count();
-
-          if (uniqueQuasiClosedReplicaCount > (replicationFactor / 2)) {
-            // Quorum of unique replica has been QUASI_CLOSED
-            long sequenceId = forceCloseContainerReplicaWithHighestSequenceId(
-                container, quasiClosedReplicas, publisher);
-            if (sequenceId != -1L) {
-              container.updateSequenceId(sequenceId);
-            }
-          }
-        }
-        break;
-      case CLOSED:
-        /*
-         * The container is already in closed state. do nothing.
-         */
-        break;
-      case DELETING:
-        // Not handled.
-        throw new UnsupportedOperationException("Unsupported container state" +
-            " 'DELETING'.");
-      case DELETED:
-        // Not handled.
-        throw new UnsupportedOperationException("Unsupported container state" +
-            " 'DELETED'.");
-      default:
-        break;
-      }
-    }
-  }
-
-  /**
-   * Compares the QUASI_CLOSED replicas of a container and sends close command.
-   *
-   * @param quasiClosedReplicas list of quasi closed replicas
-   * @return the sequenceId of the closed replica.
-   */
-  private static long forceCloseContainerReplicaWithHighestSequenceId(
-      final ContainerInfo container,
-      final List<ContainerReplica> quasiClosedReplicas,
-      final EventPublisher publisher) {
-
-    final long highestSequenceId = quasiClosedReplicas.stream()
-        .map(ContainerReplica::getSequenceId)
-        .max(Long::compare)
-        .orElse(-1L);
-
-    if (highestSequenceId != -1L) {
-      quasiClosedReplicas.stream()
-          .filter(replica -> replica.getSequenceId() == highestSequenceId)
-          .forEach(replica -> {
-            CloseContainerCommand closeContainerCommand =
-                new CloseContainerCommand(container.getContainerID(),
-                    container.getPipelineID(), true);
-            publisher.fireEvent(DATANODE_COMMAND,
-                new CommandForDatanode<>(
-                    replica.getDatanodeDetails().getUuid(),
-                    closeContainerCommand));
-          });
-    }
-    return highestSequenceId;
-  }
-
-  /**
-   * Based on the container and replica state, send command to datanode if
-   * required.
-   *
-   * @param datanodeDetails datanode where the replica resides
-   * @param containerInfo container information
-   * @param replica replica information
-   * @param publisher queue to publish the datanode command event
-   * @param log for logging
-   */
-  static void sendReplicaCommands(
-      final DatanodeDetails datanodeDetails,
-      final ContainerInfo containerInfo,
-      final ContainerReplica replica,
-      final EventPublisher publisher,
-      final Logger log) {
-    final HddsProtos.LifeCycleState containerState = containerInfo.getState();
-    final ContainerReplicaProto.State replicaState = replica.getState();
-
-    if(!ReportHandlerHelper.compareState(containerState, replicaState)) {
-      if (containerState == HddsProtos.LifeCycleState.OPEN) {
-        // When a container state in SCM is OPEN, there is no way a datanode
-        // can quasi close/close the container.
-        log.warn("Invalid container replica state for container {}" +
-                " from datanode {}. Expected state is OPEN.",
-            containerInfo.containerID(), datanodeDetails);
-        // The replica can go CORRUPT, we have to handle it.
-      }
-      if (containerState == HddsProtos.LifeCycleState.CLOSING ||
-          containerState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
-        // Resend close container event for this datanode if the container
-        // replica state is OPEN/CLOSING.
-        if (replicaState == ContainerReplicaProto.State.OPEN ||
-            replicaState == ContainerReplicaProto.State.CLOSING) {
-          CloseContainerCommand closeContainerCommand =
-              new CloseContainerCommand(containerInfo.getContainerID(),
-                  containerInfo.getPipelineID());
-          publisher.fireEvent(DATANODE_COMMAND,
-              new CommandForDatanode<>(
-                  replica.getDatanodeDetails().getUuid(),
-                  closeContainerCommand));
-        }
-      }
-      if (containerState == HddsProtos.LifeCycleState.CLOSED) {
-        if (replicaState == ContainerReplicaProto.State.OPEN ||
-            replicaState == ContainerReplicaProto.State.CLOSING ||
-            replicaState == ContainerReplicaProto.State.QUASI_CLOSED) {
-          // Send force close container event for this datanode if the container
-          // replica state is OPEN/CLOSING/QUASI_CLOSED.
-
-          // Close command will be send only if this replica matches the
-          // sequence of the container.
-          if (containerInfo.getSequenceId() ==
-              replica.getSequenceId()) {
-            CloseContainerCommand closeContainerCommand =
-                new CloseContainerCommand(containerInfo.getContainerID(),
-                    containerInfo.getPipelineID(), true);
-            publisher.fireEvent(DATANODE_COMMAND,
-                new CommandForDatanode<>(
-                    replica.getDatanodeDetails().getUuid(),
-                    closeContainerCommand));
-          }
-          // TODO: delete the replica if the BCSID doesn't match.
-        }
-      }
-    }
-
-  }
-
-  /**
-   * Compares the container and replica state.
-   *
-   * @param containerState container state
-   * @param replicaState replica state
-   * @return true if the states are same, else false
-   */
-  private static boolean compareState(final LifeCycleState containerState,
-                              final State replicaState) {
-    // TODO: handle unhealthy replica.
-    switch (containerState) {
-    case OPEN:
-      return replicaState == State.OPEN;
-    case CLOSING:
-      return replicaState == State.CLOSING;
-    case QUASI_CLOSED:
-      return replicaState == State.QUASI_CLOSED;
-    case CLOSED:
-      return replicaState == State.CLOSED;
-    case DELETING:
-      return false;
-    case DELETED:
-      return false;
-    default:
-      return false;
-    }
-  }
-
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index fe52669..1fa8395 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -295,18 +295,20 @@ public class SCMContainerManager implements ContainerManager {
     // Should we return the updated ContainerInfo instead of LifeCycleState?
     lock.lock();
     try {
-      ContainerInfo container = containerStateManager.getContainer(containerID);
-      ContainerInfo updatedContainer =
-          updateContainerStateInternal(containerID, event);
-      if (updatedContainer.getState() != LifeCycleState.OPEN
-          && container.getState() == LifeCycleState.OPEN) {
+      final ContainerInfo container = containerStateManager
+          .getContainer(containerID);
+      final LifeCycleState oldState = container.getState();
+      containerStateManager.updateContainerState(containerID, event);
+      final LifeCycleState newState = container.getState();
+
+      if (oldState == LifeCycleState.OPEN && newState != LifeCycleState.OPEN) {
         pipelineManager
-            .removeContainerFromPipeline(updatedContainer.getPipelineID(),
+            .removeContainerFromPipeline(container.getPipelineID(),
                 containerID);
       }
       final byte[] dbKey = Longs.toByteArray(containerID.getId());
-      containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
-      return updatedContainer.getState();
+      containerStore.put(dbKey, container.getProtobuf().toByteArray());
+      return newState;
     } catch (ContainerNotFoundException cnfe) {
       throw new SCMException(
           "Failed to update container state"
@@ -318,11 +320,6 @@ public class SCMContainerManager implements ContainerManager {
     }
   }
 
-  private ContainerInfo updateContainerStateInternal(ContainerID containerID,
-      HddsProtos.LifeCycleEvent event) throws IOException {
-    return containerStateManager.updateContainerState(containerID, event);
-  }
-
 
     /**
      * Update deleteTransactionId according to deleteTransactionMap.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 2aba724..7411055 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -296,8 +296,7 @@ public class ContainerStateMap {
       checkIfContainerExist(containerID);
       final ContainerInfo currentInfo = containerMap.get(containerID);
       try {
-        final ContainerInfo newInfo = new ContainerInfo(currentInfo);
-        newInfo.setState(newState);
+        currentInfo.setState(newState);
 
         // We are updating two places before this update is done, these can
         // fail independently, since the code needs to handle it.
@@ -309,13 +308,12 @@ public class ContainerStateMap {
         // roll back the earlier change we did. If the rollback fails, we can
         // be in an inconsistent state,
 
-        containerMap.put(containerID, newInfo);
         lifeCycleStateMap.update(currentState, newState, containerID);
         LOG.trace("Updated the container {} to new state. Old = {}, new = " +
             "{}", containerID, currentState, newState);
 
         // Just flush both old and new data sets from the result cache.
-        flushCache(currentInfo, newInfo);
+        flushCache(currentInfo);
       } catch (SCMException ex) {
         LOG.error("Unable to update the container state. {}", ex);
         // we need to revert the change in this attribute since we are not
@@ -324,7 +322,7 @@ public class ContainerStateMap {
                 "old state. Old = {}, Attempted state = {}", currentState,
             newState);
 
-        containerMap.put(containerID, currentInfo);
+        currentInfo.setState(currentState);
 
         // if this line throws, the state map can be in an inconsistent
         // state, since we will have modified the attribute by the
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
index 9c955033..a6b0704 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
@@ -23,7 +23,7 @@ package org.apache.hadoop.hdds.scm.server;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index b51b537..79565f0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -55,8 +55,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
@@ -99,7 +98,6 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.utils.HddsVersionInfo;
-import org.apache.hadoop.utils.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -176,7 +174,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private SCMMetadataStore scmMetadataStore;
 
   private final EventQueue eventQueue;
-  private final Scheduler commonScheduler;
   /*
    * HTTP endpoint for JMX access.
    */
@@ -199,7 +196,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
   private final LeaseManager<Long> commandWatcherLeaseManager;
 
-  private final ReplicationActivityStatus replicationStatus;
   private SCMChillModeManager scmChillModeManager;
   private CertificateServer certificateServer;
 
@@ -287,8 +283,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
         watcherTimeout);
     initalizeSystemManagers(conf, configurator);
-    commonScheduler = new Scheduler("SCMCommonScheduler", false, 1);
-    replicationStatus = new ReplicationActivityStatus(commonScheduler);
 
     CloseContainerEventHandler closeContainerHandler =
         new CloseContainerEventHandler(pipelineManager, containerManager);
@@ -311,12 +305,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
 
     ContainerReportHandler containerReportHandler =
-        new ContainerReportHandler(scmNodeManager, pipelineManager,
-            containerManager, replicationStatus);
+        new ContainerReportHandler(scmNodeManager, containerManager);
 
     IncrementalContainerReportHandler incrementalContainerReportHandler =
-        new IncrementalContainerReportHandler(
-            pipelineManager, containerManager);
+        new IncrementalContainerReportHandler(containerManager);
 
     PipelineActionHandler pipelineActionHandler =
         new PipelineActionHandler(pipelineManager, conf);
@@ -343,7 +335,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     httpServer = new StorageContainerManagerHttpServer(conf);
 
     chillModeHandler = new ChillModeHandler(configuration,
-        clientProtocolServer, scmBlockManager, replicationStatus);
+        clientProtocolServer, scmBlockManager, replicationManager);
 
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
@@ -422,8 +414,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     if (configurator.getReplicationManager() != null) {
       replicationManager = configurator.getReplicationManager();
     }  else {
-      replicationManager = new ReplicationManager(containerPlacementPolicy,
-          containerManager, eventQueue, commandWatcherLeaseManager);
+      replicationManager = new ReplicationManager(conf,
+          containerManager, containerPlacementPolicy, eventQueue);
     }
     if(configurator.getScmChillModeManager() != null) {
       scmChillModeManager = configurator.getScmChillModeManager();
@@ -917,8 +909,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
     httpServer.start();
     scmBlockManager.start();
-    replicationStatus.start();
-    replicationManager.start();
 
     // Start jvm monitor
     jvmPauseMonitor = new JvmPauseMonitor();
@@ -934,14 +924,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   public void stop() {
 
     try {
-      LOG.info("Stopping Replication Activity Status tracker.");
-      replicationStatus.close();
-    } catch (Exception ex) {
-      LOG.error("Replication Activity Status tracker stop failed.", ex);
-    }
-
-
-    try {
       LOG.info("Stopping Replication Manager Service.");
       replicationManager.stop();
     } catch (Exception ex) {
@@ -1017,13 +999,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       LOG.error("SCM Event Queue stop failed", ex);
     }
 
-    try {
-      LOG.info("Stopping SCM Common Scheduler.");
-      commonScheduler.close();
-    } catch (Exception ex) {
-      LOG.error("SCM Common Scheduler close failed {}", ex);
-    }
-
     if (jvmPauseMonitor != null) {
       jvmPauseMonitor.stop();
     }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 19c35fd..d61924a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -533,6 +533,7 @@ public final class TestUtils {
         .setReplicationType(HddsProtos.ReplicationType.RATIS)
         .setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
         .setState(state)
+        .setSequenceId(10000L)
         .setOwner("TEST")
         .build();
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java
index efd69fd..7c9f98e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java
@@ -22,16 +22,20 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.utils.Scheduler;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.HashSet;
+
 /**
  * Tests ChillModeHandler behavior.
  */
@@ -40,7 +44,7 @@ public class TestChillModeHandler {
 
   private OzoneConfiguration configuration;
   private SCMClientProtocolServer scmClientProtocolServer;
-  private ReplicationActivityStatus replicationActivityStatus;
+  private ReplicationManager replicationManager;
   private BlockManager blockManager;
   private ChillModeHandler chillModeHandler;
   private EventQueue eventQueue;
@@ -54,15 +58,19 @@ public class TestChillModeHandler {
         "3s");
     scmClientProtocolServer =
         Mockito.mock(SCMClientProtocolServer.class);
-    replicationActivityStatus = new ReplicationActivityStatus(
-        new Scheduler("SCMCommonScheduler", false, 1));
+    eventQueue = new EventQueue();
+    final ContainerManager containerManager =
+        Mockito.mock(ContainerManager.class);
+    Mockito.when(containerManager.getContainerIDs())
+        .thenReturn(new HashSet<>());
+    replicationManager = new ReplicationManager(configuration,
+        containerManager, Mockito.mock(ContainerPlacementPolicy.class),
+        eventQueue);
     blockManager = Mockito.mock(BlockManagerImpl.class);
     chillModeHandler =
         new ChillModeHandler(configuration, scmClientProtocolServer,
-            blockManager, replicationActivityStatus);
+            blockManager, replicationManager);
 
-
-    eventQueue = new EventQueue();
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
     chillModeStatus = new SCMChillModeManager.ChillModeStatus(false);
 
@@ -82,7 +90,7 @@ public class TestChillModeHandler {
     Assert.assertFalse(scmClientProtocolServer.getChillModeStatus());
     Assert.assertFalse(((BlockManagerImpl) blockManager).isScmInChillMode());
     GenericTestUtils.waitFor(() ->
-            replicationActivityStatus.isReplicationEnabled(), 1000, 5000);
+            replicationManager.isRunning(), 1000, 5000);
   }
 
 
@@ -99,6 +107,6 @@ public class TestChillModeHandler {
     Assert.assertFalse(scmClientProtocolServer.getChillModeStatus());
     Assert.assertFalse(((BlockManagerImpl) blockManager).isScmInChillMode());
     GenericTestUtils.waitFor(() ->
-        replicationActivityStatus.isReplicationEnabled(), 1000, 5000);
+        replicationManager.isRunning(), 1000, 5000);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 0b7cae4..41585bc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -16,124 +16,146 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.container.replication
-    .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.utils.Scheduler;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hadoop.hdds.scm.TestUtils
-    .getReplicas;
-import static org.apache.hadoop.hdds.scm.TestUtils
-    .getContainer;
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.addContainerToContainerManager;
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.mockUpdateContainerReplica;
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.mockUpdateContainerState;
+import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
 
 /**
  * Test the behaviour of the ContainerReportHandler.
  */
 public class TestContainerReportHandler {
 
-  private static Scheduler scheduler;
+  private NodeManager nodeManager;
+  private ContainerManager containerManager;
+  private ContainerStateManager containerStateManager;
+  private EventPublisher publisher;
+
+  @Before
+  public void setup() throws IOException {
+    final Configuration conf = new OzoneConfiguration();
+    this.nodeManager = new MockNodeManager(true, 10);
+    this.containerManager = Mockito.mock(ContainerManager.class);
+    this.containerStateManager = new ContainerStateManager(conf);
+    this.publisher = Mockito.mock(EventPublisher.class);
+
+
+    Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
+        .thenAnswer(invocation -> containerStateManager
+            .getContainer((ContainerID)invocation.getArguments()[0]));
+
+    Mockito.when(containerManager.getContainerReplicas(
+        Mockito.any(ContainerID.class)))
+        .thenAnswer(invocation -> containerStateManager
+            .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+
+    Mockito.doAnswer(invocation -> {
+      containerStateManager
+          .updateContainerState((ContainerID)invocation.getArguments()[0],
+              (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
+      return null;
+    }).when(containerManager).updateContainerState(
+        Mockito.any(ContainerID.class),
+        Mockito.any(HddsProtos.LifeCycleEvent.class));
+
+    Mockito.doAnswer(invocation -> {
+      containerStateManager.updateContainerReplica(
+          (ContainerID) invocation.getArguments()[0],
+          (ContainerReplica) invocation.getArguments()[1]);
+      return null;
+    }).when(containerManager).updateContainerReplica(
+        Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
+
+    Mockito.doAnswer(invocation -> {
+      containerStateManager.removeContainerReplica(
+          (ContainerID) invocation.getArguments()[0],
+          (ContainerReplica) invocation.getArguments()[1]);
+      return null;
+    }).when(containerManager).removeContainerReplica(
+        Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
 
-  @BeforeClass
-  public static void setup() {
-    scheduler = new Scheduler("SCMCommonScheduler", false, 1);
   }
 
-  @AfterClass
-  public static void tearDown() {
-    scheduler.close();
+  @After
+  public void tearDown() throws IOException {
+    containerStateManager.close();
   }
 
   @Test
   public void testUnderReplicatedContainer()
-      throws NodeNotFoundException, ContainerNotFoundException,
-      ContainerReplicaNotFoundException {
-
-    final NodeManager nodeManager = new MockNodeManager(true, 10);
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final ReplicationActivityStatus replicationActivityStatus =
-        new ReplicationActivityStatus(scheduler);
-    replicationActivityStatus.enableReplication();
+      throws NodeNotFoundException, ContainerNotFoundException, SCMException {
 
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
-        nodeManager, pipelineManager, containerManager,
-        replicationActivityStatus);
+        nodeManager, containerManager);
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
         NodeState.HEALTHY).iterator();
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
+
     final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
     final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
     final Set<ContainerID> containerIDSet = Stream.of(
         containerOne.containerID(), containerTwo.containerID())
         .collect(Collectors.toSet());
-    final Set<ContainerReplica> containerOneReplicas = getReplicas(
-        containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
-        datanodeOne, datanodeTwo, datanodeThree);
-    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
-        containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
-        datanodeOne, datanodeTwo, datanodeThree);
 
     nodeManager.setContainers(datanodeOne, containerIDSet);
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    addContainerToContainerManager(
-        containerManager, containerOne, containerOneReplicas);
-    addContainerToContainerManager(
-        containerManager, containerTwo, containerTwoReplicas);
+    containerStateManager.loadContainer(containerOne);
+    containerStateManager.loadContainer(containerTwo);
 
-    Mockito.doAnswer((Answer<Void>) invocation -> {
-      Object[] args = invocation.getArguments();
-      if (args[0].equals(containerOne.containerID())) {
-        ContainerReplica replica = (ContainerReplica) args[1];
-        containerOneReplicas.remove(replica);
-      }
-      return null;
-    }).when(containerManager).removeContainerReplica(
-        Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
+    getReplicas(containerOne.containerID(),
+        ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree)
+        .forEach(r -> {
+          try {
+            containerStateManager.updateContainerReplica(
+                containerOne.containerID(), r);
+          } catch (ContainerNotFoundException ignored) {
+
+          }
+        });
+
+    getReplicas(containerTwo.containerID(),
+        ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree)
+        .forEach(r -> {
+          try {
+            containerStateManager.updateContainerReplica(
+                containerTwo.containerID(), r);
+          } catch (ContainerNotFoundException ignored) {
 
+          }
+        });
 
-    Mockito.when(
-        containerManager.getContainerReplicas(containerOne.containerID()))
-        .thenReturn(containerOneReplicas);
-    Mockito.when(
-        containerManager.getContainerReplicas(containerTwo.containerID()))
-        .thenReturn(containerTwoReplicas);
 
     // SCM expects both containerOne and containerTwo to be in all the three
     // datanodes datanodeOne, datanodeTwo and datanodeThree
@@ -145,70 +167,65 @@ public class TestContainerReportHandler {
     final ContainerReportsProto containerReport = getContainerReportsProto(
         containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
         datanodeOne.getUuidString());
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
     final ContainerReportFromDatanode containerReportFromDatanode =
         new ContainerReportFromDatanode(datanodeOne, containerReport);
     reportHandler.onMessage(containerReportFromDatanode, publisher);
+    Assert.assertEquals(2, containerManager.getContainerReplicas(
+        containerOne.containerID()).size());
 
-    // Now we should get a replication request for containerOne
-    Mockito.verify(publisher, Mockito.times(1))
-        .fireEvent(Mockito.any(), Mockito.any());
-
-    // TODO: verify whether are actually getting a replication request event
-    // for containerOne
   }
 
   @Test
   public void testOverReplicatedContainer() throws NodeNotFoundException,
-      ContainerNotFoundException {
-
-    final NodeManager nodeManager = new MockNodeManager(true, 10);
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final ReplicationActivityStatus replicationActivityStatus =
-        new ReplicationActivityStatus(scheduler);
-    replicationActivityStatus.enableReplication();
+      SCMException, ContainerNotFoundException {
 
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
-        nodeManager, pipelineManager, containerManager,
-        replicationActivityStatus);
+        nodeManager, containerManager);
+
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
         NodeState.HEALTHY).iterator();
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
     final DatanodeDetails datanodeFour = nodeIterator.next();
+
     final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
     final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+
     final Set<ContainerID> containerIDSet = Stream.of(
         containerOne.containerID(), containerTwo.containerID())
         .collect(Collectors.toSet());
-    final Set<ContainerReplica> containerOneReplicas = getReplicas(
-        containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
-        datanodeOne, datanodeTwo, datanodeThree);
-    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
-        containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
-        datanodeOne, datanodeTwo, datanodeThree);
 
     nodeManager.setContainers(datanodeOne, containerIDSet);
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    addContainerToContainerManager(
-        containerManager, containerOne, containerOneReplicas);
-    addContainerToContainerManager(
-        containerManager, containerTwo, containerTwoReplicas);
+    containerStateManager.loadContainer(containerOne);
+    containerStateManager.loadContainer(containerTwo);
 
-    Mockito.doAnswer((Answer<Void>) invocation -> {
-      Object[] args = invocation.getArguments();
-      if (args[0].equals(containerOne.containerID())) {
-        containerOneReplicas.add((ContainerReplica) args[1]);
-      }
-      return null;
-    }).when(containerManager).updateContainerReplica(
-        Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
+    getReplicas(containerOne.containerID(),
+        ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree)
+        .forEach(r -> {
+          try {
+            containerStateManager.updateContainerReplica(
+                containerOne.containerID(), r);
+          } catch (ContainerNotFoundException ignored) {
 
+          }
+        });
+
+    getReplicas(containerTwo.containerID(),
+        ContainerReplicaProto.State.CLOSED,
+        datanodeOne, datanodeTwo, datanodeThree)
+        .forEach(r -> {
+          try {
+            containerStateManager.updateContainerReplica(
+                containerTwo.containerID(), r);
+          } catch (ContainerNotFoundException ignored) {
+
+          }
+        });
 
 
     // SCM expects both containerOne and containerTwo to be in all the three
@@ -220,114 +237,15 @@ public class TestContainerReportHandler {
 
     final ContainerReportsProto containerReport = getContainerReportsProto(
         containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
-        datanodeOne.getUuidString());
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+        datanodeFour.getUuidString());
     final ContainerReportFromDatanode containerReportFromDatanode =
         new ContainerReportFromDatanode(datanodeFour, containerReport);
     reportHandler.onMessage(containerReportFromDatanode, publisher);
-    Mockito.verify(publisher, Mockito.times(1))
-        .fireEvent(Mockito.any(), Mockito.any());
 
-    // TODO: verify whether are actually getting a replication request event
-    // for containerOne
+    Assert.assertEquals(4, containerManager.getContainerReplicas(
+        containerOne.containerID()).size());
   }
 
-  @Test
-  public void testOpenToClosing()
-      throws NodeNotFoundException, ContainerNotFoundException {
-    /*
-     * The container is in CLOSING state and all the replicas are either in
-     * OPEN or CLOSING state.
-     *
-     * The datanode reports that the replica is still in OPEN state.
-     *
-     * In this case SCM should trigger close container event to the datanode.
-     */
-
-    final NodeManager nodeManager = new MockNodeManager(true, 10);
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final ReplicationActivityStatus replicationActivityStatus =
-        new ReplicationActivityStatus(scheduler);
-    replicationActivityStatus.enableReplication();
-
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
-    final ContainerReportHandler reportHandler = new ContainerReportHandler(
-        nodeManager, pipelineManager, containerManager,
-        replicationActivityStatus);
-    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
-        NodeState.HEALTHY).iterator();
-    final DatanodeDetails datanodeOne = nodeIterator.next();
-    final DatanodeDetails datanodeTwo = nodeIterator.next();
-    final DatanodeDetails datanodeThree = nodeIterator.next();
-    final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
-    final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
-    final Set<ContainerID> containerIDSet = Stream.of(
-        containerOne.containerID(), containerTwo.containerID())
-        .collect(Collectors.toSet());
-    final Set<ContainerReplica> containerOneReplicas = getReplicas(
-        containerOne.containerID(), ContainerReplicaProto.State.OPEN,
-        datanodeOne);
-
-    containerOneReplicas.addAll(getReplicas(
-        containerOne.containerID(), ContainerReplicaProto.State.CLOSING,
-        datanodeTwo, datanodeThree));
-
-    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
-        containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
-        datanodeOne, datanodeTwo, datanodeThree);
-
-    nodeManager.setContainers(datanodeOne, containerIDSet);
-    nodeManager.setContainers(datanodeTwo, containerIDSet);
-    nodeManager.setContainers(datanodeThree, containerIDSet);
-
-    addContainerToContainerManager(
-        containerManager, containerOne, containerOneReplicas);
-    addContainerToContainerManager(
-        containerManager, containerTwo, containerTwoReplicas);
-    mockUpdateContainerReplica(
-        containerManager, containerOne, containerOneReplicas);
-
-    // Replica in datanodeOne of containerOne is in OPEN state.
-    final ContainerReportsProto containerReport = getContainerReportsProto(
-        containerOne.containerID(), ContainerReplicaProto.State.OPEN,
-        datanodeOne.getUuidString());
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
-    final ContainerReportFromDatanode containerReportFromDatanode =
-        new ContainerReportFromDatanode(datanodeOne, containerReport);
-    reportHandler.onMessage(containerReportFromDatanode, publisher);
-
-    // Now we should get close container event for containerOne on datanodeOne
-    Mockito.verify(publisher, Mockito.times(1))
-        .fireEvent(Mockito.any(), Mockito.any());
-
-    // TODO: verify whether are actually getting a close container
-    // datanode command for containerOne/datanodeOne
-
-    /*
-     * The container is in CLOSING state and all the replicas are either in
-     * OPEN or CLOSING state.
-     *
-     * The datanode reports that the replica is in CLOSING state.
-     *
-     * In this case SCM should trigger close container event to the datanode.
-     */
-
-    // Replica in datanodeOne of containerOne is in OPEN state.
-    final ContainerReportsProto containerReportTwo = getContainerReportsProto(
-        containerOne.containerID(), ContainerReplicaProto.State.OPEN,
-        datanodeOne.getUuidString());
-    final ContainerReportFromDatanode containerReportTwoFromDatanode =
-        new ContainerReportFromDatanode(datanodeOne, containerReportTwo);
-    reportHandler.onMessage(containerReportTwoFromDatanode, publisher);
-
-    // Now we should get close container event for containerOne on datanodeOne
-    Mockito.verify(publisher, Mockito.times(2))
-        .fireEvent(Mockito.any(), Mockito.any());
-
-    // TODO: verify whether are actually getting a close container
-    // datanode command for containerOne/datanodeOne
-  }
 
   @Test
   public void testClosingToClosed() throws NodeNotFoundException, IOException {
@@ -339,27 +257,23 @@ public class TestContainerReportHandler {
      *
      * In this case SCM should mark the container as CLOSED.
      */
-    final NodeManager nodeManager = new MockNodeManager(true, 10);
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final ReplicationActivityStatus replicationActivityStatus =
-        new ReplicationActivityStatus(scheduler);
-    replicationActivityStatus.enableReplication();
-
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
-        nodeManager, pipelineManager, containerManager,
-        replicationActivityStatus);
+        nodeManager, containerManager);
+
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
         NodeState.HEALTHY).iterator();
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
+
     final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
     final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+
     final Set<ContainerID> containerIDSet = Stream.of(
         containerOne.containerID(), containerTwo.containerID())
         .collect(Collectors.toSet());
+
     final Set<ContainerReplica> containerOneReplicas = getReplicas(
         containerOne.containerID(),
         ContainerReplicaProto.State.CLOSING,
@@ -379,25 +293,36 @@ public class TestContainerReportHandler {
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    addContainerToContainerManager(
-        containerManager, containerOne, containerOneReplicas);
-    addContainerToContainerManager(
-        containerManager, containerTwo, containerTwoReplicas);
-    mockUpdateContainerReplica(
-        containerManager, containerOne, containerOneReplicas);
-    mockUpdateContainerState(containerManager, containerOne,
-        LifeCycleEvent.CLOSE, LifeCycleState.CLOSED);
+    containerStateManager.loadContainer(containerOne);
+    containerStateManager.loadContainer(containerTwo);
+
+    containerOneReplicas.forEach(r -> {
+      try {
+        containerStateManager.updateContainerReplica(
+            containerTwo.containerID(), r);
+      } catch (ContainerNotFoundException ignored) {
+
+      }
+    });
+
+    containerTwoReplicas.forEach(r -> {
+      try {
+        containerStateManager.updateContainerReplica(
+            containerTwo.containerID(), r);
+      } catch (ContainerNotFoundException ignored) {
+
+      }
+    });
+
 
     final ContainerReportsProto containerReport = getContainerReportsProto(
         containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
         datanodeOne.getUuidString());
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
     final ContainerReportFromDatanode containerReportFromDatanode =
         new ContainerReportFromDatanode(datanodeOne, containerReport);
     reportHandler.onMessage(containerReportFromDatanode, publisher);
 
-    Assert.assertEquals(
-        LifeCycleState.CLOSED, containerOne.getState());
+    Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
   }
 
   @Test
@@ -411,29 +336,23 @@ public class TestContainerReportHandler {
      *
      * In this case SCM should move the container to QUASI_CLOSED.
      */
-    final NodeManager nodeManager = new MockNodeManager(true, 10);
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final ReplicationActivityStatus replicationActivityStatus =
-        new ReplicationActivityStatus(scheduler);
-    replicationActivityStatus.enableReplication();
-
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
-        nodeManager, pipelineManager, containerManager,
-        replicationActivityStatus);
+        nodeManager, containerManager);
+
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
         NodeState.HEALTHY).iterator();
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
-    final ContainerInfo containerOne =
-        getContainer(LifeCycleState.CLOSING);
-    final ContainerInfo containerTwo =
-        getContainer(LifeCycleState.CLOSED);
+
+    final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
+    final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
+
     final Set<ContainerID> containerIDSet = Stream.of(
         containerOne.containerID(), containerTwo.containerID())
         .collect(Collectors.toSet());
+
     final Set<ContainerReplica> containerOneReplicas = getReplicas(
         containerOne.containerID(),
         ContainerReplicaProto.State.CLOSING,
@@ -451,60 +370,65 @@ public class TestContainerReportHandler {
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    addContainerToContainerManager(
-        containerManager, containerOne, containerOneReplicas);
-    addContainerToContainerManager(
-        containerManager, containerTwo, containerTwoReplicas);
-    mockUpdateContainerReplica(
-        containerManager, containerOne, containerOneReplicas);
-    mockUpdateContainerState(containerManager, containerOne,
-        LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED);
+    containerStateManager.loadContainer(containerOne);
+    containerStateManager.loadContainer(containerTwo);
+
+    containerOneReplicas.forEach(r -> {
+      try {
+        containerStateManager.updateContainerReplica(
+            containerTwo.containerID(), r);
+      } catch (ContainerNotFoundException ignored) {
+
+      }
+    });
+
+    containerTwoReplicas.forEach(r -> {
+      try {
+        containerStateManager.updateContainerReplica(
+            containerTwo.containerID(), r);
+      } catch (ContainerNotFoundException ignored) {
+
+      }
+    });
+
 
     final ContainerReportsProto containerReport = getContainerReportsProto(
         containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
         datanodeOne.getUuidString());
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
     final ContainerReportFromDatanode containerReportFromDatanode =
         new ContainerReportFromDatanode(datanodeOne, containerReport);
     reportHandler.onMessage(containerReportFromDatanode, publisher);
 
-    Assert.assertEquals(
-        LifeCycleState.QUASI_CLOSED, containerOne.getState());
+    Assert.assertEquals(LifeCycleState.QUASI_CLOSED, containerOne.getState());
   }
 
   @Test
-  public void testQuasiClosedWithDifferentOriginNodeReplica()
+  public void testQuasiClosedToClosed()
       throws NodeNotFoundException, IOException {
     /*
      * The container is in QUASI_CLOSED state.
      *  - One of the replicas is in QUASI_CLOSED state
      *  - The other two replicas are in OPEN/CLOSING state
      *
-     * The datanode reports the second replica is now QUASI_CLOSED.
+     * The datanode reports the second replica is now CLOSED.
      *
-     * In this case SCM should CLOSE the container with highest BCSID and
-     * send force close command to the datanode.
+     * In this case SCM should CLOSE the container.
      */
-    final NodeManager nodeManager = new MockNodeManager(true, 10);
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final ReplicationActivityStatus replicationActivityStatus =
-        new ReplicationActivityStatus(scheduler);
-    replicationActivityStatus.enableReplication();
-
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
-        nodeManager, pipelineManager, containerManager,
-        replicationActivityStatus);
+        nodeManager, containerManager);
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
         NodeState.HEALTHY).iterator();
+
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
+
     final ContainerInfo containerOne =
         getContainer(LifeCycleState.QUASI_CLOSED);
     final ContainerInfo containerTwo =
         getContainer(LifeCycleState.CLOSED);
+
     final Set<ContainerID> containerIDSet = Stream.of(
         containerOne.containerID(), containerTwo.containerID())
         .collect(Collectors.toSet());
@@ -526,139 +450,42 @@ public class TestContainerReportHandler {
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    addContainerToContainerManager(
-        containerManager, containerOne, containerOneReplicas);
-    addContainerToContainerManager(
-        containerManager, containerTwo, containerTwoReplicas);
-    mockUpdateContainerReplica(
-        containerManager, containerOne, containerOneReplicas);
-    mockUpdateContainerState(containerManager, containerOne,
-        LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
-
-    // Container replica with datanodeOne as originNodeId is already
-    // QUASI_CLOSED. Now we will tell SCM that container replica from
-    // datanodeTwo is also QUASI_CLOSED, but has higher sequenceId.
-    final ContainerReportsProto containerReport = getContainerReportsProto(
-        containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
-        datanodeTwo.getUuidString(), 999999L);
+    containerStateManager.loadContainer(containerOne);
+    containerStateManager.loadContainer(containerTwo);
 
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
-    final ContainerReportFromDatanode containerReportFromDatanode =
-        new ContainerReportFromDatanode(datanodeTwo, containerReport);
-    reportHandler.onMessage(containerReportFromDatanode, publisher);
+    containerOneReplicas.forEach(r -> {
+      try {
+        containerStateManager.updateContainerReplica(
+            containerOne.containerID(), r);
+      } catch (ContainerNotFoundException ignored) {
 
-    // Now we should get force close container event for containerOne on
-    // datanodeTwo
-    Mockito.verify(publisher, Mockito.times(1))
-        .fireEvent(Mockito.any(), Mockito.any());
-    // TODO: verify whether are actually getting a force close container
-    // datanode command for containerOne/datanodeTwo
-
-    // The sequence id of the container should have been updated.
-    Assert.assertEquals(999999L, containerOne.getSequenceId());
+      }
+    });
 
-    // Now datanodeTwo should close containerOne.
-    final ContainerReportsProto containerReportTwo = getContainerReportsProto(
-        containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
-        datanodeTwo.getUuidString(), 999999L);
-    final ContainerReportFromDatanode containerReportFromDatanodeTwo =
-        new ContainerReportFromDatanode(datanodeTwo, containerReportTwo);
-    reportHandler.onMessage(containerReportFromDatanodeTwo, publisher);
+    containerTwoReplicas.forEach(r -> {
+      try {
+        containerStateManager.updateContainerReplica(
+            containerTwo.containerID(), r);
+      } catch (ContainerNotFoundException ignored) {
 
-    // The container should be closed in SCM now.
-    Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
-  }
+      }
+    });
 
-  @Test
-  public void testQuasiClosedWithSameOriginNodeReplica()
-      throws NodeNotFoundException, IOException {
-    /*
-     * The container is in QUASI_CLOSED state.
-     *  - One of the replica is in QUASI_CLOSED state
-     *  - The other two replica are in OPEN/CLOSING state
-     *
-     * The datanode reports a QUASI_CLOSED replica which has the same
-     * origin node id as the existing QUASI_CLOSED replica.
-     *
-     * In this case SCM should not CLOSE the container.
-     */
-    final NodeManager nodeManager = new MockNodeManager(true, 10);
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final ReplicationActivityStatus replicationActivityStatus =
-        new ReplicationActivityStatus(scheduler);
-    replicationActivityStatus.enableReplication();
-
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
-    final ContainerReportHandler reportHandler = new ContainerReportHandler(
-        nodeManager, pipelineManager, containerManager,
-        replicationActivityStatus);
-    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
-        NodeState.HEALTHY).iterator();
-    final DatanodeDetails datanodeOne = nodeIterator.next();
-    final DatanodeDetails datanodeTwo = nodeIterator.next();
-    final DatanodeDetails datanodeThree = nodeIterator.next();
-    final ContainerInfo containerOne =
-        getContainer(LifeCycleState.QUASI_CLOSED);
-    final ContainerInfo containerTwo =
-        getContainer(LifeCycleState.CLOSED);
-    final Set<ContainerID> containerIDSet = Stream.of(
-        containerOne.containerID(), containerTwo.containerID())
-        .collect(Collectors.toSet());
-    final Set<ContainerReplica> containerOneReplicas = getReplicas(
-        containerOne.containerID(),
-        ContainerReplicaProto.State.QUASI_CLOSED,
-        datanodeOne);
-    containerOneReplicas.addAll(getReplicas(
-        containerOne.containerID(),
-        ContainerReplicaProto.State.CLOSING,
-        datanodeTwo));
-    final Set<ContainerReplica> containerTwoReplicas = getReplicas(
-        containerTwo.containerID(),
-        ContainerReplicaProto.State.CLOSED,
-        datanodeOne, datanodeTwo, datanodeThree);
 
-    nodeManager.setContainers(datanodeOne, containerIDSet);
-    nodeManager.setContainers(datanodeTwo, containerIDSet);
-    nodeManager.setContainers(datanodeThree,
-        Collections.singleton(containerTwo.containerID()));
-
-    addContainerToContainerManager(
-        containerManager, containerOne, containerOneReplicas);
-    addContainerToContainerManager(
-        containerManager, containerTwo, containerTwoReplicas);
-
-    mockUpdateContainerReplica(
-        containerManager, containerOne, containerOneReplicas);
-    mockUpdateContainerState(containerManager, containerOne,
-        LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
-
-    // containerOne is QUASI_CLOSED in datanodeOne and CLOSING in datanodeTwo.
-    // Now datanodeThree is sending container report which says that it has
-    // containerOne replica, but the originNodeId of this replica is
-    // datanodeOne. In this case we should not force close the container even
-    // though we got two QUASI_CLOSED replicas.
     final ContainerReportsProto containerReport = getContainerReportsProto(
-        containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
+        containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
         datanodeOne.getUuidString());
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
+
     final ContainerReportFromDatanode containerReportFromDatanode =
-        new ContainerReportFromDatanode(datanodeThree, containerReport);
+        new ContainerReportFromDatanode(datanodeOne, containerReport);
     reportHandler.onMessage(containerReportFromDatanode, publisher);
 
-    Mockito.verify(publisher, Mockito.times(0))
-        .fireEvent(Mockito.any(), Mockito.any());
+    Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
   }
 
   private static ContainerReportsProto getContainerReportsProto(
       final ContainerID containerId, final ContainerReplicaProto.State state,
       final String originNodeId) {
-    return getContainerReportsProto(containerId, state, originNodeId, 100L);
-  }
-
-  private static ContainerReportsProto getContainerReportsProto(
-      final ContainerID containerId, final ContainerReplicaProto.State state,
-      final String originNodeId, final long bcsid) {
     final ContainerReportsProto.Builder crBuilder =
         ContainerReportsProto.newBuilder();
     final ContainerReplicaProto replicaProto =
@@ -674,7 +501,7 @@ public class TestContainerReportHandler {
             .setWriteCount(100000000L)
             .setReadBytes(2000000000L)
             .setWriteBytes(2000000000L)
-            .setBlockCommitSequenceId(bcsid)
+            .setBlockCommitSequenceId(10000L)
             .setDeleteTransactionId(0)
             .build();
     return crBuilder.addReports(replicaProto).build();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
deleted file mode 100644
index 860ec4d..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.Set;
-
-/**
- * Helper methods for testing ContainerReportHandler and
- * IncrementalContainerReportHandler.
- */
-public final class TestContainerReportHelper {
-
-  private TestContainerReportHelper() {}
-
-  static void addContainerToContainerManager(
-      final ContainerManager containerManager, final ContainerInfo container,
-      final Set<ContainerReplica> replicas) throws ContainerNotFoundException {
-    Mockito.when(containerManager.getContainer(container.containerID()))
-        .thenReturn(container);
-    Mockito.when(
-        containerManager.getContainerReplicas(container.containerID()))
-        .thenReturn(replicas);
-  }
-
-  static void mockUpdateContainerReplica(
-      final ContainerManager containerManager,
-      final ContainerInfo containerInfo, final Set<ContainerReplica> replicas)
-      throws ContainerNotFoundException {
-    Mockito.doAnswer((Answer<Void>) invocation -> {
-      Object[] args = invocation.getArguments();
-      if (args[0].equals(containerInfo.containerID())) {
-        ContainerReplica replica = (ContainerReplica) args[1];
-        replicas.remove(replica);
-        replicas.add(replica);
-      }
-      return null;
-    }).when(containerManager).updateContainerReplica(
-        Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
-  }
-
-  static void mockUpdateContainerState(
-      final ContainerManager containerManager,
-      final ContainerInfo containerInfo,
-      final LifeCycleEvent event, final LifeCycleState state)
-      throws IOException {
-    Mockito.doAnswer((Answer<LifeCycleState>) invocation -> {
-      containerInfo.setState(state);
-      return containerInfo.getState();
-    }).when(containerManager).updateContainerState(
-        containerInfo.containerID(), event);
-  }
-
-}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index 6c9383f..7b8f9fc7b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -17,47 +17,78 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .IncrementalContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Set;
 
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.addContainerToContainerManager;
 import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
 import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.mockUpdateContainerReplica;
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.mockUpdateContainerState;
 
 /**
  * Test cases to verify the functionality of IncrementalContainerReportHandler.
  */
 public class TestIncrementalContainerReportHandler {
 
+  private ContainerManager containerManager;
+  private ContainerStateManager containerStateManager;
+  private EventPublisher publisher;
+
+  @Before
+  public void setup() throws IOException {
+    final Configuration conf = new OzoneConfiguration();
+    this.containerManager = Mockito.mock(ContainerManager.class);
+    this.containerStateManager = new ContainerStateManager(conf);
+    this.publisher = Mockito.mock(EventPublisher.class);
+
+
+    Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
+        .thenAnswer(invocation -> containerStateManager
+            .getContainer((ContainerID)invocation.getArguments()[0]));
+
+    Mockito.when(containerManager.getContainerReplicas(
+        Mockito.any(ContainerID.class)))
+        .thenAnswer(invocation -> containerStateManager
+            .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+
+    Mockito.doAnswer(invocation -> {
+      containerStateManager
+          .updateContainerState((ContainerID)invocation.getArguments()[0],
+              (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
+      return null;
+    }).when(containerManager).updateContainerState(
+        Mockito.any(ContainerID.class),
+        Mockito.any(HddsProtos.LifeCycleEvent.class));
+
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    containerStateManager.close();
+  }
+
+
   @Test
   public void testClosingToClosed() throws IOException {
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(
-            pipelineManager, containerManager);
+        new IncrementalContainerReportHandler(containerManager);
     final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
     final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@@ -67,34 +98,31 @@ public class TestIncrementalContainerReportHandler {
         ContainerReplicaProto.State.CLOSING,
         datanodeOne, datanodeTwo, datanodeThree);
 
-    addContainerToContainerManager(
-        containerManager, container, containerReplicas);
-    mockUpdateContainerReplica(
-        containerManager, container, containerReplicas);
-    mockUpdateContainerState(containerManager, container,
-        LifeCycleEvent.CLOSE, LifeCycleState.CLOSED);
+    containerStateManager.loadContainer(container);
+    containerReplicas.forEach(r -> {
+      try {
+        containerStateManager.updateContainerReplica(
+            container.containerID(), r);
+      } catch (ContainerNotFoundException ignored) {
+
+      }
+    });
 
     final IncrementalContainerReportProto containerReport =
         getIncrementalContainerReportProto(container.containerID(),
             ContainerReplicaProto.State.CLOSED,
             datanodeOne.getUuidString());
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
     final IncrementalContainerReportFromDatanode icrFromDatanode =
         new IncrementalContainerReportFromDatanode(
             datanodeOne, containerReport);
     reportHandler.onMessage(icrFromDatanode, publisher);
-    Assert.assertEquals(
-        LifeCycleState.CLOSED, container.getState());
+    Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
   }
 
   @Test
   public void testClosingToQuasiClosed() throws IOException {
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(
-            pipelineManager, containerManager);
+        new IncrementalContainerReportHandler(containerManager);
     final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
     final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@@ -104,34 +132,32 @@ public class TestIncrementalContainerReportHandler {
         ContainerReplicaProto.State.CLOSING,
         datanodeOne, datanodeTwo, datanodeThree);
 
-    addContainerToContainerManager(
-        containerManager, container, containerReplicas);
-    mockUpdateContainerReplica(
-        containerManager, container, containerReplicas);
-    mockUpdateContainerState(containerManager, container,
-        LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED);
+    containerStateManager.loadContainer(container);
+    containerReplicas.forEach(r -> {
+      try {
+        containerStateManager.updateContainerReplica(
+            container.containerID(), r);
+      } catch (ContainerNotFoundException ignored) {
+
+      }
+    });
+
 
     final IncrementalContainerReportProto containerReport =
         getIncrementalContainerReportProto(container.containerID(),
             ContainerReplicaProto.State.QUASI_CLOSED,
             datanodeOne.getUuidString());
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
     final IncrementalContainerReportFromDatanode icrFromDatanode =
         new IncrementalContainerReportFromDatanode(
             datanodeOne, containerReport);
     reportHandler.onMessage(icrFromDatanode, publisher);
-    Assert.assertEquals(
-        LifeCycleState.QUASI_CLOSED, container.getState());
+    Assert.assertEquals(LifeCycleState.QUASI_CLOSED, container.getState());
   }
 
   @Test
   public void testQuasiClosedToClosed() throws IOException {
-    final ContainerManager containerManager = Mockito.mock(
-        ContainerManager.class);
-    final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(
-            pipelineManager, containerManager);
+        new IncrementalContainerReportHandler(containerManager);
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@@ -145,38 +171,25 @@ public class TestIncrementalContainerReportHandler {
         ContainerReplicaProto.State.QUASI_CLOSED,
         datanodeThree));
 
+    containerStateManager.loadContainer(container);
+    containerReplicas.forEach(r -> {
+      try {
+        containerStateManager.updateContainerReplica(
+            container.containerID(), r);
+      } catch (ContainerNotFoundException ignored) {
 
-    addContainerToContainerManager(
-        containerManager, container, containerReplicas);
-    mockUpdateContainerReplica(
-        containerManager, container, containerReplicas);
-    mockUpdateContainerState(containerManager, container,
-        LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
+      }
+    });
 
     final IncrementalContainerReportProto containerReport =
         getIncrementalContainerReportProto(container.containerID(),
-            ContainerReplicaProto.State.QUASI_CLOSED,
-            datanodeOne.getUuidString(), 999999L);
-    final EventPublisher publisher = Mockito.mock(EventPublisher.class);
-    final IncrementalContainerReportFromDatanode icrFromDatanode =
-        new IncrementalContainerReportFromDatanode(
-            datanodeOne, containerReport);
-    reportHandler.onMessage(icrFromDatanode, publisher);
-
-    // SCM should issue force close.
-    Mockito.verify(publisher, Mockito.times(1))
-        .fireEvent(Mockito.any(), Mockito.any());
-
-    final IncrementalContainerReportProto containerReportTwo =
-        getIncrementalContainerReportProto(container.containerID(),
             ContainerReplicaProto.State.CLOSED,
-            datanodeOne.getUuidString(), 999999L);
-    final IncrementalContainerReportFromDatanode icrTwoFromDatanode =
+            datanodeThree.getUuidString());
+    final IncrementalContainerReportFromDatanode icr =
         new IncrementalContainerReportFromDatanode(
-            datanodeOne, containerReportTwo);
-    reportHandler.onMessage(icrTwoFromDatanode, publisher);
-    Assert.assertEquals(
-        LifeCycleState.CLOSED, container.getState());
+            datanodeOne, containerReport);
+    reportHandler.onMessage(icr, publisher);
+    Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
   }
 
   private static IncrementalContainerReportProto
@@ -184,15 +197,6 @@ public class TestIncrementalContainerReportHandler {
           final ContainerID containerId,
           final ContainerReplicaProto.State state,
           final String originNodeId) {
-    return getIncrementalContainerReportProto(
-        containerId, state, originNodeId, 100L);
-  }
-
-  private static IncrementalContainerReportProto
-      getIncrementalContainerReportProto(
-          final ContainerID containerId,
-          final ContainerReplicaProto.State state,
-          final String originNodeId, final long bcsid) {
     final IncrementalContainerReportProto.Builder crBuilder =
         IncrementalContainerReportProto.newBuilder();
     final ContainerReplicaProto replicaProto =
@@ -208,7 +212,7 @@ public class TestIncrementalContainerReportHandler {
             .setWriteCount(100000000L)
             .setReadBytes(2000000000L)
             .setWriteBytes(2000000000L)
-            .setBlockCommitSequenceId(bcsid)
+            .setBlockCommitSequenceId(10000L)
             .setDeleteTransactionId(0)
             .build();
     return crBuilder.addReport(replicaProto).build();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java
deleted file mode 100644
index c36ba75..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import static org.junit.Assert.*;
-
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.block.BlockManager;
-import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
-import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
-import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.utils.Scheduler;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Tests for ReplicationActivityStatus.
- */
-public class TestReplicationActivityStatus {
-
-  private static EventQueue eventQueue;
-  private static ReplicationActivityStatus replicationActivityStatus;
-
-  @BeforeClass
-  public static void setup() {
-    eventQueue = new EventQueue();
-    replicationActivityStatus = new ReplicationActivityStatus(
-        new Scheduler("SCMCommonScheduler", false, 1));
-
-    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
-    ozoneConfiguration.set(HddsConfigKeys.
-        HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT, "3s");
-
-    SCMClientProtocolServer scmClientProtocolServer =
-        Mockito.mock(SCMClientProtocolServer.class);
-    BlockManager blockManager = Mockito.mock(BlockManagerImpl.class);
-    ChillModeHandler chillModeHandler =
-        new ChillModeHandler(ozoneConfiguration, scmClientProtocolServer,
-            blockManager, replicationActivityStatus);
-    eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
-
-  }
-
-  @Test
-  public void testReplicationStatusForChillMode()
-      throws TimeoutException, InterruptedException {
-    assertFalse(replicationActivityStatus.isReplicationEnabled());
-    // In chill mode replication process should be stopped.
-    eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS,
-        new SCMChillModeManager.ChillModeStatus(true));
-    assertFalse(replicationActivityStatus.isReplicationEnabled());
-
-    // Replication should be enabled when chill mode if off.
-    eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS,
-        new SCMChillModeManager.ChillModeStatus(false));
-    GenericTestUtils.waitFor(() -> {
-      return replicationActivityStatus.isReplicationEnabled();
-    }, 10, 1000*5);
-    assertTrue(replicationActivityStatus.isReplicationEnabled());
-  }
-}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
index 1846b0c..ea59af3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
 import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -48,10 +48,10 @@ public class TestSCMClientProtocolServer {
     eventQueue = new EventQueue();
     scmClientProtocolServer = new SCMClientProtocolServer(config, null);
     BlockManager blockManager = Mockito.mock(BlockManagerImpl.class);
-    ReplicationActivityStatus replicationActivityStatus =
-        Mockito.mock(ReplicationActivityStatus.class);
+    ReplicationManager replicationManager =
+        Mockito.mock(ReplicationManager.class);
     ChillModeHandler chillModeHandler = new ChillModeHandler(config,
-        scmClientProtocolServer, blockManager, replicationActivityStatus);
+        scmClientProtocolServer, blockManager, replicationManager);
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org