Posted to commits@ozone.apache.org by um...@apache.org on 2022/05/04 16:31:41 UTC

[ozone] branch master updated: HDDS-6589. Add a new replication manager and change the existing one to legacy (#3352)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c99fd20da9 HDDS-6589. Add a new replication manager and change the existing one to legacy (#3352)
c99fd20da9 is described below

commit c99fd20da9fa2717f182531617f739335f9b7725
Author: Jackson Yao <ja...@tencent.com>
AuthorDate: Thu May 5 00:31:34 2022 +0800

    HDDS-6589. Add a new replication manager and change the existing one to legacy (#3352)
---
 .../hdds/scm/container/ContainerReplicaCount.java  |   3 +-
 .../scm/container/balancer/ContainerBalancer.java  |  14 +-
 .../ContainerBalancerSelectionCriteria.java        |   2 +-
 ...tivityStatusMXBean.java => InflightAction.java} |  27 +-
 .../LegacyReplicationManager.java}                 | 359 ++++------------
 .../replication/ReplicationActivityStatus.java     | 104 -----
 .../container/replication/ReplicationManager.java  | 450 +++++++++++++++++++++
 .../replication/ReplicationManagerMetrics.java     |   1 -
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |   2 +-
 .../hdds/scm/node/NodeDecommissionManager.java     |   2 +-
 .../scm/server/OzoneStorageContainerManager.java   |   2 +-
 .../hadoop/hdds/scm/server/SCMConfigurator.java    |   2 +-
 .../hdds/scm/server/StorageContainerManager.java   |   2 +-
 .../hdds/scm/container/TestReplicationManager.java |  61 ++-
 .../container/balancer/TestContainerBalancer.java  |  13 +-
 .../replication/TestReplicationManagerMetrics.java |   1 -
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |   2 +-
 .../apache/hadoop/ozone/MiniOzoneChaosCluster.java |   2 +-
 .../ozone/insight/scm/ReplicaManagerInsight.java   |   2 +-
 .../safemode/TestSCMSafeModeWithPipelineRules.java |   2 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |  88 ++--
 .../rpc/TestContainerReplicationEndToEnd.java      |   2 +-
 .../ozone/client/rpc/read/TestInputStreamBase.java |   2 +-
 .../ozone/container/TestContainerReplication.java  |   2 +-
 .../commandhandler/TestBlockDeletion.java          |   6 +-
 .../hadoop/ozone/scm/TestCloseContainer.java       |   2 +-
 .../scm/node/TestDecommissionAndMaintenance.java   |   2 +-
 .../scm/ReconStorageContainerManagerFacade.java    |   2 +-
 28 files changed, 653 insertions(+), 506 deletions(-)
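
For orientation before the diffs: the heart of this change is that the new
ReplicationManager keeps the SCMService / monitor-thread role, while every
non-EC container is handed to the renamed LegacyReplicationManager. Below is a
condensed sketch of that delegation; the class and method names are taken from
the diffs further down, the scaffolding around them is illustrative only, and
the real loop also checks shouldRun() between containers.

    package org.apache.hadoop.hdds.scm.container.replication;

    import java.util.List;

    import org.apache.hadoop.hdds.scm.container.ContainerInfo;
    import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;

    // Sketch of the delegation pattern: iterate over all containers and pass
    // each non-EC container to the legacy manager. Placed in the same package
    // as the real classes so the protected processContainer() is visible.
    final class ReplicationDelegationSketch {

      static ReplicationManagerReport processAll(
          List<ContainerInfo> containers, LegacyReplicationManager legacyRm) {
        ReplicationManagerReport report = new ReplicationManagerReport();
        for (ContainerInfo c : containers) {
          switch (c.getReplicationType()) {
          case EC:
            // EC containers are skipped by the legacy path for now.
            break;
          default:
            legacyRm.processContainer(c, report);
          }
        }
        report.setComplete();
        return report;
      }
    }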

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
index 3dab4ad83f..ec5a87a890 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
 
 import java.util.Set;
 
@@ -266,7 +267,7 @@ public class ContainerReplicaCount {
         || container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED)
         && replica.stream()
         .filter(r -> r.getDatanodeDetails().getPersistedOpState() == IN_SERVICE)
-        .allMatch(r -> ReplicationManager.compareState(
+        .allMatch(r -> LegacyReplicationManager.compareState(
             container.getState(), r.getState()));
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
index 09d289e817..c2d96ffbd2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -106,7 +107,7 @@ public class ContainerBalancer implements SCMService {
   private FindTargetStrategy findTargetStrategy;
   private FindSourceStrategy findSourceStrategy;
   private Map<ContainerMoveSelection,
-      CompletableFuture<ReplicationManager.MoveResult>>
+      CompletableFuture<LegacyReplicationManager.MoveResult>>
       moveSelectionToFutureMap;
   private IterationResult iterationResult;
 
@@ -461,7 +462,6 @@ public class ContainerBalancer implements SCMService {
    */
   private void checkIterationMoveResults(Set<DatanodeDetails> selectedTargets) {
     this.countDatanodesInvolvedPerIteration = 0;
-
     CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(
         moveSelectionToFutureMap.values()
             .toArray(new CompletableFuture[moveSelectionToFutureMap.size()]));
@@ -586,7 +586,7 @@ public class ContainerBalancer implements SCMService {
   private boolean moveContainer(DatanodeDetails source,
                                 ContainerMoveSelection moveSelection) {
     ContainerID containerID = moveSelection.getContainerID();
-    CompletableFuture<ReplicationManager.MoveResult> future;
+    CompletableFuture<LegacyReplicationManager.MoveResult> future;
     try {
       ContainerInfo containerInfo = containerManager.getContainer(containerID);
       future = replicationManager
@@ -599,7 +599,7 @@ public class ContainerBalancer implements SCMService {
                   source.getUuidString(),
                   moveSelection.getTargetNode().getUuidString(), ex);
             } else {
-              if (result == ReplicationManager.MoveResult.COMPLETED) {
+              if (result == LegacyReplicationManager.MoveResult.COMPLETED) {
                 metrics.incrementDataSizeMovedGBInLatestIteration(
                     containerInfo.getUsedBytes() / OzoneConsts.GB);
                 metrics.incrementNumContainerMovesCompletedInLatestIteration(1);
@@ -630,9 +630,9 @@ public class ContainerBalancer implements SCMService {
       if (future.isCompletedExceptionally()) {
         return false;
       } else {
-        ReplicationManager.MoveResult result = future.join();
+        LegacyReplicationManager.MoveResult result = future.join();
         moveSelectionToFutureMap.put(moveSelection, future);
-        return result == ReplicationManager.MoveResult.COMPLETED;
+        return result == LegacyReplicationManager.MoveResult.COMPLETED;
       }
     } else {
       moveSelectionToFutureMap.put(moveSelection, future);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
index b5f5acd8fa..bfcca778a4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.slf4j.Logger;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightAction.java
similarity index 58%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightAction.java
index 164bd247ef..153d5dc2c6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightAction.java
@@ -17,12 +17,29 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
 /**
- * JMX interface to monitor replication status.
- */
-public interface ReplicationActivityStatusMXBean {
+ * InflightAction is a wrapper class that holds an in-flight replication or
+ * deletion action together with its start time and target datanode.
+ */
+public class InflightAction {
+  private final DatanodeDetails datanode;
+  private final long time;
+
+  public InflightAction(final DatanodeDetails datanode,
+                         final long time) {
+    this.datanode = datanode;
+    this.time = time;
+  }
 
-  boolean isReplicationEnabled();
+  @VisibleForTesting
+  public DatanodeDetails getDatanode() {
+    return datanode;
+  }
 
-  void setReplicationEnabled(boolean enabled);
+  public long getTime() {
+    return time;
+  }
 }
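
The InflightAction wrapper above carries the same data that used to live in a
private inner class of the replication manager; it is now public, so the
manager reads it through accessors. A minimal illustration of the timeout
check built on those accessors (the deadline value is an assumption for the
example; in the real code it is derived from the clock and the event timeout):

    package org.apache.hadoop.hdds.scm.container.replication;

    // Minimal illustration of the accessor-based timeout check; compare the
    // updateInflightAction() hunk in the LegacyReplicationManager diff below.
    final class InflightTimeoutSketch {

      // An action is considered timed out if it started before the deadline.
      static boolean isTimedOut(InflightAction action, long deadlineMillis) {
        return action.getTime() < deadlineMillis;
      }
    }
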
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
similarity index 89%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 279163b8f3..47ac3d3541 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -16,34 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdds.scm.container;
+package org.apache.hadoop.hdds.scm.container.replication;
 
-import java.io.IOException;
-import java.lang.reflect.Proxy;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.HddsConfigKeys;
+import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigType;
@@ -52,31 +27,34 @@ import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
-import static org.apache.hadoop.hdds.protocol.proto.
-    SCMRatisProtocol.RequestType.MOVE;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerMetrics;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
-import org.apache.hadoop.hdds.scm.ha.SCMService;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
-import org.apache.hadoop.hdds.scm.metadata.Replicate;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
@@ -85,30 +63,44 @@ import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
-
-import com.google.protobuf.GeneratedMessage;
-import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
-import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
-
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+import static org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType.MOVE;
+
 /**
- * Replication Manager (RM) is the one which is responsible for making sure
- * that the containers are properly replicated. Replication Manager deals only
- * with Quasi Closed / Closed container.
+ * Legacy Replication Manager (RM) is the legacy implementation, used to
+ * process non-EC containers, and will hopefully be replaced in the future.
  */
-public class ReplicationManager implements SCMService {
+public class LegacyReplicationManager {
 
   public static final Logger LOG =
-      LoggerFactory.getLogger(ReplicationManager.class);
-
-  public static final String METRICS_SOURCE_NAME = "SCMReplicationManager";
+      LoggerFactory.getLogger(LegacyReplicationManager.class);
 
   /**
    * Reference to the ContainerManager.
@@ -211,40 +203,19 @@ public class ReplicationManager implements SCMService {
    */
   private final ReplicationManagerConfiguration rmConf;
 
-  /**
-   * ReplicationMonitor thread is the one which wakes up at configured
-   * interval and processes all the containers.
-   */
-  private Thread replicationMonitor;
-
-  /**
-   * Flag used for checking if the ReplicationMonitor thread is running or
-   * not.
-   */
-  private volatile boolean running;
-
   /**
    * Minimum number of replica in a healthy state for maintenance.
    */
   private int minHealthyForMaintenance;
 
+  private final Clock clock;
+
   /**
    * Current container size as a bound for choosing datanodes with
    * enough space for a replica.
    */
   private long currentContainerSize;
 
-  /**
-   * SCMService related variables.
-   * After leaving safe mode, replicationMonitor needs to wait for a while
-   * before really take effect.
-   */
-  private final Lock serviceLock = new ReentrantLock();
-  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
-  private final long waitTimeInMillis;
-  private long lastTimeToBeReadyInMillis = 0;
-  private final Clock clock;
-
   /**
    * Replication progress related metrics.
    */
@@ -255,10 +226,6 @@ public class ReplicationManager implements SCMService {
    */
   private final MoveScheduler moveScheduler;
 
-  /**
-   * Report object that is refreshed each time replication Manager runs.
-   */
-  private ReplicationManagerReport containerReport;
 
   /**
    * Constructs ReplicationManager instance with the given configuration.
@@ -269,16 +236,15 @@ public class ReplicationManager implements SCMService {
    * @param eventPublisher EventPublisher
    */
   @SuppressWarnings("parameternumber")
-  public ReplicationManager(final ConfigurationSource conf,
-             final ContainerManager containerManager,
-             final PlacementPolicy containerPlacement,
-             final EventPublisher eventPublisher,
-             final SCMContext scmContext,
-             final SCMServiceManager serviceManager,
-             final NodeManager nodeManager,
-             final java.time.Clock clock,
-             final SCMHAManager scmhaManager,
-             final Table<ContainerID, MoveDataNodePair> moveTable)
+  public LegacyReplicationManager(final ConfigurationSource conf,
+            final ContainerManager containerManager,
+            final PlacementPolicy containerPlacement,
+            final EventPublisher eventPublisher,
+            final SCMContext scmContext,
+            final NodeManager nodeManager,
+            final SCMHAManager scmhaManager,
+            final Clock clock,
+            final Table<ContainerID, MoveDataNodePair> moveTable)
              throws IOException {
     this.containerManager = containerManager;
     this.containerPlacement = containerPlacement;
@@ -286,18 +252,12 @@ public class ReplicationManager implements SCMService {
     this.scmContext = scmContext;
     this.nodeManager = nodeManager;
     this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
-    this.running = false;
     this.inflightReplication = new ConcurrentHashMap<>();
     this.inflightDeletion = new ConcurrentHashMap<>();
     this.inflightMoveFuture = new ConcurrentHashMap<>();
     this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
     this.clock = clock;
-    this.containerReport = new ReplicationManagerReport();
 
-    this.waitTimeInMillis = conf.getTimeDuration(
-        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
-        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
-        TimeUnit.MILLISECONDS);
     this.currentContainerSize = (long) conf.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
@@ -308,109 +268,16 @@ public class ReplicationManager implements SCMService {
         .setDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
         .setRatisServer(scmhaManager.getRatisServer())
         .setMoveTable(moveTable).build();
-
-    // register ReplicationManager to SCMServiceManager.
-    serviceManager.register(this);
-
-    // start ReplicationManager.
-    start();
-  }
-
-  /**
-   * Starts Replication Monitor thread.
-   */
-  @Override
-  public synchronized void start() {
-    if (!isRunning()) {
-      metrics = ReplicationManagerMetrics.create(this);
-      LOG.info("Starting Replication Monitor Thread.");
-      running = true;
-      replicationMonitor = new Thread(this::run);
-      replicationMonitor.setName("ReplicationMonitor");
-      replicationMonitor.setDaemon(true);
-      replicationMonitor.start();
-    } else {
-      LOG.info("Replication Monitor Thread is already running.");
-    }
-  }
-
-  /**
-   * Returns true if the Replication Monitor Thread is running.
-   *
-   * @return true if running, false otherwise
-   */
-  public boolean isRunning() {
-    if (!running) {
-      synchronized (this) {
-        return replicationMonitor != null
-            && replicationMonitor.isAlive();
-      }
-    }
-    return true;
   }
 
-  /**
-   * Stops Replication Monitor thread.
-   */
-  public synchronized void stop() {
-    if (running) {
-      LOG.info("Stopping Replication Monitor Thread.");
-      inflightReplication.clear();
-      inflightDeletion.clear();
-      running = false;
-      metrics.unRegister();
-      notifyAll();
-    } else {
-      LOG.info("Replication Monitor Thread is not running.");
-    }
-  }
-
-  /**
-   * Process all the containers now, and wait for the processing to complete.
-   * This in intended to be used in tests.
-   */
-  public synchronized void processAll() {
-    if (!shouldRun()) {
-      LOG.info("Replication Manager is not ready to run until {}ms after " +
-          "safemode exit", waitTimeInMillis);
-      return;
-    }
-    final long start = clock.millis();
-    final List<ContainerInfo> containers =
-        containerManager.getContainers();
-    ReplicationManagerReport report = new ReplicationManagerReport();
-    for (ContainerInfo c : containers) {
-      processContainer(c, report);
-    }
-    report.setComplete();
-    containerReport = report;
-    LOG.info("Replication Monitor Thread took {} milliseconds for" +
-            " processing {} containers.", clock.millis() - start,
-        containers.size());
-  }
 
-  public ReplicationManagerReport getContainerReport() {
-    return containerReport;
+  protected synchronized void clearInflightActions() {
+    inflightReplication.clear();
+    inflightDeletion.clear();
   }
 
-  /**
-   * ReplicationMonitor thread runnable. This wakes up at configured
-   * interval and processes all the containers in the system.
-   */
-  private synchronized void run() {
-    try {
-      while (running) {
-        processAll();
-        wait(rmConf.getInterval());
-      }
-    } catch (Throwable t) {
-      // When we get runtime exception, we should terminate SCM.
-      LOG.error("Exception in Replication Monitor Thread.", t);
-      if (t instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      ExitUtil.terminate(1, t);
-    }
+  protected synchronized void setMetrics(ReplicationManagerMetrics metrics) {
+    this.metrics = metrics;
   }
 
   /**
@@ -419,11 +286,8 @@ public class ReplicationManager implements SCMService {
    * @param container ContainerInfo
    */
   @SuppressWarnings("checkstyle:methodlength")
-  private void processContainer(ContainerInfo container,
+  protected void processContainer(ContainerInfo container,
       ReplicationManagerReport report) {
-    if (!shouldRun()) {
-      return;
-    }
     final ContainerID id = container.containerID();
     try {
       // synchronize on the containerInfo object to solve container
@@ -492,14 +356,14 @@ public class ReplicationManager implements SCMService {
          * list, if the operation is completed or if it has timed out.
          */
         updateInflightAction(container, inflightReplication,
-            action -> replicas.stream()
-                .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)),
+            action -> replicas.stream().anyMatch(
+                r -> r.getDatanodeDetails().equals(action.getDatanode())),
             () -> metrics.incrNumReplicationCmdsTimeout(),
             action -> updateCompletedReplicationMetrics(container, action));
 
         updateInflightAction(container, inflightDeletion,
-            action -> replicas.stream()
-                .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)),
+            action -> replicas.stream().noneMatch(
+                r -> r.getDatanodeDetails().equals(action.getDatanode())),
             () -> metrics.incrNumDeletionCmdsTimeout(),
             action -> updateCompletedDeletionMetrics(container, action));
 
@@ -601,14 +465,14 @@ public class ReplicationManager implements SCMService {
       InflightAction action) {
     metrics.incrNumReplicationCmdsCompleted();
     metrics.incrNumReplicationBytesCompleted(container.getUsedBytes());
-    metrics.addReplicationTime(clock.millis() - action.time);
+    metrics.addReplicationTime(clock.millis() - action.getTime());
   }
 
   private void updateCompletedDeletionMetrics(ContainerInfo container,
       InflightAction action) {
     metrics.incrNumDeletionCmdsCompleted();
     metrics.incrNumDeletionBytesCompleted(container.getUsedBytes());
-    metrics.addDeletionTime(clock.millis() - action.time);
+    metrics.addDeletionTime(clock.millis() - action.getTime());
   }
 
   /**
@@ -634,10 +498,10 @@ public class ReplicationManager implements SCMService {
       while (iter.hasNext()) {
         try {
           InflightAction a = iter.next();
-          NodeStatus status = nodeManager.getNodeStatus(a.datanode);
+          NodeStatus status = nodeManager.getNodeStatus(a.getDatanode());
           boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
           boolean isCompleted = filter.test(a);
-          boolean isTimeout = a.time < deadline;
+          boolean isTimeout = a.getTime() < deadline;
           boolean isNotInService = status.getOperationalState() !=
               NodeOperationalState.IN_SERVICE;
           if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
@@ -650,7 +514,7 @@ public class ReplicationManager implements SCMService {
             }
 
             updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
-                isNotInService, container, a.datanode, inflightActions);
+                isNotInService, container, a.getDatanode(), inflightActions);
           }
         } catch (NodeNotFoundException | ContainerNotFoundException e) {
           // Should not happen, but if it does, just remove the action as the
@@ -781,14 +645,10 @@ public class ReplicationManager implements SCMService {
    * @param cid Container to move
    * @param mp MoveDataNodePair which contains source and target datanodes
    */
-  public CompletableFuture<MoveResult> move(ContainerID cid,
+  private CompletableFuture<MoveResult> move(ContainerID cid,
       MoveDataNodePair mp)
       throws ContainerNotFoundException, NodeNotFoundException {
     CompletableFuture<MoveResult> ret = new CompletableFuture<>();
-    if (!isRunning()) {
-      ret.complete(MoveResult.FAIL_NOT_RUNNING);
-      return ret;
-    }
 
     if (!scmContext.isLeader()) {
       ret.complete(MoveResult.FAIL_NOT_LEADER);
@@ -1096,7 +956,7 @@ public class ReplicationManager implements SCMService {
       final List<DatanodeDetails> deletionInFlight = inflightDeletion
           .getOrDefault(container.containerID(), Collections.emptyList())
           .stream()
-          .map(action -> action.datanode)
+          .map(action -> action.getDatanode())
           .collect(Collectors.toList());
       Set<ContainerReplica> filteredReplicas = replicas.stream().filter(
           r -> !deletionInFlight.contains(r.getDatanodeDetails()))
@@ -1175,12 +1035,12 @@ public class ReplicationManager implements SCMService {
       final List<DatanodeDetails> deletionInFlight = inflightDeletion
           .getOrDefault(id, Collections.emptyList())
           .stream()
-          .map(action -> action.datanode)
+          .map(action -> action.getDatanode())
           .collect(Collectors.toList());
       final List<DatanodeDetails> replicationInFlight = inflightReplication
           .getOrDefault(id, Collections.emptyList())
           .stream()
-          .map(action -> action.datanode)
+          .map(action -> action.getDatanode())
           .collect(Collectors.toList());
       final List<DatanodeDetails> source = replicas.stream()
           .filter(r ->
@@ -1314,7 +1174,7 @@ public class ReplicationManager implements SCMService {
       // also may not be available
       eligibleReplicas.removeIf(r ->
           r.getDatanodeDetails().getPersistedOpState() !=
-              HddsProtos.NodeOperationalState.IN_SERVICE);
+              NodeOperationalState.IN_SERVICE);
 
       final List<ContainerReplica> unhealthyReplicas = eligibleReplicas
           .stream()
@@ -1704,7 +1564,7 @@ public class ReplicationManager implements SCMService {
       ContainerInfo container, Set<ContainerReplica> replicas) {
     LifeCycleState state = container.getState();
     return replicas.stream()
-        .allMatch(r -> ReplicationManager.compareState(state, r.getState()));
+        .allMatch(r -> compareState(state, r.getState()));
   }
 
   public boolean isContainerReplicatingOrDeleting(ContainerID containerID) {
@@ -1712,26 +1572,6 @@ public class ReplicationManager implements SCMService {
         inflightDeletion.containsKey(containerID);
   }
 
-  /**
-   * Wrapper class to hold the InflightAction with its start time.
-   */
-  static final class InflightAction {
-
-    private final DatanodeDetails datanode;
-    private final long time;
-
-    private InflightAction(final DatanodeDetails datanode,
-                           final long time) {
-      this.datanode = datanode;
-      this.time = time;
-    }
-
-    @VisibleForTesting
-    public DatanodeDetails getDatanode() {
-      return datanode;
-    }
-  }
-
   /**
    * Configuration used by the Replication Manager.
    */
@@ -1803,49 +1643,10 @@ public class ReplicationManager implements SCMService {
     }
   }
 
-  @Override
-  public void notifyStatusChanged() {
-    serviceLock.lock();
-    try {
-      // 1) SCMContext#isLeaderReady returns true.
-      // 2) not in safe mode.
-      if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
-        // transition from PAUSING to RUNNING
-        if (serviceStatus != ServiceStatus.RUNNING) {
-          LOG.info("Service {} transitions to RUNNING.", getServiceName());
-          lastTimeToBeReadyInMillis = clock.millis();
-          serviceStatus = ServiceStatus.RUNNING;
-        }
-        //now, as the current scm is leader and it`s state is up-to-date,
-        //we need to take some action about replicated inflight move options.
-        onLeaderReadyAndOutOfSafeMode();
-      } else {
-        serviceStatus = ServiceStatus.PAUSING;
-      }
-    } finally {
-      serviceLock.unlock();
-    }
-  }
-
-  @Override
-  public boolean shouldRun() {
-    serviceLock.lock();
-    try {
-      // If safe mode is off, then this SCMService starts to run with a delay.
-      return serviceStatus == ServiceStatus.RUNNING &&
-          clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
-    } finally {
-      serviceLock.unlock();
-    }
-  }
-
-  @Override
-  public String getServiceName() {
-    return ReplicationManager.class.getSimpleName();
-  }
-
-  public ReplicationManagerMetrics getMetrics() {
-    return this.metrics;
+  protected void notifyStatusChanged() {
+    // now, as the current SCM is the leader and its state is up-to-date,
+    // we need to act on the replicated in-flight move operations.
+    onLeaderReadyAndOutOfSafeMode();
   }
 
   public Map<ContainerID, List<InflightAction>> getInflightReplication() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
deleted file mode 100644
index 92a30d5c26..0000000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import javax.management.ObjectName;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.metrics2.util.MBeans;
-
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.utils.Scheduler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Event listener to track the current state of replication.
- */
-public class ReplicationActivityStatus implements
-    ReplicationActivityStatusMXBean, Closeable {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ReplicationActivityStatus.class);
-
-  private Scheduler scheduler;
-  private AtomicBoolean replicationEnabled = new AtomicBoolean();
-  private ObjectName jmxObjectName;
-
-  public ReplicationActivityStatus(Scheduler scheduler) {
-    this.scheduler = scheduler;
-  }
-
-  @Override
-  public boolean isReplicationEnabled() {
-    return replicationEnabled.get();
-  }
-
-  @VisibleForTesting
-  @Override
-  public void setReplicationEnabled(boolean enabled) {
-    replicationEnabled.set(enabled);
-  }
-
-  @VisibleForTesting
-  public void enableReplication() {
-    replicationEnabled.set(true);
-  }
-
-
-  public void start() {
-    try {
-      this.jmxObjectName =
-          MBeans.register(
-              "StorageContainerManager", "ReplicationActivityStatus", this);
-    } catch (Exception ex) {
-      LOG.error("JMX bean for ReplicationActivityStatus can't be registered",
-          ex);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (this.jmxObjectName != null) {
-      MBeans.unregister(jmxObjectName);
-    }
-  }
-
-  /**
-   * Waits for
-   * {@link HddsConfigKeys#HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT} and set
-   * replicationEnabled to start replication monitor thread.
-   */
-  public void fireReplicationStart(boolean safeModeStatus,
-      long waitTime) {
-    if (!safeModeStatus) {
-      scheduler.schedule(() -> {
-        setReplicationEnabled(true);
-        LOG.info("Replication Timer sleep for {} ms completed. Enable "
-            + "Replication", waitTime);
-      }, waitTime, TimeUnit.MILLISECONDS);
-    }
-  }
-
-
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
new file mode 100644
index 0000000000..e4020955f7
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.util.ExitUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+/**
+ * Replication Manager (RM) is responsible for making sure that containers
+ * are properly replicated. Replication Manager deals only with
+ * Quasi Closed / Closed containers.
+ */
+public class ReplicationManager implements SCMService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationManager.class);
+
+  /**
+   * Reference to the ContainerManager.
+   */
+  private final ContainerManager containerManager;
+
+
+  /**
+   * SCMContext from StorageContainerManager.
+   */
+  private final SCMContext scmContext;
+
+
+  /**
+   * ReplicationManager specific configuration.
+   */
+  private final ReplicationManagerConfiguration rmConf;
+
+  /**
+   * ReplicationMonitor thread is the one which wakes up at configured
+   * interval and processes all the containers.
+   */
+  private Thread replicationMonitor;
+
+  /**
+   * Flag used for checking if the ReplicationMonitor thread is running or
+   * not.
+   */
+  private volatile boolean running;
+
+  /**
+   * Report object that is refreshed each time replication Manager runs.
+   */
+  private ReplicationManagerReport containerReport;
+
+  /**
+   * Replication progress related metrics.
+   */
+  private ReplicationManagerMetrics metrics;
+
+
+  /**
+   * Legacy RM will hopefully be removed once the refactor is complete;
+   * for now, it is used to process non-EC containers.
+   */
+  private LegacyReplicationManager legacyReplicationManager;
+
+  /**
+   * SCMService related variables.
+   * After leaving safe mode, replicationMonitor needs to wait for a while
+   * before really take effect.
+   */
+  private final Lock serviceLock = new ReentrantLock();
+  private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+  private final long waitTimeInMillis;
+  private long lastTimeToBeReadyInMillis = 0;
+  private final Clock clock;
+
+  /**
+   * Constructs ReplicationManager instance with the given configuration.
+   *
+   * @param conf OzoneConfiguration
+   * @param containerManager ContainerManager
+   * @param containerPlacement PlacementPolicy
+   * @param eventPublisher EventPublisher
+   */
+  @SuppressWarnings("parameternumber")
+  public ReplicationManager(final ConfigurationSource conf,
+             final ContainerManager containerManager,
+             final PlacementPolicy containerPlacement,
+             final EventPublisher eventPublisher,
+             final SCMContext scmContext,
+             final SCMServiceManager serviceManager,
+             final NodeManager nodeManager,
+             final Clock clock,
+             final SCMHAManager scmhaManager,
+             final Table<ContainerID, MoveDataNodePair> moveTable)
+             throws IOException {
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
+    this.running = false;
+    this.clock = clock;
+    this.containerReport = new ReplicationManagerReport();
+    this.metrics = null;
+    this.waitTimeInMillis = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.legacyReplicationManager = new LegacyReplicationManager(
+        conf, containerManager, containerPlacement, eventPublisher,
+        scmContext, nodeManager, scmhaManager, clock, moveTable);
+
+    // register ReplicationManager to SCMServiceManager.
+    serviceManager.register(this);
+
+    // start ReplicationManager.
+    start();
+  }
+
+  /**
+   * Starts Replication Monitor thread.
+   */
+  @Override
+  public synchronized void start() {
+    if (!isRunning()) {
+      LOG.info("Starting Replication Monitor Thread.");
+      running = true;
+      metrics = ReplicationManagerMetrics.create(this);
+      legacyReplicationManager.setMetrics(metrics);
+      replicationMonitor = new Thread(this::run);
+      replicationMonitor.setName("ReplicationMonitor");
+      replicationMonitor.setDaemon(true);
+      replicationMonitor.start();
+    } else {
+      LOG.info("Replication Monitor Thread is already running.");
+    }
+  }
+
+  /**
+   * Returns true if the Replication Monitor Thread is running.
+   *
+   * @return true if running, false otherwise
+   */
+  public boolean isRunning() {
+    if (!running) {
+      synchronized (this) {
+        return replicationMonitor != null
+            && replicationMonitor.isAlive();
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Stops Replication Monitor thread.
+   */
+  public synchronized void stop() {
+    if (running) {
+      LOG.info("Stopping Replication Monitor Thread.");
+      running = false;
+      legacyReplicationManager.clearInflightActions();
+      metrics.unRegister();
+      replicationMonitor.interrupt();
+    } else {
+      LOG.info("Replication Monitor Thread is not running.");
+    }
+  }
+
+  /**
+   * Process all the containers now, and wait for the processing to complete.
+   * This is intended to be used in tests.
+   */
+  public synchronized void processAll() {
+    if (!shouldRun()) {
+      LOG.info("Replication Manager is not ready to run until {}ms after " +
+          "safemode exit", waitTimeInMillis);
+      return;
+    }
+    final long start = clock.millis();
+    final List<ContainerInfo> containers =
+        containerManager.getContainers();
+    ReplicationManagerReport report = new ReplicationManagerReport();
+    for (ContainerInfo c : containers) {
+      if (!shouldRun()) {
+        break;
+      }
+      switch (c.getReplicationType()) {
+      case EC:
+        break;
+      default:
+        legacyReplicationManager.processContainer(c, report);
+      }
+    }
+    report.setComplete();
+    this.containerReport = report;
+    LOG.info("Replication Monitor Thread took {} milliseconds for" +
+            " processing {} containers.", clock.millis() - start,
+        containers.size());
+  }
+
+  public ReplicationManagerReport getContainerReport() {
+    return containerReport;
+  }
+
+  /**
+   * ReplicationMonitor thread runnable. This wakes up at configured
+   * interval and processes all the containers in the system.
+   */
+  private synchronized void run() {
+    try {
+      while (running) {
+        processAll();
+        wait(rmConf.getInterval());
+      }
+    } catch (Throwable t) {
+      if (t instanceof InterruptedException) {
+        LOG.info("Replication Monitor Thread is stopped");
+        Thread.currentThread().interrupt();
+      } else {
+        // When we get runtime exception, we should terminate SCM.
+        LOG.error("Exception in Replication Monitor Thread.", t);
+        ExitUtil.terminate(1, t);
+      }
+    }
+  }
+
+  /**
+   * Given a ContainerID, lookup the ContainerInfo and then return a
+   * ContainerReplicaCount object for the container.
+   * @param containerID The ID of the container
+   * @return ContainerReplicaCount for the given container
+   * @throws ContainerNotFoundException
+   */
+  public ContainerReplicaCount getContainerReplicaCount(ContainerID containerID)
+      throws ContainerNotFoundException {
+    return legacyReplicationManager.getContainerReplicaCount(containerID);
+  }
+
+  /**
+   * Configuration used by the Replication Manager.
+   */
+  @ConfigGroup(prefix = "hdds.scm.replication")
+  public static class ReplicationManagerConfiguration {
+    /**
+     * The frequency in which ReplicationMonitor thread should run.
+     */
+    @Config(key = "thread.interval",
+        type = ConfigType.TIME,
+        defaultValue = "300s",
+        tags = {SCM, OZONE},
+        description = "There is a replication monitor thread running inside " +
+            "SCM which takes care of replicating the containers in the " +
+            "cluster. This property is used to configure the interval in " +
+            "which that thread runs."
+    )
+    private long interval = Duration.ofSeconds(300).toMillis();
+
+    /**
+     * Timeout for container replication & deletion command issued by
+     * ReplicationManager.
+     */
+    @Config(key = "event.timeout",
+        type = ConfigType.TIME,
+        defaultValue = "30m",
+        tags = {SCM, OZONE},
+        description = "Timeout for the container replication/deletion commands "
+            + "sent  to datanodes. After this timeout the command will be "
+            + "retried.")
+    private long eventTimeout = Duration.ofMinutes(30).toMillis();
+    public void setInterval(Duration interval) {
+      this.interval = interval.toMillis();
+    }
+
+    public void setEventTimeout(Duration timeout) {
+      this.eventTimeout = timeout.toMillis();
+    }
+
+    /**
+     * The number of container replica which must be available for a node to
+     * enter maintenance.
+     */
+    @Config(key = "maintenance.replica.minimum",
+        type = ConfigType.INT,
+        defaultValue = "2",
+        tags = {SCM, OZONE},
+        description = "The minimum number of container replicas which must " +
+            " be available for a node to enter maintenance. If putting a " +
+            " node into maintenance reduces the available replicas for any " +
+            " container below this level, the node will remain in the " +
+            " entering maintenance state until a new replica is created.")
+    private int maintenanceReplicaMinimum = 2;
+
+    public void setMaintenanceReplicaMinimum(int replicaCount) {
+      this.maintenanceReplicaMinimum = replicaCount;
+    }
+
+    public long getInterval() {
+      return interval;
+    }
+
+    public long getEventTimeout() {
+      return eventTimeout;
+    }
+
+    public int getMaintenanceReplicaMinimum() {
+      return maintenanceReplicaMinimum;
+    }
+  }
+
+  @Override
+  public void notifyStatusChanged() {
+    serviceLock.lock();
+    try {
+      // 1) SCMContext#isLeaderReady returns true.
+      // 2) not in safe mode.
+      if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
+        // transition from PAUSING to RUNNING
+        if (serviceStatus != ServiceStatus.RUNNING) {
+          LOG.info("Service {} transitions to RUNNING.", getServiceName());
+          lastTimeToBeReadyInMillis = clock.millis();
+          serviceStatus = ServiceStatus.RUNNING;
+        }
+        // now, as the current SCM is the leader and its state is up-to-date,
+        // we need to act on the replicated in-flight move operations.
+        legacyReplicationManager.notifyStatusChanged();
+      } else {
+        serviceStatus = ServiceStatus.PAUSING;
+      }
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      // If safe mode is off, then this SCMService starts to run with a delay.
+      return serviceStatus == ServiceStatus.RUNNING &&
+          clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  @Override
+  public String getServiceName() {
+    return ReplicationManager.class.getSimpleName();
+  }
+
+  public ReplicationManagerMetrics getMetrics() {
+    return metrics;
+  }
+
+
+  /**
+   * The following methods will be refactored in a separate JIRA.
+   */
+  public CompletableFuture<LegacyReplicationManager.MoveResult> move(
+      ContainerID cid, DatanodeDetails src, DatanodeDetails tgt)
+      throws NodeNotFoundException, ContainerNotFoundException {
+    CompletableFuture<LegacyReplicationManager.MoveResult> ret =
+        new CompletableFuture<>();
+    if (!isRunning()) {
+      ret.complete(LegacyReplicationManager.MoveResult.FAIL_NOT_RUNNING);
+      return ret;
+    }
+
+    return legacyReplicationManager.move(cid, src, tgt);
+  }
+
+  public Map<ContainerID, List<InflightAction>> getInflightReplication() {
+    return legacyReplicationManager.getInflightReplication();
+  }
+
+  public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
+    return legacyReplicationManager.getInflightDeletion();
+  }
+
+  public Map<ContainerID,
+      CompletableFuture<LegacyReplicationManager.MoveResult>>
+      getInflightMove() {
+    return legacyReplicationManager.getInflightMove();
+  }
+
+  public LegacyReplicationManager.MoveScheduler getMoveScheduler() {
+    return legacyReplicationManager.getMoveScheduler();
+  }
+
+  @VisibleForTesting
+  public LegacyReplicationManager getLegacyReplicationManager() {
+    return legacyReplicationManager;
+  }
+
+  public boolean isContainerReplicatingOrDeleting(ContainerID containerID) {
+    return legacyReplicationManager
+        .isContainerReplicatingOrDeleting(containerID);
+  }
+}
+
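
The @Config annotations in the new ReplicationManagerConfiguration above map
to keys under the hdds.scm.replication prefix (thread.interval, event.timeout,
maintenance.replica.minimum). A small sketch of how that configuration object
can be obtained and tuned, for example in a test; the OzoneConfiguration usage
is an assumption, while the setters and getters are the ones defined above:

    import java.time.Duration;

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;

    // Sketch: load ReplicationManagerConfiguration the same way the new
    // ReplicationManager constructor does (conf.getObject(...)), then override
    // the defaults (300s interval, 30m event timeout, 2 maintenance replicas).
    public final class ReplicationManagerConfigSketch {

      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        ReplicationManagerConfiguration rmConf =
            conf.getObject(ReplicationManagerConfiguration.class);

        rmConf.setInterval(Duration.ofSeconds(30));
        rmConf.setEventTimeout(Duration.ofMinutes(5));
        rmConf.setMaintenanceReplicaMinimum(1);

        // Getters return time values in milliseconds; the replica minimum is
        // a plain count.
        System.out.println(rmConf.getInterval());
        System.out.println(rmConf.getEventTimeout());
        System.out.println(rmConf.getMaintenanceReplicaMinimum());
      }
    }
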
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
index 0f828aef1a..ef17b587d7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.container.replication;
 
 import com.google.common.base.CaseFormat;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsInfo;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
index a0fb6c99b2..f096f4e83f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index 47d7c53469..1ea04cdfc3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import org.apache.hadoop.hdds.scm.DatanodeAdminError;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
index fca789d56c..68adc76a45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/OzoneStorageContainerManager.java
@@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
 
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
index d56cbf974d..d02f391585 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.pipeline.WritableContainerFactory;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index caa1043f5b..9abe25969b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -82,7 +82,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 233bed7d45..c7e36dbe67 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -31,12 +31,14 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
     .ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -127,7 +129,8 @@ public class TestReplicationManager {
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         0, TimeUnit.SECONDS);
 
-    scmLogs = GenericTestUtils.LogCapturer.captureLogs(ReplicationManager.LOG);
+    scmLogs = GenericTestUtils.LogCapturer.
+        captureLogs(LegacyReplicationManager.LOG);
     containerManager = Mockito.mock(ContainerManager.class);
     nodeManager = new SimpleMockNodeManager();
     eventQueue = new EventQueue();
@@ -1479,8 +1482,7 @@ public class TestReplicationManager {
         new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
     CompletableFuture<MoveResult> cf =
-        replicationManager.move(id,
-            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+        replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     Assert.assertTrue(scmLogs.getOutput().contains(
         "receive a move request about container"));
     Thread.sleep(100L);
@@ -1521,8 +1523,7 @@ public class TestReplicationManager {
     addReplica(container,
         new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
-    replicationManager.move(id,
-        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     Assert.assertTrue(scmLogs.getOutput().contains(
         "receive a move request about container"));
     Thread.sleep(100L);
@@ -1617,8 +1618,7 @@ public class TestReplicationManager {
         new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
     CompletableFuture<MoveResult> cf =
-        replicationManager.move(id,
-            new MoveDataNodePair(dn1.getDatanodeDetails(), dn4));
+        replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
     Assert.assertTrue(scmLogs.getOutput().contains(
         "receive a move request about container"));
     Thread.sleep(100L);
@@ -1659,8 +1659,7 @@ public class TestReplicationManager {
         new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
     CompletableFuture<MoveResult> cf =
-        replicationManager.move(id,
-            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+        replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     Assert.assertTrue(scmLogs.getOutput().contains(
         "receive a move request about container"));
 
@@ -1672,8 +1671,7 @@ public class TestReplicationManager {
         MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
 
     nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
-    cf = replicationManager.move(id,
-        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     addReplicaToDn(container, dn3, CLOSED);
     replicationManager.processAll();
     eventQueue.processAll(1000);
@@ -1712,30 +1710,26 @@ public class TestReplicationManager {
     //we don't need a running replicationManamger now
     replicationManager.stop();
     Thread.sleep(100L);
-    cf = replicationManager.move(id,
-        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.FAIL_NOT_RUNNING);
     replicationManager.start();
     Thread.sleep(100L);
 
     //container in not in OPEN state
-    cf = replicationManager.move(id,
-        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
     //open -> closing
     containerStateManager.updateContainerState(id.getProtobuf(),
         LifeCycleEvent.FINALIZE);
-    cf = replicationManager.move(id,
-        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
     //closing -> quasi_closed
     containerStateManager.updateContainerState(id.getProtobuf(),
         LifeCycleEvent.QUASI_CLOSE);
-    cf = replicationManager.move(id,
-        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
 
@@ -1750,12 +1744,10 @@ public class TestReplicationManager {
       if (state != HEALTHY) {
         nodeManager.setNodeStatus(dn3,
             new NodeStatus(IN_SERVICE, state));
-        cf = replicationManager.move(id,
-            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
         Assert.assertTrue(cf.isDone() && cf.get() ==
             MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
-        cf = replicationManager.move(id,
-            new MoveDataNodePair(dn3, dn1.getDatanodeDetails()));
+        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
         Assert.assertTrue(cf.isDone() && cf.get() ==
             MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
       }
@@ -1768,12 +1760,10 @@ public class TestReplicationManager {
       if (state != IN_SERVICE) {
         nodeManager.setNodeStatus(dn3,
             new NodeStatus(state, HEALTHY));
-        cf = replicationManager.move(id,
-            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
         Assert.assertTrue(cf.isDone() && cf.get() ==
             MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
-        cf = replicationManager.move(id,
-            new MoveDataNodePair(dn3, dn1.getDatanodeDetails()));
+        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
         Assert.assertTrue(cf.isDone() && cf.get() ==
             MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
       }
@@ -1781,14 +1771,13 @@ public class TestReplicationManager {
     nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
 
     //container exists in target datanode
-    cf = replicationManager.move(id,
-        new MoveDataNodePair(dn1.getDatanodeDetails(),
-        dn2.getDatanodeDetails()));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(),
+        dn2.getDatanodeDetails());
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
 
     //container does not exist in source datanode
-    cf = replicationManager.move(id, new MoveDataNodePair(dn3, dn3));
+    cf = replicationManager.move(id, dn3, dn3);
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
 
@@ -1799,8 +1788,7 @@ public class TestReplicationManager {
     replicationManager.processAll();
     //waiting for inflightDeletion generation
     eventQueue.processAll(1000);
-    cf = replicationManager.move(id,
-        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
     resetReplicationManager();
@@ -1813,8 +1801,7 @@ public class TestReplicationManager {
     replicationManager.processAll();
     //waiting for inflightReplication generation
     eventQueue.processAll(1000);
-    cf = replicationManager.move(id,
-        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
     Assert.assertTrue(cf.isDone() && cf.get() ==
         MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
   }
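
Note: the test changes above reflect the new front-end API: ReplicationManager.move() now takes
the source and target DatanodeDetails directly instead of a MoveDataNodePair, and MoveResult is
the enum nested in LegacyReplicationManager. A minimal sketch of the new call pattern, assuming
only the classes visible in the hunks (the wrapper class MoveCallSketch and its throws clause
are illustrative):

    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.container.ContainerID;
    import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
    import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;

    final class MoveCallSketch {
      // Source and target are passed directly; the MoveDataNodePair wrapper
      // used by the old call sites is no longer part of the signature.
      static CompletableFuture<MoveResult> requestMove(
          ReplicationManager replicationManager, ContainerID id,
          DatanodeDetails source, DatanodeDetails target) throws Exception {
        return replicationManager.move(id, source, target);
      }
    }
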
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
index 48069c505d..4901617f9f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestContainerBalancer.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
@@ -134,8 +136,7 @@ public class TestContainerBalancer {
     Mockito.when(replicationManager.move(Mockito.any(ContainerID.class),
         Mockito.any(DatanodeDetails.class),
         Mockito.any(DatanodeDetails.class)))
-        .thenReturn(CompletableFuture.completedFuture(
-            ReplicationManager.MoveResult.COMPLETED));
+        .thenReturn(CompletableFuture.completedFuture(MoveResult.COMPLETED));
 
     when(containerManager.getContainerReplicas(Mockito.any(ContainerID.class)))
         .thenAnswer(invocationOnMock -> {
@@ -631,7 +632,7 @@ public class TestContainerBalancer {
             Mockito.any(DatanodeDetails.class),
             Mockito.any(DatanodeDetails.class)))
         .thenReturn(CompletableFuture.completedFuture(
-            ReplicationManager.MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY));
+            MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY));
     balancerConfiguration.setMaxSizeToMovePerIteration(10 * OzoneConsts.GB);
 
     startBalancer(balancerConfiguration);
@@ -868,7 +869,7 @@ public class TestContainerBalancer {
     }
   }
 
-  private CompletableFuture<ReplicationManager.MoveResult>
+  private CompletableFuture<LegacyReplicationManager.MoveResult>
       genCompletableFuture(int sleepMilSec) {
     return CompletableFuture.supplyAsync(() -> {
       try {
@@ -876,7 +877,7 @@ public class TestContainerBalancer {
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
-      return ReplicationManager.MoveResult.COMPLETED;
+      return LegacyReplicationManager.MoveResult.COMPLETED;
     });
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
index 60ebbb0b86..977194d1d5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdds.scm.container.replication;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
 import org.junit.After;
 import org.junit.Assert;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index 82822a1060..ba03004e6a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
diff --git a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index b6ef8651b2..87a852b875 100644
--- a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++ b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.failure.FailureManager;
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ReplicaManagerInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ReplicaManagerInsight.java
index 6d607b702f..e42f7fc855 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ReplicaManagerInsight.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ReplicaManagerInsight.java
@@ -21,7 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.ozone.insight.BaseInsightPoint;
 import org.apache.hadoop.ozone.insight.Component.Type;
 import org.apache.hadoop.ozone.insight.LoggerSource;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
index 2b52564059..09f7b1909f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 2149dc3271..7fc05e92b1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -17,45 +17,7 @@
  */
 package org.apache.hadoop.ozone;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
-    .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
-import static org.junit.Assert.fail;
-
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Stream;
-import java.util.Arrays;
-
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.io.FileUtils;
@@ -66,10 +28,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -78,8 +40,8 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
 import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.RatisUtil;
@@ -108,16 +70,16 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
 import org.mockito.ArgumentMatcher;
@@ -125,6 +87,38 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+
 /**
  * Test class that exercises the StorageContainerManager.
  */
@@ -782,8 +776,10 @@ public class TestStorageContainerManager {
       cluster.restartStorageContainerManager(false);
       scm = cluster.getStorageContainerManager();
       EventPublisher publisher = mock(EventPublisher.class);
-      ReplicationManager replicationManager = scm.getReplicationManager();
-      Field f = ReplicationManager.class.getDeclaredField("eventPublisher");
+      LegacyReplicationManager replicationManager =
+          scm.getReplicationManager().getLegacyReplicationManager();
+      Field f = LegacyReplicationManager.class.
+          getDeclaredField("eventPublisher");
       f.setAccessible(true);
       Field modifiersField = Field.class.getDeclaredField("modifiers");
       modifiersField.setAccessible(true);
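
Note: as the hunk above shows, the previous implementation now sits behind the new
ReplicationManager as LegacyReplicationManager and is reachable via getLegacyReplicationManager().
A minimal lookup sketch under that assumption (the helper class and method name are illustrative):

    import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
    import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
    import org.apache.hadoop.hdds.scm.server.StorageContainerManager;

    final class LegacyManagerLookupSketch {
      // The new ReplicationManager delegates to the legacy implementation,
      // which the test above still reaches for reflective setup.
      static LegacyReplicationManager legacyOf(StorageContainerManager scm) {
        ReplicationManager replicationManager = scm.getReplicationManager();
        return replicationManager.getLegacyReplicationManager();
      }
    }
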
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index bc4012c0d1..5ac3f15fd3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.HddsDatanodeService;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 07c24336aa..f3a5ecdef0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 1eb153de6f..6d49f46092 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackAware;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index e2f2e1631a..98b524c5f1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
 import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
 import org.apache.hadoop.hdds.scm.block.ScmBlockDeletingServiceMetrics;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -115,7 +115,7 @@ public class TestBlockDeletion {
     conf = new OzoneConfiguration();
     GenericTestUtils.setLogLevel(DeletedBlockLogImpl.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(SCMBlockDeletingService.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(ReplicationManager.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(LegacyReplicationManager.LOG, Level.DEBUG);
 
     String path =
         GenericTestUtils.getTempPath(TestBlockDeletion.class.getSimpleName());
@@ -367,7 +367,7 @@ public class TestBlockDeletion {
         Assert.assertEquals(HddsProtos.LifeCycleState.DELETING,
             container.getState()));
     LogCapturer logCapturer =
-        LogCapturer.captureLogs(ReplicationManager.LOG);
+        LogCapturer.captureLogs(LegacyReplicationManager.LOG);
     logCapturer.clearOutput();
 
     scm.getReplicationManager().processAll();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCloseContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCloseContainer.java
index 2e67a88347..3336408287 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCloseContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestCloseContainer.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index 65eb4448dc..9c40c84339 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index c693bd2cbd..07d9f3fd19 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
-import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;

