Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2019/03/22 21:36:36 UTC

[hadoop] branch trunk updated: HDDS-1205. Refactor ReplicationManager to handle QUASI_CLOSED containers. Contributed by Nanda kumar. (#620)

This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f854a89  HDDS-1205. Refactor ReplicationManager to handle QUASI_CLOSED containers. Contributed by Nanda kumar. (#620)
f854a89 is described below

commit f854a89190bd2453ccb1bfaa123d63d546e913cd
Author: Arpit Agarwal <ar...@users.noreply.github.com>
AuthorDate: Fri Mar 22 14:36:29 2019 -0700

    HDDS-1205. Refactor ReplicationManager to handle QUASI_CLOSED containers. Contributed by Nanda kumar. (#620)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  12 +
 .../common/src/main/resources/ozone-default.xml    |  20 +
 .../hdds/scm/container/ContainerManager.java       |   8 +
 .../hdds/scm/container/ReplicationManager.java     | 748 +++++++++++++++++++++
 .../hdds/scm/container/SCMContainerManager.java    |  10 +
 .../scm/container/states/ContainerStateMap.java    |   9 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |  53 +-
 .../scm/container/TestContainerReportHandler.java  |   8 +-
 .../scm/container/TestContainerReportHelper.java   |  40 --
 .../TestIncrementalContainerReportHandler.java     |   6 +-
 .../hdds/scm/container/TestReplicationManager.java | 625 +++++++++++++++++
 11 files changed, 1485 insertions(+), 54 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 4e197d3..3b45b89 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -348,6 +348,18 @@ public final class ScmConfigKeys {
   public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT =
       "10m";
 
+  public static final String HDDS_SCM_REPLICATION_THREAD_INTERVAL =
+      "hdds.scm.replication.thread.interval";
+
+  public static final String HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT =
+      "5m";
+
+  public static final String HDDS_SCM_REPLICATION_EVENT_TIMEOUT =
+      "hdds.scm.replication.event.timeout";
+
+  public static final String HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT =
+      "10m";
+
   public static final String
       HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY =
       "hdds.scm.http.kerberos.principal";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 462a07b..9fd4ef3 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2357,4 +2357,24 @@
       Request to flush the OM DB before taking checkpoint snapshot.
     </description>
   </property>
+  <property>
+    <name>hdds.scm.replication.thread.interval</name>
+    <value>5m</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      There is a replication monitor thread running inside SCM which
+      takes care of replicating the containers in the cluster. This
+      property is used to configure the interval at which that thread
+      runs.
+    </description>
+  </property>
+  <property>
+    <name>hdds.scm.replication.event.timeout</name>
+    <value>10m</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      Timeout for the container replication/deletion commands sent
+      to datanodes. After this timeout the command will be retried.
+    </description>
+  </property>
 </configuration>
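
As a usage sketch (not part of this commit), the two duration properties above can
be overridden programmatically and are parsed with Configuration#getTimeDuration,
the same call the new ReplicationManager uses; the "2m" value below is purely
illustrative:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    // Override the ozone-default.xml value (illustrative, not a recommendation)
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL, "2m");

    // Duration suffixes like "2m"/"10m" are resolved to the requested unit
    long intervalMs = conf.getTimeDuration(
        ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL,
        ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);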
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index b2fe4b4..717d58d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -34,6 +34,14 @@ import java.util.Set;
  */
 public interface ContainerManager extends Closeable {
 
+
+  /**
+   * Returns all the container Ids managed by ContainerManager.
+   *
+   * @return Set of ContainerID
+   */
+  Set<ContainerID> getContainerIDs();
+
   /**
    * Returns all the containers managed by ContainerManager.
    *
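
A minimal sketch of a consumer of the new interface method; the ReplicationManager
added below is the first such consumer, walking every id once per monitor interval
(the caller here is hypothetical):

    // Sketch: iterate every container id known to SCM
    for (ContainerID id : containerManager.getContainerIDs()) {
      // look up the ContainerInfo / replicas and decide on an action,
      // as ReplicationManager#processContainer does in the new class below
    }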
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
new file mode 100644
index 0000000..97c600b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -0,0 +1,748 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.GeneratedMessage;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.lock.LockManager;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Replication Manager (RM) is responsible for making sure that containers
+ * are properly replicated. Replication Manager deals only with QUASI_CLOSED
+ * and CLOSED containers (see the wiring sketch after this file's diff).
+ */
+public class ReplicationManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationManager.class);
+
+  /**
+   * Reference to the ContainerManager.
+   */
+  private final ContainerManager containerManager;
+
+  /**
+   * PlacementPolicy which is used to identify where a container
+   * should be replicated.
+   */
+  private final ContainerPlacementPolicy containerPlacement;
+
+  /**
+   * EventPublisher to fire Replicate and Delete container events.
+   */
+  private final EventPublisher eventPublisher;
+
+  /**
+   * Used for locking a container using its ID while processing it.
+   */
+  private final LockManager<ContainerID> lockManager;
+
+  /**
+   * This is used for tracking container replication commands which are issued
+   * by ReplicationManager and not yet complete.
+   */
+  private final Map<ContainerID, List<InflightAction>> inflightReplication;
+
+  /**
+   * This is used for tracking container deletion commands which are issued
+   * by ReplicationManager and not yet complete.
+   */
+  private final Map<ContainerID, List<InflightAction>> inflightDeletion;
+
+  /**
+   * The ReplicationMonitor thread wakes up at the configured
+   * interval and processes all the containers.
+   */
+  private final Thread replicationMonitor;
+
+  /**
+   * The interval at which the ReplicationMonitor thread runs.
+   */
+  private final long interval;
+
+  /**
+   * Timeout for container replication & deletion command issued by
+   * ReplicationManager.
+   */
+  private final long eventTimeout;
+
+  /**
+   * Flag used for checking if the ReplicationMonitor thread is running or
+   * not.
+   */
+  private volatile boolean running;
+
+  /**
+   * Constructs ReplicationManager instance with the given configuration.
+   *
+   * @param conf OzoneConfiguration
+   * @param containerManager ContainerManager
+   * @param containerPlacement ContainerPlacementPolicy
+   * @param eventPublisher EventPublisher
+   */
+  public ReplicationManager(final Configuration conf,
+                            final ContainerManager containerManager,
+                            final ContainerPlacementPolicy containerPlacement,
+                            final EventPublisher eventPublisher) {
+    this.containerManager = containerManager;
+    this.containerPlacement = containerPlacement;
+    this.eventPublisher = eventPublisher;
+    this.lockManager = new LockManager<>(conf);
+    this.inflightReplication = new HashMap<>();
+    this.inflightDeletion = new HashMap<>();
+    this.replicationMonitor = new Thread(this::run);
+    this.replicationMonitor.setName("ReplicationMonitor");
+    this.replicationMonitor.setDaemon(true);
+    this.interval = conf.getTimeDuration(
+        ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL,
+        ScmConfigKeys.HDDS_SCM_REPLICATION_THREAD_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.eventTimeout = conf.getTimeDuration(
+        ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT,
+        ScmConfigKeys.HDDS_SCM_REPLICATION_EVENT_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.running = false;
+  }
+
+  /**
+   * Starts Replication Monitor thread.
+   */
+  public synchronized void start() {
+    if (!running) {
+      LOG.info("Starting Replication Monitor Thread.");
+      running = true;
+      replicationMonitor.start();
+    } else {
+      LOG.info("Replication Monitor Thread is already running.");
+    }
+  }
+
+  /**
+   * Process all the containers immediately.
+   */
+  @VisibleForTesting
+  @SuppressFBWarnings(value="NN_NAKED_NOTIFY",
+      justification="Used only for testing")
+  synchronized void processContainersNow() {
+    notify();
+  }
+
+  /**
+   * Stops Replication Monitor thread.
+   */
+  public synchronized void stop() {
+    if (running) {
+      LOG.info("Stopping Replication Monitor Thread.");
+      running = false;
+      notify();
+    } else {
+      LOG.info("Replication Monitor Thread is not running.");
+    }
+  }
+
+  /**
+   * ReplicationMonitor thread runnable. This wakes up at the configured
+   * interval and processes all the containers in the system.
+   */
+  private synchronized void run() {
+    try {
+      while (running) {
+        final long start = Time.monotonicNow();
+        final Set<ContainerID> containerIds =
+            containerManager.getContainerIDs();
+        containerIds.forEach(this::processContainer);
+
+        LOG.info("Replication Monitor Thread took {} milliseconds for" +
+                " processing {} containers.", Time.monotonicNow() - start,
+            containerIds.size());
+
+        wait(interval);
+      }
+    } catch (Throwable t) {
+      // When we get a runtime exception, we should terminate SCM.
+      LOG.error("Exception in Replication Monitor Thread.", t);
+      ExitUtil.terminate(1, t);
+    }
+  }
+
+  /**
+   * Process the given container.
+   *
+   * @param id ContainerID
+   */
+  private void processContainer(ContainerID id) {
+    lockManager.lock(id);
+    try {
+      final ContainerInfo container = containerManager.getContainer(id);
+      final Set<ContainerReplica> replicas = containerManager
+          .getContainerReplicas(container.containerID());
+      final LifeCycleState state = container.getState();
+
+      /*
+       * We don't take any action if the container is in OPEN state.
+       */
+      if (state == LifeCycleState.OPEN) {
+        return;
+      }
+
+      /*
+       * If the container is in CLOSING state, the replicas can either
+       * be in OPEN or in CLOSING state. In both of these cases
+       * we have to resend the close container command to the datanodes.
+       */
+      if (state == LifeCycleState.CLOSING) {
+        replicas.forEach(replica -> sendCloseCommand(
+            container, replica.getDatanodeDetails(), false));
+        return;
+      }
+
+      /*
+       * If the container is in QUASI_CLOSED state, check and close the
+       * container if possible.
+       */
+      if (state == LifeCycleState.QUASI_CLOSED &&
+          canForceCloseContainer(container, replicas)) {
+        forceCloseContainer(container, replicas);
+        return;
+      }
+
+      /*
+       * Before processing the container we have to reconcile the
+       * inflightReplication and inflightDeletion actions.
+       *
+       * We remove the entry from inflightReplication and inflightDeletion
+       * list, if the operation is completed or if it has timed out.
+       */
+      updateInflightAction(container, inflightReplication,
+          action -> replicas.stream()
+              .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
+
+      updateInflightAction(container, inflightDeletion,
+          action -> replicas.stream()
+              .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
+
+
+      /*
+       * We don't have to take any action if the container is healthy.
+       *
+       * According to ReplicationMonitor, a container is considered healthy
+       * if it is either in QUASI_CLOSED or in CLOSED state and has the
+       * exact number of replicas in the same state.
+       */
+      if (isContainerHealthy(container, replicas)) {
+        return;
+      }
+
+      /*
+       * Check if the container is under replicated and take appropriate
+       * action.
+       */
+      if (isContainerUnderReplicated(container, replicas)) {
+        handleUnderReplicatedContainer(container, replicas);
+        return;
+      }
+
+      /*
+       * Check if the container is over replicated and take appropriate
+       * action.
+       */
+      if (isContainerOverReplicated(container, replicas)) {
+        handleOverReplicatedContainer(container, replicas);
+        return;
+      }
+
+      /*
+       * The container is neither under nor over replicated, yet it is not
+       * healthy. This means that the container has an unhealthy/corrupted
+       * replica.
+       */
+      handleUnstableContainer(container, replicas);
+
+    } catch (ContainerNotFoundException ex) {
+      LOG.warn("Missing container {}.", id);
+    } finally {
+      lockManager.unlock(id);
+    }
+  }
+
+  /**
+   * Reconciles the InflightActions for a given container.
+   *
+   * @param container Container to update
+   * @param inflightActions inflightReplication (or) inflightDeletion
+   * @param filter filter to check if the operation is completed
+   */
+  private void updateInflightAction(final ContainerInfo container,
+      final Map<ContainerID, List<InflightAction>> inflightActions,
+      final Predicate<InflightAction> filter) {
+    final ContainerID id = container.containerID();
+    final long deadline = Time.monotonicNow() - eventTimeout;
+    if (inflightActions.containsKey(id)) {
+      final List<InflightAction> actions = inflightActions.get(id);
+      actions.removeIf(action -> action.time < deadline);
+      actions.removeIf(filter);
+      if (actions.isEmpty()) {
+        inflightActions.remove(id);
+      }
+    }
+  }
+
+  /**
+   * Returns true if the container is healthy according to ReplicationMonitor.
+   *
+   * According to ReplicationMonitor, a container is considered healthy if
+   * it has the exact number of replicas in the same state as the container.
+   *
+   * @param container Container to check
+   * @param replicas Set of ContainerReplicas
+   * @return true if the container is healthy, false otherwise
+   */
+  private boolean isContainerHealthy(final ContainerInfo container,
+                                     final Set<ContainerReplica> replicas) {
+    return container.getReplicationFactor().getNumber() == replicas.size() &&
+        replicas.stream().allMatch(
+            r -> compareState(container.getState(), r.getState()));
+  }
+
+  /**
+   * Checks if the container is under replicated or not.
+   *
+   * @param container Container to check
+   * @param replicas Set of ContainerReplicas
+   * @return true if the container is under replicated, false otherwise
+   */
+  private boolean isContainerUnderReplicated(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    return container.getReplicationFactor().getNumber() >
+        getReplicaCount(container.containerID(), replicas);
+  }
+
+  /**
+   * Checks if the container is over replicated or not.
+   *
+   * @param container Container to check
+   * @param replicas Set of ContainerReplicas
+   * @return true if the container is over replicated, false otherwise
+   */
+  private boolean isContainerOverReplicated(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    return container.getReplicationFactor().getNumber() <
+        getReplicaCount(container.containerID(), replicas);
+  }
+
+  /**
+   * Returns the replication count of the given container. This also
+   * considers inflight replication and deletion.
+   *
+   * @param id ContainerID
+   * @param replicas Set of existing replicas
+   * @return number of estimated replicas for this container
+   */
+  private int getReplicaCount(final ContainerID id,
+                              final Set<ContainerReplica> replicas) {
+    return replicas.size()
+        + inflightReplication.getOrDefault(id, Collections.emptyList()).size()
+        - inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
+  }
+
+  /**
+   * Returns true if more than 50% of the container replicas with unique
+   * originNodeId are in QUASI_CLOSED state.
+   *
+   * @param container Container to check
+   * @param replicas Set of ContainerReplicas
+   * @return true if we can force close the container, false otherwise
+   */
+  private boolean canForceCloseContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    Preconditions.assertTrue(container.getState() ==
+        LifeCycleState.QUASI_CLOSED);
+    final int replicationFactor = container.getReplicationFactor().getNumber();
+    final long uniqueQuasiClosedReplicaCount = replicas.stream()
+        .filter(r -> r.getState() == State.QUASI_CLOSED)
+        .map(ContainerReplica::getOriginDatanodeId)
+        .distinct()
+        .count();
+    return uniqueQuasiClosedReplicaCount > (replicationFactor / 2);
+  }
+
+  /**
+   * Force close the container replica(s) with the highest sequence Id.
+   *
+   * <p>
+   *   Note: We should force close the container only if >50% (quorum)
+   *   of replicas with unique originNodeId are in QUASI_CLOSED state.
+   * </p>
+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void forceCloseContainer(final ContainerInfo container,
+                                   final Set<ContainerReplica> replicas) {
+    Preconditions.assertTrue(container.getState() ==
+        LifeCycleState.QUASI_CLOSED);
+
+    final List<ContainerReplica> quasiClosedReplicas = replicas.stream()
+        .filter(r -> r.getState() == State.QUASI_CLOSED)
+        .collect(Collectors.toList());
+
+    final Long sequenceId = quasiClosedReplicas.stream()
+        .map(ContainerReplica::getSequenceId)
+        .max(Long::compare)
+        .orElse(-1L);
+
+    LOG.info("Force closing container {} with BCSID {}," +
+        " which is in QUASI_CLOSED state.",
+        container.containerID(), sequenceId);
+
+    quasiClosedReplicas.stream()
+        .filter(r -> sequenceId != -1L)
+        .filter(replica -> replica.getSequenceId().equals(sequenceId))
+        .forEach(replica -> sendCloseCommand(
+            container, replica.getDatanodeDetails(), true));
+  }
+
+  /**
+   * If the given container is under replicated, identify a new set of
+   * datanode(s) to replicate the container using ContainerPlacementPolicy
+   * and send replicate container command to the identified datanode(s).
+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void handleUnderReplicatedContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    try {
+      final ContainerID id = container.containerID();
+      final List<DatanodeDetails> deletionInFlight = inflightDeletion
+          .getOrDefault(id, Collections.emptyList())
+          .stream()
+          .map(action -> action.datanode)
+          .collect(Collectors.toList());
+      final List<DatanodeDetails> source = replicas.stream()
+          .filter(r ->
+              r.getState() == State.QUASI_CLOSED ||
+              r.getState() == State.CLOSED)
+          .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
+          .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
+          .map(ContainerReplica::getDatanodeDetails)
+          .collect(Collectors.toList());
+      if (source.size() > 0) {
+        final int replicationFactor = container
+            .getReplicationFactor().getNumber();
+        final int delta = replicationFactor - getReplicaCount(id, replicas);
+        final List<DatanodeDetails> selectedDatanodes = containerPlacement
+            .chooseDatanodes(source, delta, container.getUsedBytes());
+
+        LOG.info("Container {} is under replicated. Expected replica count" +
+                " is {}, but found {}.", id, replicationFactor,
+            replicationFactor - delta);
+
+        for (DatanodeDetails datanode : selectedDatanodes) {
+          sendReplicateCommand(container, datanode, source);
+        }
+      } else {
+        LOG.warn("Cannot replicate container {}, no healthy replica found.",
+            container.containerID());
+      }
+    } catch (IOException ex) {
+      LOG.warn("Exception while replicating container {}.",
+          container.getContainerID(), ex);
+    }
+  }
+
+  /**
+   * If the given container is over replicated, identify the datanode(s)
+   * to delete the container and send delete container command to the
+   * identified datanode(s).
+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void handleOverReplicatedContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+
+    final ContainerID id = container.containerID();
+    final int replicationFactor = container.getReplicationFactor().getNumber();
+    // Don't consider inflight replication while calculating excess here.
+    final int excess = replicas.size() - replicationFactor -
+        inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
+
+    if (excess > 0) {
+
+      LOG.info("Container {} is over replicated. Expected replica count" +
+              " is {}, but found {}.", id, replicationFactor,
+          replicationFactor + excess);
+
+      final Map<UUID, ContainerReplica> uniqueReplicas =
+          new LinkedHashMap<>();
+
+      replicas.stream()
+          .filter(r -> compareState(container.getState(), r.getState()))
+          .forEach(r -> uniqueReplicas
+              .putIfAbsent(r.getOriginDatanodeId(), r));
+
+      // Retain one healthy replica per origin node Id.
+      final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);
+      eligibleReplicas.removeAll(uniqueReplicas.values());
+
+      final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
+          .stream()
+          .filter(r -> !compareState(container.getState(), r.getState()))
+          .collect(Collectors.toList());
+
+      //Move the unhealthy replicas to the front of eligible replicas to delete
+      eligibleReplicas.removeAll(unhealthyReplicas);
+      eligibleReplicas.addAll(0, unhealthyReplicas);
+
+      for (int i = 0; i < excess; i++) {
+        sendDeleteCommand(container,
+            eligibleReplicas.get(i).getDatanodeDetails(), true);
+      }
+    }
+  }
+
+  /**
+   * Handles unstable container.
+   * A container is inconsistent if the state of any replica doesn't
+   * match the container state. We have to take appropriate action
+   * based on the state of the replica.
+   *
+   * @param container ContainerInfo
+   * @param replicas Set of ContainerReplicas
+   */
+  private void handleUnstableContainer(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    // Find unhealthy replicas
+    List<ContainerReplica> unhealthyReplicas = replicas.stream()
+        .filter(r -> !compareState(container.getState(), r.getState()))
+        .collect(Collectors.toList());
+
+    Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
+    while (iterator.hasNext()) {
+      final ContainerReplica replica = iterator.next();
+      final State state = replica.getState();
+      if (state == State.OPEN || state == State.CLOSING) {
+        sendCloseCommand(container, replica.getDatanodeDetails(), false);
+        iterator.remove();
+      }
+
+      if (state == State.QUASI_CLOSED) {
+        // Send force close command if the BCSID matches
+        if (container.getSequenceId() == replica.getSequenceId()) {
+          sendCloseCommand(container, replica.getDatanodeDetails(), true);
+          iterator.remove();
+        }
+      }
+    }
+
+    // Now we are left with the replicas which are either unhealthy or
+    // the BCSID doesn't match. These replicas should be deleted.
+
+    /*
+     * If we have unhealthy replicas, we let the container go under
+     * replicated and then re-replicate the healthy copy.
+     *
+     * We also make sure that we delete only one unhealthy replica at a time.
+     *
+     * If there are two unhealthy replicas:
+     *  - Delete first unhealthy replica
+     *  - Re-replicate the healthy copy
+     *  - Delete second unhealthy replica
+     *  - Re-replicate the healthy copy
+     *
+     * Note: Only one action will be executed in a single ReplicationMonitor
+     *       iteration. So to complete all the above actions we need four
+     *       ReplicationMonitor iterations.
+     */
+
+    unhealthyReplicas.stream().findFirst().ifPresent(replica ->
+        sendDeleteCommand(container, replica.getDatanodeDetails(), false));
+
+  }
+
+  /**
+   * Sends close container command for the given container to the given
+   * datanode.
+   *
+   * @param container Container to be closed
+   * @param datanode The datanode on which the container
+   *                  has to be closed
+   * @param force Should be set to true if we want to close a
+   *               QUASI_CLOSED container
+   */
+  private void sendCloseCommand(final ContainerInfo container,
+                                final DatanodeDetails datanode,
+                                final boolean force) {
+
+    LOG.info("Sending close container command for container {}" +
+            " to datanode {}.", container.containerID(), datanode);
+
+    CloseContainerCommand closeContainerCommand =
+        new CloseContainerCommand(container.getContainerID(),
+            container.getPipelineID(), force);
+    eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+        new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand));
+  }
+
+  /**
+   * Sends replicate container command for the given container to the given
+   * datanode.
+   *
+   * @param container Container to be replicated
+   * @param datanode The destination datanode to replicate
+   * @param sources List of source nodes from where we can replicate
+   */
+  private void sendReplicateCommand(final ContainerInfo container,
+                                    final DatanodeDetails datanode,
+                                    final List<DatanodeDetails> sources) {
+
+    LOG.info("Sending replicate container command for container {}" +
+            " to datanode {}", container.containerID(), datanode);
+
+    final ContainerID id = container.containerID();
+    final ReplicateContainerCommand replicateCommand =
+        new ReplicateContainerCommand(id.getId(), sources);
+    inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
+    sendAndTrackDatanodeCommand(datanode, replicateCommand,
+        action -> inflightReplication.get(id).add(action));
+  }
+
+  /**
+   * Sends delete container command for the given container to the given
+   * datanode.
+   *
+   * @param container Container to be deleted
+   * @param datanode The datanode on which the replica should be deleted
+   * @param force Should be set to true to delete an OPEN replica
+   */
+  private void sendDeleteCommand(final ContainerInfo container,
+                                 final DatanodeDetails datanode,
+                                 final boolean force) {
+
+    LOG.info("Sending delete container command for container {}" +
+            " to datanode {}", container.containerID(), datanode);
+
+    final ContainerID id = container.containerID();
+    final DeleteContainerCommand deleteCommand =
+        new DeleteContainerCommand(id.getId(), force);
+    inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
+    sendAndTrackDatanodeCommand(datanode, deleteCommand,
+        action -> inflightDeletion.get(id).add(action));
+  }
+
+  /**
+   * Creates CommandForDatanode with the given SCMCommand and fires
+   * DATANODE_COMMAND event to event queue.
+   *
+   * Tracks the command using the given tracker.
+   *
+   * @param datanode Datanode to which the command has to be sent
+   * @param command SCMCommand to be sent
+   * @param tracker Tracker which tracks the inflight actions
+   * @param <T> Type of SCMCommand
+   */
+  private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(
+      final DatanodeDetails datanode,
+      final SCMCommand<T> command,
+      final Consumer<InflightAction> tracker) {
+    final CommandForDatanode<T> datanodeCommand =
+        new CommandForDatanode<>(datanode.getUuid(), command);
+    eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    tracker.accept(new InflightAction(datanode, Time.monotonicNow()));
+  }
+
+  /**
+   * Compares the container state with the replica state.
+   *
+   * @param containerState ContainerState
+   * @param replicaState ReplicaState
+   * @return true if the state matches, false otherwise
+   */
+  private static boolean compareState(final LifeCycleState containerState,
+                                      final State replicaState) {
+    switch (containerState) {
+    case OPEN:
+      return replicaState == State.OPEN;
+    case CLOSING:
+      return replicaState == State.CLOSING;
+    case QUASI_CLOSED:
+      return replicaState == State.QUASI_CLOSED;
+    case CLOSED:
+      return replicaState == State.CLOSED;
+    case DELETING:
+      return false;
+    case DELETED:
+      return false;
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Wrapper class to hold the InflightAction with its start time.
+   */
+  private static final class InflightAction {
+
+    private final DatanodeDetails datanode;
+    private final long time;
+
+    private InflightAction(final DatanodeDetails datanode,
+                           final long time) {
+      this.datanode = datanode;
+      this.time = time;
+    }
+  }
+}
\ No newline at end of file
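
A minimal wiring sketch for the class above, assuming a ContainerManager and a
ContainerPlacementPolicy are already available (the unit test added below builds
both with Mockito):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    // Sketch: containerManager and placementPolicy are assumed to exist
    ReplicationManager rm = new ReplicationManager(
        new OzoneConfiguration(), containerManager,
        placementPolicy, new EventQueue());
    rm.start();   // spawns the daemon ReplicationMonitor thread

    // Quorum arithmetic from canForceCloseContainer: with replication
    // factor 3, force close needs uniqueQuasiClosedReplicaCount > 3/2,
    // i.e. QUASI_CLOSED replicas from at least 2 distinct origin nodes.

    rm.stop();    // wakes the monitor and lets the loop exit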
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 2615289..6dd1949 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -132,6 +132,16 @@ public class SCMContainerManager implements ContainerManager {
   }
 
   @Override
+  public Set<ContainerID> getContainerIDs() {
+    lock.lock();
+    try {
+      return containerStateManager.getAllContainerIDs();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
   public List<ContainerInfo> getContainers() {
     lock.lock();
     try {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 0c738b2..2aba724 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.TreeSet;
@@ -107,9 +106,9 @@ public class ContainerStateMap {
     this.ownerMap = new ContainerAttribute<>();
     this.factorMap = new ContainerAttribute<>();
     this.typeMap = new ContainerAttribute<>();
-    this.containerMap = new HashMap<>();
+    this.containerMap = new ConcurrentHashMap<>();
     this.lock = new ReentrantReadWriteLock();
-    this.replicaMap = new HashMap<>();
+    this.replicaMap = new ConcurrentHashMap<>();
     this.resultCache = new ConcurrentHashMap<>();
   }
 
@@ -208,7 +207,7 @@ public class ContainerStateMap {
     try {
       checkIfContainerExist(containerID);
       return Collections
-          .unmodifiableSet(new HashSet<>(replicaMap.get(containerID)));
+          .unmodifiableSet(replicaMap.get(containerID));
     } finally {
       lock.readLock().unlock();
     }
@@ -342,7 +341,7 @@ public class ContainerStateMap {
   }
 
   public Set<ContainerID> getAllContainerIDs() {
-    return containerMap.keySet();
+    return Collections.unmodifiableSet(containerMap.keySet());
   }
 
   /**
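
The two changes above are defensive: the internal maps become ConcurrentHashMaps,
and getAllContainerIDs now hands out an unmodifiable view instead of the live key
set. A sketch of the behavioral difference, assuming a populated ContainerStateMap
(someId is hypothetical):

    // Before this patch the live keySet() was returned, so this call would
    // silently delete the container from SCM's internal map. With the
    // unmodifiable wrapper it fails fast instead.
    Set<ContainerID> ids = containerStateMap.getAllContainerIDs();
    ids.remove(someId);   // now throws UnsupportedOperationException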
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index a5284e4..19c35fd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -30,6 +31,8 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server
@@ -66,7 +69,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -98,7 +103,7 @@ public final class TestUtils {
    *
    * @return DatanodeDetails
    */
-  private static DatanodeDetails createDatanodeDetails(UUID uuid) {
+  public static DatanodeDetails createDatanodeDetails(UUID uuid) {
     String ipAddress = random.nextInt(256)
         + "." + random.nextInt(256)
         + "." + random.nextInt(256)
@@ -521,4 +526,50 @@ public final class TestUtils {
     return new StorageContainerManager(conf, configurator);
   }
 
+  public static ContainerInfo getContainer(
+      final HddsProtos.LifeCycleState state) {
+    return new ContainerInfo.Builder()
+        .setContainerID(RandomUtils.nextLong())
+        .setReplicationType(HddsProtos.ReplicationType.RATIS)
+        .setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
+        .setState(state)
+        .setOwner("TEST")
+        .build();
+  }
+
+  public static Set<ContainerReplica> getReplicas(
+      final ContainerID containerId,
+      final ContainerReplicaProto.State state,
+      final DatanodeDetails... datanodeDetails) {
+    return getReplicas(containerId, state, 10000L, datanodeDetails);
+  }
+
+  public static Set<ContainerReplica> getReplicas(
+      final ContainerID containerId,
+      final ContainerReplicaProto.State state,
+      final long sequenceId,
+      final DatanodeDetails... datanodeDetails) {
+    Set<ContainerReplica> replicas = new HashSet<>();
+    for (DatanodeDetails datanode : datanodeDetails) {
+      replicas.add(getReplicas(containerId, state,
+          sequenceId, datanode.getUuid(), datanode));
+    }
+    return replicas;
+  }
+
+  public static ContainerReplica getReplicas(
+      final ContainerID containerId,
+      final ContainerReplicaProto.State state,
+      final long sequenceId,
+      final UUID originNodeId,
+      final DatanodeDetails datanodeDetails) {
+    return ContainerReplica.newBuilder()
+        .setContainerID(containerId)
+        .setContainerState(state)
+        .setDatanodeDetails(datanodeDetails)
+        .setOriginNodeId(originNodeId)
+        .setSequenceId(sequenceId)
+        .build();
+  }
+
 }
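
A usage sketch for the relocated helpers, matching how the new
TestReplicationManager below builds its fixtures (the sequenceId 1000L and the
states are illustrative):

    // One QUASI_CLOSED replica whose origin is a fixed node id
    UUID originNodeId = UUID.randomUUID();
    ContainerInfo container = TestUtils.getContainer(LifeCycleState.QUASI_CLOSED);
    ContainerReplica replica = TestUtils.getReplicas(
        container.containerID(), State.QUASI_CLOSED, 1000L,
        originNodeId, TestUtils.randomDatanodeDetails());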
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 864a1a9..0b7cae4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -47,10 +47,10 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.getReplicas;
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.getContainer;
+import static org.apache.hadoop.hdds.scm.TestUtils
+    .getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils
+    .getContainer;
 import static org.apache.hadoop.hdds.scm.container
     .TestContainerReportHelper.addContainerToContainerManager;
 import static org.apache.hadoop.hdds.scm.container
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
index 0fb50a4..860ec4d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java
@@ -16,19 +16,12 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -77,37 +70,4 @@ public final class TestContainerReportHelper {
         containerInfo.containerID(), event);
   }
 
-  public static ContainerInfo getContainer(final LifeCycleState state) {
-    return new ContainerInfo.Builder()
-        .setContainerID(RandomUtils.nextLong())
-        .setReplicationType(ReplicationType.RATIS)
-        .setReplicationFactor(ReplicationFactor.THREE)
-        .setState(state)
-        .build();
-  }
-
-  static Set<ContainerReplica> getReplicas(
-      final ContainerID containerId,
-      final ContainerReplicaProto.State state,
-      final DatanodeDetails... datanodeDetails) {
-    return getReplicas(containerId, state, 10000L, datanodeDetails);
-  }
-
-  static Set<ContainerReplica> getReplicas(
-      final ContainerID containerId,
-      final ContainerReplicaProto.State state,
-      final long sequenceId,
-      final DatanodeDetails... datanodeDetails) {
-    Set<ContainerReplica> replicas = new HashSet<>();
-    for (DatanodeDetails datanode : datanodeDetails) {
-      replicas.add(ContainerReplica.newBuilder()
-          .setContainerID(containerId)
-          .setContainerState(state)
-          .setDatanodeDetails(datanode)
-          .setOriginNodeId(datanode.getUuid())
-          .setSequenceId(sequenceId)
-          .build());
-    }
-    return replicas;
-  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index 23e96dd..6c9383f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -38,10 +38,8 @@ import java.util.Set;
 
 import static org.apache.hadoop.hdds.scm.container
     .TestContainerReportHelper.addContainerToContainerManager;
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.getContainer;
-import static org.apache.hadoop.hdds.scm.container
-    .TestContainerReportHelper.getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
+import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
 import static org.apache.hadoop.hdds.scm.container
     .TestContainerReportHelper.mockUpdateContainerReplica;
 import static org.apache.hadoop.hdds.scm.container
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
new file mode 100644
index 0000000..83b9aa3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hadoop.hdds.scm.TestUtils.createDatanodeDetails;
+import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
+import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
+import static org.apache.hadoop.hdds.scm.TestUtils.randomDatanodeDetails;
+
+/**
+ * Test cases to verify the functionality of ReplicationManager.
+ */
+public class TestReplicationManager {
+
+  private ReplicationManager replicationManager;
+  private ContainerStateManager containerStateManager;
+  private ContainerPlacementPolicy containerPlacementPolicy;
+  private EventQueue eventQueue;
+  private DatanodeCommandHandler datanodeCommandHandler;
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+    final Configuration conf = new OzoneConfiguration();
+    final ContainerManager containerManager =
+        Mockito.mock(ContainerManager.class);
+    eventQueue = new EventQueue();
+    containerStateManager = new ContainerStateManager(conf);
+
+    datanodeCommandHandler = new DatanodeCommandHandler();
+    eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);
+
+    Mockito.when(containerManager.getContainerIDs())
+        .thenAnswer(invocation -> containerStateManager.getAllContainerIDs());
+
+    Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
+        .thenAnswer(invocation -> containerStateManager
+            .getContainer((ContainerID)invocation.getArguments()[0]));
+
+    Mockito.when(containerManager.getContainerReplicas(
+        Mockito.any(ContainerID.class)))
+        .thenAnswer(invocation -> containerStateManager
+            .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+
+    containerPlacementPolicy = Mockito.mock(ContainerPlacementPolicy.class);
+
+    Mockito.when(containerPlacementPolicy.chooseDatanodes(
+        Mockito.anyListOf(DatanodeDetails.class),
+        Mockito.anyInt(), Mockito.anyLong()))
+        .thenAnswer(invocation -> {
+          int count = (int) invocation.getArguments()[1];
+          return IntStream.range(0, count)
+              .mapToObj(i -> randomDatanodeDetails())
+              .collect(Collectors.toList());
+        });
+
+    replicationManager = new ReplicationManager(
+        conf, containerManager, containerPlacementPolicy, eventQueue);
+    replicationManager.start();
+    Thread.sleep(100L);
+  }
+
+  /**
+   * Open containers are not handled by ReplicationManager.
+   * This test-case makes sure that ReplicationManager doesn't take
+   * any action on OPEN containers.
+   */
+  @Test
+  public void testOpenContainer() throws SCMException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.OPEN);
+    containerStateManager.loadContainer(container);
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
+
+  }
+
+  /**
+   * If the container is in CLOSING state we resend close container command
+   * to all the datanodes.
+   */
+  @Test
+  public void testClosingContainer() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
+    final ContainerID id = container.containerID();
+
+    containerStateManager.loadContainer(container);
+
+    // Two replicas in CLOSING state
+    final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSING,
+        randomDatanodeDetails(),
+        randomDatanodeDetails());
+
+    // One replica in OPEN state
+    final DatanodeDetails datanode = randomDatanodeDetails();
+    replicas.addAll(getReplicas(id, State.OPEN, datanode));
+
+    for (ContainerReplica replica : replicas) {
+      containerStateManager.updateContainerReplica(id, replica);
+    }
+
+    final int currentCloseCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+
+    // Update the OPEN to CLOSING
+    for (ContainerReplica replica : getReplicas(id, State.CLOSING, datanode)) {
+      containerStateManager.updateContainerReplica(id, replica);
+    }
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+  }
+
+
+  /**
+   * The container is QUASI_CLOSED but two of the replicas are still in
+   * OPEN state. ReplicationManager should resend the close command to those
+   * datanodes.
+   */
+  @Test
+  public void testQuasiClosedContainerWithTwoOpenReplica() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final UUID originNodeId = UUID.randomUUID();
+    final ContainerReplica replicaOne = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaTwo = getReplicas(
+        id, State.OPEN, 1000L, originNodeId, randomDatanodeDetails());
+    final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+    final ContainerReplica replicaThree = getReplicas(
+        id, State.OPEN, 1000L, datanodeDetails.getUuid(), datanodeDetails);
+
+    containerStateManager.loadContainer(container);
+    containerStateManager.updateContainerReplica(id, replicaOne);
+    containerStateManager.updateContainerReplica(id, replicaTwo);
+    containerStateManager.updateContainerReplica(id, replicaThree);
+
+    final int currentCloseCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+    // Two of the replicas are in OPEN state
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.closeContainerCommand,
+        replicaTwo.getDatanodeDetails()));
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.closeContainerCommand,
+        replicaThree.getDatanodeDetails()));
+  }
+
+  /**
+   * When the container is in QUASI_CLOSED state, all the replicas are
+   * also in QUASI_CLOSED state, and there is no quorum to force close
+   * the container, ReplicationManager will not do anything.
+   */
+  @Test
+  public void testHealthyQuasiClosedContainer() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final UUID originNodeId = UUID.randomUUID();
+    final ContainerReplica replicaOne = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaTwo = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaThree = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+    containerStateManager.loadContainer(container);
+    containerStateManager.updateContainerReplica(id, replicaOne);
+    containerStateManager.updateContainerReplica(id, replicaTwo);
+    containerStateManager.updateContainerReplica(id, replicaThree);
+
+    // All the QUASI_CLOSED replicas have the same originNodeId, so the
+    // container will not be closed. ReplicationManager should take no action.
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
+  }
+
+  /**
+   * When a container is QUASI_CLOSED and we don't have a quorum to force
+   * close the container, all the replicas should be in QUASI_CLOSED state;
+   * otherwise ReplicationManager will take action.
+   *
+   * In this test case we make one of the replicas unhealthy, and the
+   * replication manager should send a delete container command to the
+   * datanode which has the unhealthy replica.
+   */
+  @Test
+  public void testQuasiClosedContainerWithUnhealthyReplica()
+      throws SCMException, ContainerNotFoundException, InterruptedException,
+      ContainerReplicaNotFoundException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final UUID originNodeId = UUID.randomUUID();
+    final ContainerReplica replicaOne = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaTwo = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaThree = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+    containerStateManager.loadContainer(container);
+    containerStateManager.updateContainerReplica(id, replicaOne);
+    containerStateManager.updateContainerReplica(id, replicaTwo);
+    containerStateManager.updateContainerReplica(id, replicaThree);
+
+    final int currentDeleteCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+    final int currentReplicateCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+
+    // All the QUASI_CLOSED replicas have the same originNodeId, so the
+    // container will not be closed. ReplicationManager should take no action.
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
+
+    // Make the first replica unhealthy
+    final ContainerReplica unhealthyReplica = getReplicas(
+        id, State.UNHEALTHY, 1000L, originNodeId,
+        replicaOne.getDatanodeDetails());
+    containerStateManager.updateContainerReplica(id, unhealthyReplica);
+
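+    // With an UNHEALTHY replica among the QUASI_CLOSED ones,
+    // ReplicationManager should ask the hosting datanode to delete it.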
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.deleteContainerCommand,
+        replicaOne.getDatanodeDetails()));
+
+    // Now remove the unhealthy replica from the in-memory state.
+    containerStateManager.removeContainerReplica(id, replicaOne);
+
+    // The container is now under replicated as the unhealthy replica
+    // has been removed.
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+
+    // We should now get a replicate command
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        datanodeCommandHandler.getInvocationCount(
+            SCMCommandProto.Type.replicateContainerCommand));
+  }
+
+  /**
+   * When a QUASI_CLOSED container is over replicated, ReplicationManager
+   * deletes the excess replicas.
+   */
+  @Test
+  public void testOverReplicatedQuasiClosedContainer() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final UUID originNodeId = UUID.randomUUID();
+    final ContainerReplica replicaOne = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaTwo = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaThree = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaFour = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+    containerStateManager.loadContainer(container);
+    containerStateManager.updateContainerReplica(id, replicaOne);
+    containerStateManager.updateContainerReplica(id, replicaTwo);
+    containerStateManager.updateContainerReplica(id, replicaThree);
+    containerStateManager.updateContainerReplica(id, replicaFour);
+
+    final int currentDeleteCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
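+    // Four QUASI_CLOSED replicas against the (assumed) replication
+    // factor of three; exactly one excess replica should be scheduled
+    // for deletion.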
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+  }
+
+  /**
+   * When a QUASI_CLOSED container is over replicated, ReplicationManager
+   * deletes the excess replicas. While choosing the replica for deletion,
+   * ReplicationManager should prefer the unhealthy replica over the
+   * QUASI_CLOSED replicas.
+   */
+  @Test
+  public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
+      throws SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final UUID originNodeId = UUID.randomUUID();
+    final ContainerReplica replicaOne = getReplicas(
+        id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaTwo = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaThree = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaFour = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+    containerStateManager.loadContainer(container);
+    containerStateManager.updateContainerReplica(id, replicaOne);
+    containerStateManager.updateContainerReplica(id, replicaTwo);
+    containerStateManager.updateContainerReplica(id, replicaThree);
+    containerStateManager.updateContainerReplica(id, replicaFour);
+
+    final int currentDeleteCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
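+    // The excess replica picked for deletion should be the UNHEALTHY
+    // replicaOne rather than any of the QUASI_CLOSED replicas.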
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.deleteContainerCommand,
+        replicaOne.getDatanodeDetails()));
+  }
+
+  /**
+   * ReplicationManager should replicate a QUASI_CLOSED container if it is
+   * under replicated.
+   */
+  @Test
+  public void testUnderReplicatedQuasiClosedContainer() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final UUID originNodeId = UUID.randomUUID();
+    final ContainerReplica replicaOne = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaTwo = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+
+    containerStateManager.loadContainer(container);
+    containerStateManager.updateContainerReplica(id, replicaOne);
+    containerStateManager.updateContainerReplica(id, replicaTwo);
+
+    final int currentReplicateCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+
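+    // Only two QUASI_CLOSED replicas exist, so one replicate command is
+    // expected to restore the replica count.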
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        datanodeCommandHandler.getInvocationCount(
+            SCMCommandProto.Type.replicateContainerCommand));
+  }
+
+  /**
+   * When a QUASI_CLOSED container is under replicated, ReplicationManager
+   * should re-replicate it. If there is an unhealthy replica, it has to
+   * be deleted.
+   *
+   * In this test case, the container is QUASI_CLOSED and is under replicated
+   * and also has an unhealthy replica.
+   *
+   * In the first iteration of ReplicationManager, it should re-replicate
+   * the container so that it has enough replicas.
+   *
+   * In the second iteration, ReplicationManager should delete the unhealthy
+   * replica.
+   *
+   * In the third iteration, ReplicationManager should re-replicate the
+   * container once more, as it has again become under replicated after
+   * the unhealthy replica was deleted.
+   */
+  @Test
+  public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
+      throws SCMException, ContainerNotFoundException, InterruptedException,
+      ContainerReplicaNotFoundException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
+    final UUID originNodeId = UUID.randomUUID();
+    final ContainerReplica replicaOne = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+    final ContainerReplica replicaTwo = getReplicas(
+        id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
+
+    containerStateManager.loadContainer(container);
+    containerStateManager.updateContainerReplica(id, replicaOne);
+    containerStateManager.updateContainerReplica(id, replicaTwo);
+
+    final int currentReplicateCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+    final int currentDeleteCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
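+    // First iteration: ReplicationManager should restore the replica
+    // count before deleting the unhealthy replica.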
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        datanodeCommandHandler.getInvocationCount(
+            SCMCommandProto.Type.replicateContainerCommand));
+
+    Optional<CommandForDatanode> replicateCommand = datanodeCommandHandler
+        .getReceivedCommands().stream()
+        .filter(c -> c.getCommand().getType()
+            .equals(SCMCommandProto.Type.replicateContainerCommand))
+        .findFirst();
+
+    Assert.assertTrue(replicateCommand.isPresent());
+
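+    // Simulate the target datanode completing the replication and
+    // reporting the new QUASI_CLOSED replica back to SCM.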
+    final DatanodeDetails newNode = createDatanodeDetails(
+        replicateCommand.get().getDatanodeId());
+    final ContainerReplica newReplica = getReplicas(
+        id, State.QUASI_CLOSED, 1000L, originNodeId, newNode);
+    containerStateManager.updateContainerReplica(id, newReplica);
+
+    /*
+     * We have reported the new replica to SCM; in the next
+     * ReplicationManager iteration it should delete the unhealthy replica.
+     */
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    // replicaTwo, the unhealthy replica, should be the one deleted.
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.deleteContainerCommand,
+        replicaTwo.getDatanodeDetails()));
+
+    containerStateManager.removeContainerReplica(id, replicaTwo);
+
+    /*
+     * We have now removed the unhealthy replica; the next iteration of
+     * ReplicationManager should re-replicate the container as it is
+     * under replicated now.
+     */
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(currentReplicateCommandCount + 2,
+        datanodeCommandHandler.getInvocationCount(
+            SCMCommandProto.Type.replicateContainerCommand));
+  }
+
+  /**
+   * When a container is QUASI_CLOSED and more than 50% of its replicas
+   * are in QUASI_CLOSED state with unique origin node ids,
+   * ReplicationManager should force close the replica(s) with the
+   * highest BCSID.
+   */
+  @Test
+  public void testQuasiClosedToClosed() throws
+      SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    final ContainerID id = container.containerID();
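+    // This getReplicas overload is assumed to give each replica a unique
+    // originNodeId, which forms the quorum needed to force close.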
+    final Set<ContainerReplica> replicas = getReplicas(id, State.QUASI_CLOSED,
+        randomDatanodeDetails(),
+        randomDatanodeDetails(),
+        randomDatanodeDetails());
+    containerStateManager.loadContainer(container);
+    for (ContainerReplica replica : replicas) {
+      containerStateManager.updateContainerReplica(id, replica);
+    }
+
+    final int currentCloseCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+
+    // All the replicas have the same BCSID, so all of them will be closed.
+    Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
+  }
+
+  /**
+   * ReplicationManager should not take any action if the container is
+   * CLOSED and healthy.
+   */
+  @Test
+  public void testHealthyClosedContainer()
+      throws SCMException, ContainerNotFoundException, InterruptedException {
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+    final ContainerID id = container.containerID();
+    final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
+        randomDatanodeDetails(),
+        randomDatanodeDetails(),
+        randomDatanodeDetails());
+
+    containerStateManager.loadContainer(container);
+    for (ContainerReplica replica : replicas) {
+      containerStateManager.updateContainerReplica(id, replica);
+    }
+
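+    // Three healthy CLOSED replicas: the container is properly
+    // replicated, so no datanode command is expected.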
+    replicationManager.processContainersNow();
+    // Wait for EventQueue to call the event handler
+    Thread.sleep(100L);
+    Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
+  }
+
+  @After
+  public void teardown() throws IOException {
+    containerStateManager.close();
+    replicationManager.stop();
+  }
+
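+  /**
+   * Event handler stub that records every datanode command published
+   * through the EventQueue, so the tests can assert on the number and
+   * the target of the commands that ReplicationManager fires.
+   */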
+  private class DatanodeCommandHandler implements
+      EventHandler<CommandForDatanode> {
+
+    private final AtomicInteger invocation = new AtomicInteger(0);
+    private final Map<SCMCommandProto.Type, AtomicInteger> commandInvocation =
+        new HashMap<>();
+    private final List<CommandForDatanode> commands = new ArrayList<>();
+
+    @Override
+    public void onMessage(final CommandForDatanode command,
+                          final EventPublisher publisher) {
+      final SCMCommandProto.Type type = command.getCommand().getType();
+      commandInvocation.computeIfAbsent(type, k -> new AtomicInteger(0))
+          .incrementAndGet();
+      invocation.incrementAndGet();
+      commands.add(command);
+    }
+
+    private int getInvocation() {
+      return invocation.get();
+    }
+
+    private int getInvocationCount(SCMCommandProto.Type type) {
+      return commandInvocation.containsKey(type) ?
+          commandInvocation.get(type).get() : 0;
+    }
+
+    private List<CommandForDatanode> getReceivedCommands() {
+      return commands;
+    }
+
+    /**
+     * Returns true if the command handler has received the given
+     * command type for the provided datanode.
+     *
+     * @param type Command Type
+     * @param datanode DatanodeDetails
+     * @return True if command was received, false otherwise
+     */
+    private boolean received(final SCMCommandProto.Type type,
+                             final DatanodeDetails datanode) {
+      return commands.stream().anyMatch(dc ->
+          dc.getCommand().getType().equals(type) &&
+              dc.getDatanodeId().equals(datanode.getUuid()));
+    }
+  }
+
+}
\ No newline at end of file

