Posted to commits@ozone.apache.org by so...@apache.org on 2022/05/20 08:29:10 UTC

[ozone] branch master updated: HDDS-6744. EC: ReplicationManager - create ContainerReplicaPendingOps class and integrate with ContainerManager (#3425)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 78a018814c HDDS-6744. EC: ReplicationManager - create ContainerReplicaPendingOps class and integrate with ContainerManager (#3425)
78a018814c is described below

commit 78a018814c1f3bdca90d16e8246c349e3d0e390b
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri May 20 09:29:05 2022 +0100

    HDDS-6744. EC: ReplicationManager - create ContainerReplicaPendingOps class and integrate with ContainerManager (#3425)
---
 .../hdds/scm/container/ContainerManagerImpl.java   |  16 +-
 .../container/replication/ContainerReplicaOp.java  |  63 ++++++
 .../replication/ContainerReplicaPendingOps.java    | 227 +++++++++++++++++++++
 .../container/replication/ReplicationManager.java  |   5 +-
 .../hdds/scm/server/StorageContainerManager.java   |  11 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |   6 +-
 .../container/TestCloseContainerEventHandler.java  |   6 +-
 .../scm/container/TestContainerManagerImpl.java    |  50 ++++-
 .../hdds/scm/container/TestReplicationManager.java |   6 +-
 .../TestContainerReplicaPendingOps.java            | 226 ++++++++++++++++++++
 .../hdds/scm/node/TestContainerPlacement.java      |   7 +-
 .../ozone/recon/scm/ReconContainerManager.java     |   7 +-
 .../scm/ReconStorageContainerManagerFacade.java    |   7 +-
 .../scm/AbstractReconContainerManagerTest.java     |   8 +-
 14 files changed, 632 insertions(+), 13 deletions(-)
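
The core of the change is the new ContainerReplicaPendingOps class (full source in the diff below), which SCM components use to record and clear in-flight replicate and delete commands for container replicas. As a reading aid, here is a minimal sketch of the API, using only constructors and methods that appear in this patch; the wrapper class name is illustrative, and OzoneConfiguration, MonotonicClock and the MockDatanodeDetails test helper are borrowed from the test and SCM wiring further down.

    import java.time.ZoneId;
    import java.util.List;

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
    import org.apache.hadoop.hdds.scm.container.ContainerID;
    import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
    import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
    import org.apache.hadoop.ozone.common.MonotonicClock;

    public final class PendingOpsSketch {
      public static void main(String[] args) {
        ContainerReplicaPendingOps pendingOps = new ContainerReplicaPendingOps(
            new OzoneConfiguration(), new MonotonicClock(ZoneId.systemDefault()));

        ContainerID container = new ContainerID(1);
        DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();

        // Record that a replicate command has been sent to "target".
        // Replica index is zero for Ratis containers and > 0 for EC.
        pendingOps.scheduleAddReplica(container, target, 0);

        // Once ContainerManager sees the replica reported, the entry is cleared.
        boolean cleared = pendingOps.completeAddReplica(container, target, 0);

        // Ops still outstanding for this container (a copy, safe to modify).
        List<ContainerReplicaOp> stillPending = pendingOps.getPendingOps(container);
        System.out.println(cleared + ", pending=" + stillPending.size());
      }
    }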

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index f3f1a8a520..f3e8025450 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -97,6 +98,10 @@ public class ContainerManagerImpl implements ContainerManager {
 
   @SuppressWarnings("java:S2245") // no need for secure random
   private final Random random = new Random();
+  // Used to track pending replication and deletes for container replicas.
+  // When ContainerManager sees a replica added or removed, it clears any
+  // matching pending op created by an earlier replicate or delete command.
+  private final ContainerReplicaPendingOps containerReplicaPendingOps;
 
   /**
    *
@@ -106,7 +111,8 @@ public class ContainerManagerImpl implements ContainerManager {
       final SCMHAManager scmHaManager,
       final SequenceIdGenerator sequenceIdGen,
       final PipelineManager pipelineManager,
-      final Table<ContainerID, ContainerInfo> containerStore)
+      final Table<ContainerID, ContainerInfo> containerStore,
+      final ContainerReplicaPendingOps containerReplicaPendingOps)
       throws IOException {
     // Introduce builder for this class?
     this.lock = new ReentrantLock();
@@ -126,6 +132,7 @@ public class ContainerManagerImpl implements ContainerManager {
             ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
 
     this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
+    this.containerReplicaPendingOps = containerReplicaPendingOps;
   }
 
   @Override
@@ -320,6 +327,9 @@ public class ContainerManagerImpl implements ContainerManager {
       throws ContainerNotFoundException {
     if (containerExist(cid)) {
       containerStateManager.updateContainerReplica(cid, replica);
+      // Clear any pending additions for this replica as we have now seen it.
+      containerReplicaPendingOps.completeAddReplica(cid,
+          replica.getDatanodeDetails(), replica.getReplicaIndex());
     } else {
       throwContainerNotFoundException(cid);
     }
@@ -331,6 +341,10 @@ public class ContainerManagerImpl implements ContainerManager {
       throws ContainerNotFoundException, ContainerReplicaNotFoundException {
     if (containerExist(cid)) {
       containerStateManager.removeContainerReplica(cid, replica);
+      // Remove any pending delete replication operations for the deleted
+      // replica.
+      containerReplicaPendingOps.completeDeleteReplica(cid,
+          replica.getDatanodeDetails(), replica.getReplicaIndex());
     } else {
       throwContainerNotFoundException(cid);
     }
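
Taken together, the two hunks above mean that every replica reported to or removed from ContainerManagerImpl now also clears the matching pending op. A condensed view of just the integration (the containerExist() guard and error handling from the full methods are omitted; this is a summary of the hunks, not the complete code):

    // Condensed from ContainerManagerImpl above; containerExist() checks omitted.
    public void updateContainerReplica(ContainerID cid, ContainerReplica replica)
        throws ContainerNotFoundException {
      containerStateManager.updateContainerReplica(cid, replica);
      // The replica has been seen, so it is no longer a pending add.
      containerReplicaPendingOps.completeAddReplica(cid,
          replica.getDatanodeDetails(), replica.getReplicaIndex());
    }

    public void removeContainerReplica(ContainerID cid, ContainerReplica replica)
        throws ContainerNotFoundException, ContainerReplicaNotFoundException {
      containerStateManager.removeContainerReplica(cid, replica);
      // The replica is gone, so it is no longer a pending delete.
      containerReplicaPendingOps.completeDeleteReplica(cid,
          replica.getDatanodeDetails(), replica.getReplicaIndex());
    }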
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
new file mode 100644
index 0000000000..869b0f0cf2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+/**
+ * Class to wrap details used to track pending replications.
+ */
+public class ContainerReplicaOp {
+
+  /**
+   * Enum representing different types of pending Ops.
+   */
+  public enum PendingOpType {
+    ADD, DELETE
+  }
+
+  private PendingOpType opType;
+  private DatanodeDetails target;
+  private int replicaIndex;
+  private long scheduledEpochMillis;
+
+  public ContainerReplicaOp(PendingOpType opType,
+      DatanodeDetails target, int replicaIndex, long scheduledTime) {
+    this.opType = opType;
+    this.target = target;
+    this.replicaIndex = replicaIndex;
+    this.scheduledEpochMillis = scheduledTime;
+  }
+
+  public PendingOpType getOpType() {
+    return opType;
+  }
+
+  public DatanodeDetails getTarget() {
+    return target;
+  }
+
+  public int getReplicaIndex() {
+    return replicaIndex;
+  }
+
+  public long getScheduledEpochMillis() {
+    return scheduledEpochMillis;
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
new file mode 100644
index 0000000000..691d5e9dfb
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.util.concurrent.Striped;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+
+/**
+ * Class to track pending replication operations across the cluster. For
+ * each container with a pending replication or pending delete, there will
+ * be an entry in this class mapping ContainerID to a list of the pending
+ * operations.
+ */
+public class ContainerReplicaPendingOps {
+
+  private final ConfigurationSource config;
+  private final Clock clock;
+  private final ConcurrentHashMap<ContainerID, List<ContainerReplicaOp>>
+      pendingOps = new ConcurrentHashMap<>();
+  private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(64);
+
+  public ContainerReplicaPendingOps(final ConfigurationSource conf,
+      Clock clock) {
+    this.config = conf;
+    this.clock = clock;
+  }
+
+  /**
+   * Get all the ContainerReplicaOps associated with the given ContainerID.
+   * A new list is created and returned, so it can be modified by the caller,
+   * but any changes will not be reflected in the internal map.
+   * @param containerID The ContainerID for which to retrieve the pending
+   *                      ops.
+   * @return Standalone list of ContainerReplicaOp or empty list if none exist.
+   */
+  public List<ContainerReplicaOp> getPendingOps(ContainerID containerID) {
+    Lock lock = readLock(containerID);
+    lock.lock();
+    try {
+      List<ContainerReplicaOp> ops = pendingOps.get(containerID);
+      if (ops == null) {
+        return Collections.emptyList();
+      }
+      return new ArrayList<>(ops);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Store a ContainerReplicaOp to add a replica for the given ContainerID.
+   * @param containerID ContainerID for which to add a replica
+   * @param target The target datanode
+   * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+   */
+  public void scheduleAddReplica(ContainerID containerID,
+      DatanodeDetails target, int replicaIndex) {
+    addReplica(ADD, containerID, target, replicaIndex);
+  }
+
+  /**
+   * Store a ContainerReplicaOp to delete a replica for the given ContainerID.
+   * @param containerID ContainerID for which to delete a replica
+   * @param target The target datanode
+   * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+   */
+  public void scheduleDeleteReplica(ContainerID containerID,
+      DatanodeDetails target, int replicaIndex) {
+    addReplica(DELETE, containerID, target, replicaIndex);
+  }
+
+  /**
+   * Remove a stored ContainerReplicaOp from the given ContainerID as it has
+   * been replicated successfully.
+   * @param containerID ContainerID for which to complete the replication
+   * @param target The target Datanode
+   * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+   * @return True if a pending replica was found and removed, false otherwise.
+   */
+  public boolean completeAddReplica(ContainerID containerID,
+      DatanodeDetails target, int replicaIndex) {
+    return completeOp(ADD, containerID, target, replicaIndex);
+  }
+
+
+  /**
+   * Remove a stored ContainerReplicaOp from the given ContainerID as it has
+   * been deleted successfully.
+   * @param containerID ContainerID for which to complete the deletion
+   * @param target The target Datanode
+   * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+   * @return True if a pending replica was found and removed, false otherwise.
+   */
+  public boolean completeDeleteReplica(ContainerID containerID,
+      DatanodeDetails target, int replicaIndex) {
+    return completeOp(DELETE, containerID, target, replicaIndex);
+  }
+
+  /**
+   * Remove a stored pending operation from the given ContainerID.
+   * @param containerID ContainerID for which to remove the op.
+   * @param op ContainerReplicaOp to remove
+   * @return True if an element was found and deleted, false otherwise.
+   */
+  public boolean removeOp(ContainerID containerID,
+      ContainerReplicaOp op) {
+    return completeOp(op.getOpType(), containerID, op.getTarget(),
+        op.getReplicaIndex());
+  }
+
+  /**
+   * Iterate over all pending entries and remove any which have expired,
+   * meaning the operation did not complete within the given time.
+   * @param expiryMilliSeconds Expiry threshold in milliseconds.
+   */
+  public void removeExpiredEntries(long expiryMilliSeconds) {
+    for (ContainerID containerID : pendingOps.keySet()) {
+      // Rather than use an entry set, we get the map entry again. This is
+      // to protect against another thread modifying the value after this
+      // iterator started. Once we lock on the ContainerID object, no other
+      // changes can occur to the list of ops associated with it.
+      Lock lock = writeLock(containerID);
+      lock.lock();
+      try {
+        List<ContainerReplicaOp> ops = pendingOps.get(containerID);
+        if (ops == null) {
+          // There should not be null entries, but another thread may have
+          // removed the map entry after the iterator was started.
+          continue;
+        }
+        Iterator<ContainerReplicaOp> iterator = ops.listIterator();
+        while (iterator.hasNext()) {
+          ContainerReplicaOp op = iterator.next();
+          if (op.getScheduledEpochMillis() + expiryMilliSeconds
+              < clock.millis()) {
+            iterator.remove();
+          }
+        }
+        if (ops.size() == 0) {
+          pendingOps.remove(containerID);
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private void addReplica(ContainerReplicaOp.PendingOpType opType,
+      ContainerID containerID, DatanodeDetails target, int replicaIndex) {
+    Lock lock = writeLock(containerID);
+    lock.lock();
+    try {
+      List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
+          containerID, s -> new ArrayList<>());
+      ops.add(new ContainerReplicaOp(opType,
+          target, replicaIndex, clock.millis()));
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
+      ContainerID containerID, DatanodeDetails target, int replicaIndex) {
+    boolean found = false;
+    Lock lock = writeLock(containerID);
+    lock.lock();
+    try {
+      List<ContainerReplicaOp> ops = pendingOps.get(containerID);
+      if (ops != null) {
+        Iterator<ContainerReplicaOp> iterator = ops.listIterator();
+        while (iterator.hasNext()) {
+          ContainerReplicaOp op = iterator.next();
+          if (op.getOpType() == opType
+              && op.getTarget().equals(target)
+              && op.getReplicaIndex() == replicaIndex) {
+            found = true;
+            iterator.remove();
+          }
+        }
+        if (ops.size() == 0) {
+          pendingOps.remove(containerID);
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+    return found;
+  }
+
+  private Lock writeLock(ContainerID containerID) {
+    return stripedLock.get(containerID).writeLock();
+  }
+
+  private Lock readLock(ContainerID containerID) {
+    return stripedLock.get(containerID).readLock();
+  }
+
+}
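
Nothing in this patch calls removeExpiredEntries yet; presumably a follow-up change will have the replication code expire commands that datanodes never acted on. Purely for illustration, a hypothetical periodic caller might look like the fragment below. The executor, the 10 minute interval and the containerReplicaPendingOps field it captures are assumptions, not part of this commit.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical wiring, not from this patch: drop pending ops that have not
    // completed within 10 minutes of being scheduled.
    ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    long expiryMillis = TimeUnit.MINUTES.toMillis(10);
    scheduler.scheduleAtFixedRate(
        () -> containerReplicaPendingOps.removeExpiredEntries(expiryMillis),
        expiryMillis, expiryMillis, TimeUnit.MILLISECONDS);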
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index e4020955f7..7ae7d64478 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -124,6 +124,7 @@ public class ReplicationManager implements SCMService {
   private final long waitTimeInMillis;
   private long lastTimeToBeReadyInMillis = 0;
   private final Clock clock;
+  private final ContainerReplicaPendingOps containerReplicaPendingOps;
 
   /**
    * Constructs ReplicationManager instance with the given configuration.
@@ -143,7 +144,8 @@ public class ReplicationManager implements SCMService {
              final NodeManager nodeManager,
              final Clock clock,
              final SCMHAManager scmhaManager,
-             final Table<ContainerID, MoveDataNodePair> moveTable)
+             final Table<ContainerID, MoveDataNodePair> moveTable,
+             final ContainerReplicaPendingOps replicaPendingOps)
              throws IOException {
     this.containerManager = containerManager;
     this.scmContext = scmContext;
@@ -156,6 +158,7 @@ public class ReplicationManager implements SCMService {
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
         TimeUnit.MILLISECONDS);
+    this.containerReplicaPendingOps = replicaPendingOps;
     this.legacyReplicationManager = new LegacyReplicationManager(
         conf, containerManager, containerPlacement, eventPublisher,
         scmContext, nodeManager, scmhaManager, clock, moveTable);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7c58def298..4353d23b45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
 import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -278,6 +279,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
   private ContainerBalancer containerBalancer;
   private StatefulServiceStateManager statefulServiceStateManager;
+  // Used to keep track of pending replication and pending deletes for
+  // container replicas.
+  private ContainerReplicaPendingOps containerReplicaPendingOps;
 
   /**
    * Creates a new StorageContainerManager. Configuration will be
@@ -620,11 +624,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
               );
     }
 
+    containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock);
     if (configurator.getContainerManager() != null) {
       containerManager = configurator.getContainerManager();
     } else {
       containerManager = new ContainerManagerImpl(conf, scmHAManager,
-          sequenceIdGen, pipelineManager, scmMetadataStore.getContainerTable());
+          sequenceIdGen, pipelineManager, scmMetadataStore.getContainerTable(),
+          containerReplicaPendingOps);
     }
 
     pipelineChoosePolicy = PipelineChoosePolicyFactory.getPolicy(conf);
@@ -651,7 +657,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
           scmNodeManager,
           clock,
           scmHAManager,
-          getScmMetadataStore().getMoveTable());
+          getScmMetadataStore().getMoveTable(),
+          containerReplicaPendingOps);
     }
     if (configurator.getScmSafeModeManager() != null) {
       scmSafeModeManager = configurator.getScmSafeModeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 5aa3206c3b..3771fab273 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
@@ -159,7 +161,9 @@ public class TestBlockManager {
             scmHAManager,
             sequenceIdGen,
             pipelineManager,
-            scmMetadataStore.getContainerTable());
+            scmMetadataStore.getContainerTable(),
+            new ContainerReplicaPendingOps(conf,
+                new MonotonicClock(ZoneId.systemDefault())));
     SCMSafeModeManager safeModeManager = new SCMSafeModeManager(conf,
         containerManager.getContainers(), containerManager,
         pipelineManager, eventQueue, serviceManager, scmContext) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 06aa8fbd95..7e46c3e2a1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 
 import org.apache.hadoop.conf.StorageUnit;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
@@ -116,7 +118,9 @@ public class TestCloseContainerEventHandler {
         scmhaManager,
         sequenceIdGen,
         pipelineManager,
-        scmMetadataStore.getContainerTable());
+        scmMetadataStore.getContainerTable(),
+        new ContainerReplicaPendingOps(configuration,
+            new MonotonicClock(ZoneId.systemDefault())));
 
     // trigger BackgroundPipelineCreator to take effect.
     serviceManager.notifyEventTriggered(Event.PRE_CHECK_COMPLETED);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index 644748801d..dbf4974ae1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.UUID;
 
 import org.apache.hadoop.fs.FileUtil;
@@ -25,8 +26,11 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
@@ -42,7 +46,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
 
 
 /**
@@ -56,6 +62,7 @@ public class TestContainerManagerImpl {
   private SCMHAManager scmhaManager;
   private SequenceIdGenerator sequenceIdGen;
   private NodeManager nodeManager;
+  private ContainerReplicaPendingOps pendingOpsMock;
 
   @BeforeEach
   public void setUp() throws Exception {
@@ -73,9 +80,10 @@ public class TestContainerManagerImpl {
         new MockPipelineManager(dbStore, scmhaManager, nodeManager);
     pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
         ReplicationFactor.THREE));
+    pendingOpsMock = Mockito.mock(ContainerReplicaPendingOps.class);
     containerManager = new ContainerManagerImpl(conf,
         scmhaManager, sequenceIdGen, pipelineManager,
-        SCMDBDefinition.CONTAINERS.getTable(dbStore));
+        SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock);
   }
 
   @AfterEach
@@ -180,4 +188,44 @@ public class TestContainerManagerImpl {
         containerManager.getContainer(admin.containerID()));
   }
 
+  @Test
+  public void testUpdateContainerReplicaInvokesPendingOp() throws IOException {
+    final ContainerInfo container = containerManager.allocateContainer(
+        RatisReplicationConfig.getInstance(
+            ReplicationFactor.THREE), "admin");
+    DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+    containerManager.updateContainerReplica(container.containerID(),
+        ContainerReplica.newBuilder()
+            .setContainerState(OPEN)
+            .setReplicaIndex(0)
+            .setContainerID(container.containerID())
+            .setDatanodeDetails(dn)
+            .setSequenceId(1)
+            .setBytesUsed(1234)
+            .setKeyCount(123)
+            .build());
+    Mockito.verify(pendingOpsMock, Mockito.times(1))
+        .completeAddReplica(container.containerID(), dn, 0);
+  }
+
+  @Test
+  public void testRemoveContainerReplicaInvokesPendingOp() throws IOException {
+    final ContainerInfo container = containerManager.allocateContainer(
+        RatisReplicationConfig.getInstance(
+            ReplicationFactor.THREE), "admin");
+    DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+    containerManager.removeContainerReplica(container.containerID(),
+        ContainerReplica.newBuilder()
+            .setContainerState(OPEN)
+            .setReplicaIndex(0)
+            .setContainerID(container.containerID())
+            .setDatanodeDetails(dn)
+            .setSequenceId(1)
+            .setBytesUsed(1234)
+            .setKeyCount(123)
+            .build());
+    Mockito.verify(pendingOpsMock, Mockito.times(1))
+        .completeDeleteReplica(container.containerID(), dn, 0);
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index de85ca68ef..0cc5feb634 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
@@ -119,6 +120,7 @@ public class TestReplicationManager {
   private DBStore dbStore;
   private PipelineManager pipelineManager;
   private SCMHAManager scmhaManager;
+  private ContainerReplicaPendingOps containerReplicaPendingOps;
 
   @BeforeEach
   public void setup()
@@ -195,6 +197,7 @@ public class TestReplicationManager {
         )).thenAnswer(invocation ->
         new ContainerPlacementStatusDefault(2, 2, 3));
     clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
+    containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock);
     createReplicationManager(new ReplicationManagerConfiguration());
   }
 
@@ -225,7 +228,8 @@ public class TestReplicationManager {
         nodeManager,
         clock,
         scmHAManager,
-        SCMDBDefinition.MOVE.getTable(dbStore));
+        SCMDBDefinition.MOVE.getTable(dbStore),
+        containerReplicaPendingOps);
 
     serviceManager.notifyStatusChanged();
     scmLogs.clearOutput();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
new file mode 100644
index 0000000000..98f20f38fb
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.ozone.test.TestClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+
+/**
+ * Tests for ContainerReplicaPendingOps.
+ */
+public class TestContainerReplicaPendingOps {
+
+  private ContainerReplicaPendingOps pendingOps;
+  private TestClock clock;
+  private ConfigurationSource config;
+  private DatanodeDetails dn1;
+  private DatanodeDetails dn2;
+  private DatanodeDetails dn3;
+
+  @Before
+  public void setup() {
+    config = new OzoneConfiguration();
+    clock = new TestClock(Instant.now(), ZoneOffset.UTC);
+    pendingOps = new ContainerReplicaPendingOps(config, clock);
+    dn1 = MockDatanodeDetails.randomDatanodeDetails();
+    dn2 = MockDatanodeDetails.randomDatanodeDetails();
+    dn3 = MockDatanodeDetails.randomDatanodeDetails();
+  }
+
+  @Test
+  public void testGetPendingOpsReturnsEmptyList() {
+    List<ContainerReplicaOp> ops =
+        pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(0, ops.size());
+  }
+
+  @Test
+  public void testCanAddReplicasForAdd() {
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0);
+    pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1);
+
+    List<ContainerReplicaOp> ops =
+        pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(3, ops.size());
+    for (ContainerReplicaOp op : ops) {
+      Assert.assertEquals(0, op.getReplicaIndex());
+      Assert.assertEquals(ADD, op.getOpType());
+    }
+    List<DatanodeDetails> allDns = ops.stream()
+        .map(s -> s.getTarget()).collect(Collectors.toList());
+    Assert.assertTrue(allDns.contains(dn1));
+    Assert.assertTrue(allDns.contains(dn2));
+    Assert.assertTrue(allDns.contains(dn3));
+
+    ops = pendingOps.getPendingOps(new ContainerID(2));
+    Assert.assertEquals(1, ops.size());
+    Assert.assertEquals(1, ops.get(0).getReplicaIndex());
+    Assert.assertEquals(ADD, ops.get(0).getOpType());
+    Assert.assertEquals(dn1, ops.get(0).getTarget());
+  }
+
+  @Test
+  public void testCanAddReplicasForDelete() {
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn3, 0);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1);
+
+    List<ContainerReplicaOp> ops =
+        pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(3, ops.size());
+    for (ContainerReplicaOp op : ops) {
+      Assert.assertEquals(0, op.getReplicaIndex());
+      Assert.assertEquals(DELETE, op.getOpType());
+    }
+    List<DatanodeDetails> allDns = ops.stream()
+        .map(s -> s.getTarget()).collect(Collectors.toList());
+    Assert.assertTrue(allDns.contains(dn1));
+    Assert.assertTrue(allDns.contains(dn2));
+    Assert.assertTrue(allDns.contains(dn3));
+
+    ops = pendingOps.getPendingOps(new ContainerID(2));
+    Assert.assertEquals(1, ops.size());
+    Assert.assertEquals(1, ops.get(0).getReplicaIndex());
+    Assert.assertEquals(DELETE, ops.get(0).getOpType());
+    Assert.assertEquals(dn1, ops.get(0).getTarget());
+  }
+
+  @Test
+  public void testCompletingOps() {
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1);
+
+    List<ContainerReplicaOp> ops =
+        pendingOps.getPendingOps(new ContainerID(1));
+
+    // We expect 4 entries - 2 add and 2 delete.
+    Assert.assertEquals(4, ops.size());
+
+    Assert.assertTrue(pendingOps
+        .completeAddReplica(new ContainerID(1), dn1, 0));
+    ops = pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(3, ops.size());
+
+    // Complete one that does not exist:
+    Assert.assertFalse(pendingOps
+        .completeAddReplica(new ContainerID(1), dn1, 0));
+    ops = pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(3, ops.size());
+
+    // Complete the remaining ones
+    pendingOps.completeDeleteReplica(new ContainerID(1), dn1, 0);
+    pendingOps.completeDeleteReplica(new ContainerID(1), dn2, 0);
+    pendingOps.completeAddReplica(new ContainerID(1), dn3, 0);
+    ops = pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(0, ops.size());
+  }
+
+  @Test
+  public void testRemoveSpecificOp() {
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1);
+
+    ContainerID cid = new ContainerID(1);
+    List<ContainerReplicaOp> ops = pendingOps.getPendingOps(cid);
+    Assert.assertEquals(4, ops.size());
+    for (ContainerReplicaOp op : ops) {
+      Assert.assertTrue(pendingOps.removeOp(cid, op));
+    }
+    // Attempt to remove one that no longer exists
+    Assert.assertFalse(pendingOps.removeOp(cid, ops.get(0)));
+    ops = pendingOps.getPendingOps(cid);
+    Assert.assertEquals(0, ops.size());
+  }
+
+  @Test
+  public void testRemoveExpiredEntries() {
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0);
+    clock.fastForward(1000);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0);
+    clock.fastForward(1000);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1);
+
+    List<ContainerReplicaOp> ops =
+        pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(4, ops.size());
+    ops = pendingOps.getPendingOps(new ContainerID(2));
+    Assert.assertEquals(1, ops.size());
+
+    // Entries were scheduled at "start", start + 1000 and start + 2000.
+    // The clock is currently at start + 2000.
+    pendingOps.removeExpiredEntries(2500);
+    // Nothing is removed as no entry has yet passed the 2500 ms expiry.
+    ops = pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(4, ops.size());
+
+    clock.fastForward(1000);
+    pendingOps.removeExpiredEntries(2500);
+    // The entries scheduled at "start" have now expired and are removed.
+    ops = pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(2, ops.size());
+    // We should lose the entries for DN1
+    List<DatanodeDetails> dns = ops.stream()
+        .map(s -> s.getTarget())
+        .collect(Collectors.toList());
+    Assert.assertFalse(dns.contains(dn1));
+    Assert.assertTrue(dns.contains(dn2));
+    Assert.assertTrue(dns.contains(dn3));
+
+    clock.fastForward(1000);
+    pendingOps.removeExpiredEntries(2500);
+
+    // Now should only have entries for container 2
+    ops = pendingOps.getPendingOps(new ContainerID(1));
+    Assert.assertEquals(0, ops.size());
+    ops = pendingOps.getPendingOps(new ContainerID(2));
+    Assert.assertEquals(1, ops.size());
+
+    // Advance the clock again and all should be removed
+    clock.fastForward(1000);
+    pendingOps.removeExpiredEntries(2500);
+    ops = pendingOps.getPendingOps(new ContainerID(2));
+    Assert.assertEquals(0, ops.size());
+  }
+
+}
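
For reference when reading the expiry test above: the predicate in removeExpiredEntries is scheduledEpochMillis + expiryMilliSeconds < clock.millis(), so an op scheduled at time t only expires once the clock is strictly past t + expiryMilliSeconds. A minimal illustration with the same helpers the test uses (the bare assert statements are just shorthand for the JUnit assertions):

    TestClock clock = new TestClock(Instant.now(), ZoneOffset.UTC);
    ContainerReplicaPendingOps ops =
        new ContainerReplicaPendingOps(new OzoneConfiguration(), clock);
    ContainerID cid = new ContainerID(1);
    ops.scheduleAddReplica(cid, MockDatanodeDetails.randomDatanodeDetails(), 0);

    clock.fastForward(2500);
    ops.removeExpiredEntries(2500);   // t + 2500 < t + 2500 is false: kept
    assert ops.getPendingOps(cid).size() == 1;

    clock.fastForward(1);
    ops.removeExpiredEntries(2500);   // now strictly past t + 2500: removed
    assert ops.getPendingOps(cid).isEmpty();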
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 3b25b72187..cf705d3c84 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.node;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.List;
 import java.util.UUID;
 
@@ -42,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 import org.apache.hadoop.test.PathUtils;
@@ -162,7 +165,9 @@ public class TestContainerPlacement {
       throws IOException {
     return new ContainerManagerImpl(conf,
         scmhaManager, sequenceIdGen, pipelineManager,
-        SCMDBDefinition.CONTAINERS.getTable(dbStore));
+        SCMDBDefinition.CONTAINERS.getTable(dbStore),
+        new ContainerReplicaPendingOps(
+            conf, new MonotonicClock(ZoneId.systemDefault())));
   }
 
   /**
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index b8d7be0480..7e5f8d828f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -93,9 +94,11 @@ public class ReconContainerManager extends ContainerManagerImpl {
       ContainerHealthSchemaManager containerHealthSchemaManager,
       ReconContainerMetadataManager reconContainerMetadataManager,
       SCMHAManager scmhaManager,
-      SequenceIdGenerator sequenceIdGen)
+      SequenceIdGenerator sequenceIdGen,
+      ContainerReplicaPendingOps pendingOps)
       throws IOException {
-    super(conf, scmhaManager, sequenceIdGen, pipelineManager, containerStore);
+    super(conf, scmhaManager, sequenceIdGen, pipelineManager, containerStore,
+        pendingOps);
     this.scmClient = scm;
     this.pipelineManager = pipelineManager;
     this.containerHealthSchemaManager = containerHealthSchemaManager;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 07d9f3fd19..941795e551 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.recon.scm;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.time.ZoneId;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
@@ -71,6 +73,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
 import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.fsck.ContainerHealthTask;
@@ -162,12 +165,14 @@ public class ReconStorageContainerManagerFacade
         eventQueue,
         scmhaManager,
         scmContext);
+    ContainerReplicaPendingOps pendingOps = new ContainerReplicaPendingOps(
+        conf, new MonotonicClock(ZoneId.systemDefault()));
     this.containerManager = new ReconContainerManager(conf,
         dbStore,
         ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),
         pipelineManager, scmServiceProvider,
         containerHealthSchemaManager, reconContainerMetadataManager,
-        scmhaManager, sequenceIdGen);
+        scmhaManager, sequenceIdGen, pendingOps);
     this.scmServiceProvider = scmServiceProvider;
 
     NodeReportHandler nodeReportHandler =
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index f1d342624c..d6a7a13c3e 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.recon.scm;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -28,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
 import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
@@ -111,6 +114,8 @@ public class AbstractReconContainerManagerTest {
         eventQueue,
         scmhaManager,
         scmContext);
+    ContainerReplicaPendingOps pendingOps = new ContainerReplicaPendingOps(
+        conf, new MonotonicClock(ZoneId.systemDefault()));
 
     containerManager = new ReconContainerManager(
         conf,
@@ -121,7 +126,8 @@ public class AbstractReconContainerManagerTest {
         mock(ContainerHealthSchemaManager.class),
         mock(ReconContainerMetadataManager.class),
         scmhaManager,
-        sequenceIdGen);
+        sequenceIdGen,
+        pendingOps);
   }
 
   @After


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org