You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/05/20 08:29:10 UTC
[ozone] branch master updated: HDDS-6744. EC: ReplicationManager - create ContainerReplicaPendingOps class and integrate with ContainerManager (#3425)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 78a018814c HDDS-6744. EC: ReplicationManager - create ContainerReplicaPendingOps class and integrate with ContainerManager (#3425)
78a018814c is described below
commit 78a018814c1f3bdca90d16e8246c349e3d0e390b
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri May 20 09:29:05 2022 +0100
HDDS-6744. EC: ReplicationManager - create ContainerReplicaPendingOps class and integrate with ContainerManager (#3425)
---
.../hdds/scm/container/ContainerManagerImpl.java | 16 +-
.../container/replication/ContainerReplicaOp.java | 63 ++++++
.../replication/ContainerReplicaPendingOps.java | 227 +++++++++++++++++++++
.../container/replication/ReplicationManager.java | 5 +-
.../hdds/scm/server/StorageContainerManager.java | 11 +-
.../hadoop/hdds/scm/block/TestBlockManager.java | 6 +-
.../container/TestCloseContainerEventHandler.java | 6 +-
.../scm/container/TestContainerManagerImpl.java | 50 ++++-
.../hdds/scm/container/TestReplicationManager.java | 6 +-
.../TestContainerReplicaPendingOps.java | 226 ++++++++++++++++++++
.../hdds/scm/node/TestContainerPlacement.java | 7 +-
.../ozone/recon/scm/ReconContainerManager.java | 7 +-
.../scm/ReconStorageContainerManagerFacade.java | 7 +-
.../scm/AbstractReconContainerManagerTest.java | 8 +-
14 files changed, 632 insertions(+), 13 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index f3f1a8a520..f3e8025450 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -97,6 +98,10 @@ public class ContainerManagerImpl implements ContainerManager {
@SuppressWarnings("java:S2245") // no need for secure random
private final Random random = new Random();
+ // Used to track pending replication and delete for container replicas. In
+ // ContainerManager, we try to remove any replicas we see added or deleted
+ // in case they have been created by replication / delete command
+ private final ContainerReplicaPendingOps containerReplicaPendingOps;
/**
*
@@ -106,7 +111,8 @@ public class ContainerManagerImpl implements ContainerManager {
final SCMHAManager scmHaManager,
final SequenceIdGenerator sequenceIdGen,
final PipelineManager pipelineManager,
- final Table<ContainerID, ContainerInfo> containerStore)
+ final Table<ContainerID, ContainerInfo> containerStore,
+ final ContainerReplicaPendingOps containerReplicaPendingOps)
throws IOException {
// Introduce builder for this class?
this.lock = new ReentrantLock();
@@ -126,6 +132,7 @@ public class ContainerManagerImpl implements ContainerManager {
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
+ this.containerReplicaPendingOps = containerReplicaPendingOps;
}
@Override
@@ -320,6 +327,9 @@ public class ContainerManagerImpl implements ContainerManager {
throws ContainerNotFoundException {
if (containerExist(cid)) {
containerStateManager.updateContainerReplica(cid, replica);
+ // Clear any pending additions for this replica as we have now seen it.
+ containerReplicaPendingOps.completeAddReplica(cid,
+ replica.getDatanodeDetails(), replica.getReplicaIndex());
} else {
throwContainerNotFoundException(cid);
}
@@ -331,6 +341,10 @@ public class ContainerManagerImpl implements ContainerManager {
throws ContainerNotFoundException, ContainerReplicaNotFoundException {
if (containerExist(cid)) {
containerStateManager.removeContainerReplica(cid, replica);
+ // Remove any pending delete replication operations for the deleted
+ // replica.
+ containerReplicaPendingOps.completeDeleteReplica(cid,
+ replica.getDatanodeDetails(), replica.getReplicaIndex());
} else {
throwContainerNotFoundException(cid);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
new file mode 100644
index 0000000000..869b0f0cf2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+/**
+ * Class to wrap details used to track pending replications.
+ */
+public class ContainerReplicaOp {
+
+ /**
+ * Enum representing different types of pending Ops.
+ */
+ public enum PendingOpType {
+ ADD, DELETE
+ }
+
+ private PendingOpType opType;
+ private DatanodeDetails target;
+ private int replicaIndex;
+ private long scheduledEpochMillis;
+
+ public ContainerReplicaOp(PendingOpType opType,
+ DatanodeDetails target, int replicaIndex, long scheduledTime) {
+ this.opType = opType;
+ this.target = target;
+ this.replicaIndex = replicaIndex;
+ this.scheduledEpochMillis = scheduledTime;
+ }
+
+ public PendingOpType getOpType() {
+ return opType;
+ }
+
+ public DatanodeDetails getTarget() {
+ return target;
+ }
+
+ public int getReplicaIndex() {
+ return replicaIndex;
+ }
+
+ public long getScheduledEpochMillis() {
+ return scheduledEpochMillis;
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
new file mode 100644
index 0000000000..691d5e9dfb
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.util.concurrent.Striped;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+
+/**
+ * Class to track pending replication operations across the cluster. For
+ * each container with a pending replication or pending delete, there will
+ * be an entry in this class mapping ContainerID to a list of the pending
+ * operations.
+ */
+public class ContainerReplicaPendingOps {
+
+ private final ConfigurationSource config;
+ private final Clock clock;
+ private final ConcurrentHashMap<ContainerID, List<ContainerReplicaOp>>
+ pendingOps = new ConcurrentHashMap<>();
+ private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(64);
+
+ public ContainerReplicaPendingOps(final ConfigurationSource conf,
+ Clock clock) {
+ this.config = conf;
+ this.clock = clock;
+ }
+
+ /**
+ * Get all the ContainerReplicaOp's associated with the given ContainerID.
+ * A new list is created and returned, so it can be modified by the caller,
+ * but any changes will not be reflected in the internal map.
+ * @param containerID The ContainerID for which to retrieve the pending
+ * ops.
+ * @return Standalone list of ContainerReplica or an empty list if none exist.
+ */
+ public List<ContainerReplicaOp> getPendingOps(ContainerID containerID) {
+ Lock lock = readLock(containerID);
+ lock.lock();
+ try {
+ List<ContainerReplicaOp> ops = pendingOps.get(containerID);
+ if (ops == null) {
+ return Collections.emptyList();
+ }
+ return new ArrayList<>(ops);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Store a ContainerReplicaOp to add a replica for the given ContainerID.
+ * @param containerID ContainerID for which to add a replica
+ * @param target The target datanode
+ * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ */
+ public void scheduleAddReplica(ContainerID containerID,
+ DatanodeDetails target, int replicaIndex) {
+ addReplica(ADD, containerID, target, replicaIndex);
+ }
+
+ /**
+ * Store a ContainerReplicaOp to delete a replica for the given ContainerID.
+ * @param containerID ContainerID for which to delete a replica
+ * @param target The target datanode
+ * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ */
+ public void scheduleDeleteReplica(ContainerID containerID,
+ DatanodeDetails target, int replicaIndex) {
+ addReplica(DELETE, containerID, target, replicaIndex);
+ }
+
+ /**
+ * Remove a stored ContainerReplicaOp from the given ContainerID as it has
+ * been replicated successfully.
+ * @param containerID ContainerID for which to complete the replication
+ * @param target The target Datanode
+ * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ * @return True if a pending replica was found and removed, false otherwise.
+ */
+ public boolean completeAddReplica(ContainerID containerID,
+ DatanodeDetails target, int replicaIndex) {
+ return completeOp(ADD, containerID, target, replicaIndex);
+ }
+
+
+ /**
+ * Remove a stored ContainerReplicaOp from the given ContainerID as it has
+ * been deleted successfully.
+ * @param containerID ContainerID for which to complete the deletion
+ * @param target The target Datanode
+ * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ * @return True if a pending replica was found and removed, false otherwise.
+ */
+ public boolean completeDeleteReplica(ContainerID containerID,
+ DatanodeDetails target, int replicaIndex) {
+ return completeOp(DELETE, containerID, target, replicaIndex);
+ }
+
+ /**
+ * Remove a stored pending operation from the given ContainerID.
+ * @param containerID ContainerID for which to remove the op.
+ * @param op ContainerReplicaOp to remove
+ * @return True if an element was found and deleted, false otherwise.
+ */
+ public boolean removeOp(ContainerID containerID,
+ ContainerReplicaOp op) {
+ return completeOp(op.getOpType(), containerID, op.getTarget(),
+ op.getReplicaIndex());
+ }
+
+ /**
+ * Iterate over all pending entries and remove any which have expired, meaning
+ * they have not completed the operation inside the given time.
+ * @param expiryMilliSeconds
+ */
+ public void removeExpiredEntries(long expiryMilliSeconds) {
+ for (ContainerID containerID : pendingOps.keySet()) {
+ // Rather than use an entry set, we get the map entry again. This is
+ // to protect against another thread modifying the value after this
+ // iterator started. Once we lock on the ContainerID object, no other
+ // changes can occur to the list of ops associated with it.
+ Lock lock = writeLock(containerID);
+ lock.lock();
+ try {
+ List<ContainerReplicaOp> ops = pendingOps.get(containerID);
+ if (ops == null) {
+ // There should not be null entries, but another thread may have
+ // removed the map entry after the iterator was started.
+ continue;
+ }
+ Iterator<ContainerReplicaOp> iterator = ops.listIterator();
+ while (iterator.hasNext()) {
+ ContainerReplicaOp op = iterator.next();
+ if (op.getScheduledEpochMillis() + expiryMilliSeconds
+ < clock.millis()) {
+ iterator.remove();
+ }
+ }
+ if (ops.size() == 0) {
+ pendingOps.remove(containerID);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private void addReplica(ContainerReplicaOp.PendingOpType opType,
+ ContainerID containerID, DatanodeDetails target, int replicaIndex) {
+ Lock lock = writeLock(containerID);
+ lock.lock();
+ try {
+ List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
+ containerID, s -> new ArrayList<>());
+ ops.add(new ContainerReplicaOp(opType,
+ target, replicaIndex, clock.millis()));
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
+ ContainerID containerID, DatanodeDetails target, int replicaIndex) {
+ boolean found = false;
+ Lock lock = writeLock(containerID);
+ lock.lock();
+ try {
+ List<ContainerReplicaOp> ops = pendingOps.get(containerID);
+ if (ops != null) {
+ Iterator<ContainerReplicaOp> iterator = ops.listIterator();
+ while (iterator.hasNext()) {
+ ContainerReplicaOp op = iterator.next();
+ if (op.getOpType() == opType
+ && op.getTarget().equals(target)
+ && op.getReplicaIndex() == replicaIndex) {
+ found = true;
+ iterator.remove();
+ }
+ }
+ if (ops.size() == 0) {
+ pendingOps.remove(containerID);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ return found;
+ }
+
+ private Lock writeLock(ContainerID containerID) {
+ return stripedLock.get(containerID).writeLock();
+ }
+
+ private Lock readLock(ContainerID containerID) {
+ return stripedLock.get(containerID).readLock();
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index e4020955f7..7ae7d64478 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -124,6 +124,7 @@ public class ReplicationManager implements SCMService {
private final long waitTimeInMillis;
private long lastTimeToBeReadyInMillis = 0;
private final Clock clock;
+ private final ContainerReplicaPendingOps containerReplicaPendingOps;
/**
* Constructs ReplicationManager instance with the given configuration.
@@ -143,7 +144,8 @@ public class ReplicationManager implements SCMService {
final NodeManager nodeManager,
final Clock clock,
final SCMHAManager scmhaManager,
- final Table<ContainerID, MoveDataNodePair> moveTable)
+ final Table<ContainerID, MoveDataNodePair> moveTable,
+ final ContainerReplicaPendingOps replicaPendingOps)
throws IOException {
this.containerManager = containerManager;
this.scmContext = scmContext;
@@ -156,6 +158,7 @@ public class ReplicationManager implements SCMService {
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);
+ this.containerReplicaPendingOps = replicaPendingOps;
this.legacyReplicationManager = new LegacyReplicationManager(
conf, containerManager, containerPlacement, eventPublisher,
scmContext, nodeManager, scmhaManager, clock, moveTable);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7c58def298..4353d23b45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -278,6 +279,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private ContainerBalancer containerBalancer;
private StatefulServiceStateManager statefulServiceStateManager;
+ // Used to keep track of pending replication and pending deletes for
+ // container replicas.
+ private ContainerReplicaPendingOps containerReplicaPendingOps;
/**
* Creates a new StorageContainerManager. Configuration will be
@@ -620,11 +624,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
);
}
+ containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock);
if (configurator.getContainerManager() != null) {
containerManager = configurator.getContainerManager();
} else {
containerManager = new ContainerManagerImpl(conf, scmHAManager,
- sequenceIdGen, pipelineManager, scmMetadataStore.getContainerTable());
+ sequenceIdGen, pipelineManager, scmMetadataStore.getContainerTable(),
+ containerReplicaPendingOps);
}
pipelineChoosePolicy = PipelineChoosePolicyFactory.getPolicy(conf);
@@ -651,7 +657,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
scmNodeManager,
clock,
scmHAManager,
- getScmMetadataStore().getMoveTable());
+ getScmMetadataStore().getMoveTable(),
+ containerReplicaPendingOps);
}
if (configurator.getScmSafeModeManager() != null) {
scmSafeModeManager = configurator.getScmSafeModeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 5aa3206c3b..3771fab273 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.block;
import java.io.IOException;
+import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
@@ -159,7 +161,9 @@ public class TestBlockManager {
scmHAManager,
sequenceIdGen,
pipelineManager,
- scmMetadataStore.getContainerTable());
+ scmMetadataStore.getContainerTable(),
+ new ContainerReplicaPendingOps(conf,
+ new MonotonicClock(ZoneId.systemDefault())));
SCMSafeModeManager safeModeManager = new SCMSafeModeManager(conf,
containerManager.getContainers(), containerManager,
pipelineManager, eventQueue, serviceManager, scmContext) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 06aa8fbd95..7e46c3e2a1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container;
import java.io.File;
import java.io.IOException;
+import java.time.ZoneId;
import java.time.ZoneOffset;
import org.apache.hadoop.conf.StorageUnit;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
@@ -116,7 +118,9 @@ public class TestCloseContainerEventHandler {
scmhaManager,
sequenceIdGen,
pipelineManager,
- scmMetadataStore.getContainerTable());
+ scmMetadataStore.getContainerTable(),
+ new ContainerReplicaPendingOps(configuration,
+ new MonotonicClock(ZoneId.systemDefault())));
// trigger BackgroundPipelineCreator to take effect.
serviceManager.notifyEventTriggered(Event.PRE_CHECK_COMPLETED);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index 644748801d..dbf4974ae1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container;
import java.io.File;
+import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.fs.FileUtil;
@@ -25,8 +26,11 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
@@ -42,7 +46,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
/**
@@ -56,6 +62,7 @@ public class TestContainerManagerImpl {
private SCMHAManager scmhaManager;
private SequenceIdGenerator sequenceIdGen;
private NodeManager nodeManager;
+ private ContainerReplicaPendingOps pendingOpsMock;
@BeforeEach
public void setUp() throws Exception {
@@ -73,9 +80,10 @@ public class TestContainerManagerImpl {
new MockPipelineManager(dbStore, scmhaManager, nodeManager);
pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
ReplicationFactor.THREE));
+ pendingOpsMock = Mockito.mock(ContainerReplicaPendingOps.class);
containerManager = new ContainerManagerImpl(conf,
scmhaManager, sequenceIdGen, pipelineManager,
- SCMDBDefinition.CONTAINERS.getTable(dbStore));
+ SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock);
}
@AfterEach
@@ -180,4 +188,44 @@ public class TestContainerManagerImpl {
containerManager.getContainer(admin.containerID()));
}
+ @Test
+ public void testUpdateContainerReplicaInvokesPendingOp() throws IOException {
+ final ContainerInfo container = containerManager.allocateContainer(
+ RatisReplicationConfig.getInstance(
+ ReplicationFactor.THREE), "admin");
+ DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+ containerManager.updateContainerReplica(container.containerID(),
+ ContainerReplica.newBuilder()
+ .setContainerState(OPEN)
+ .setReplicaIndex(0)
+ .setContainerID(container.containerID())
+ .setDatanodeDetails(dn)
+ .setSequenceId(1)
+ .setBytesUsed(1234)
+ .setKeyCount(123)
+ .build());
+ Mockito.verify(pendingOpsMock, Mockito.times(1))
+ .completeAddReplica(container.containerID(), dn, 0);
+ }
+
+ @Test
+ public void testRemoveContainerReplicaInvokesPendingOp() throws IOException {
+ final ContainerInfo container = containerManager.allocateContainer(
+ RatisReplicationConfig.getInstance(
+ ReplicationFactor.THREE), "admin");
+ DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+ containerManager.removeContainerReplica(container.containerID(),
+ ContainerReplica.newBuilder()
+ .setContainerState(OPEN)
+ .setReplicaIndex(0)
+ .setContainerID(container.containerID())
+ .setDatanodeDetails(dn)
+ .setSequenceId(1)
+ .setBytesUsed(1234)
+ .setKeyCount(123)
+ .build());
+ Mockito.verify(pendingOpsMock, Mockito.times(1))
+ .completeDeleteReplica(container.containerID(), dn, 0);
+ }
+
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index de85ca68ef..0cc5feb634 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
@@ -119,6 +120,7 @@ public class TestReplicationManager {
private DBStore dbStore;
private PipelineManager pipelineManager;
private SCMHAManager scmhaManager;
+ private ContainerReplicaPendingOps containerReplicaPendingOps;
@BeforeEach
public void setup()
@@ -195,6 +197,7 @@ public class TestReplicationManager {
)).thenAnswer(invocation ->
new ContainerPlacementStatusDefault(2, 2, 3));
clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
+ containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock);
createReplicationManager(new ReplicationManagerConfiguration());
}
@@ -225,7 +228,8 @@ public class TestReplicationManager {
nodeManager,
clock,
scmHAManager,
- SCMDBDefinition.MOVE.getTable(dbStore));
+ SCMDBDefinition.MOVE.getTable(dbStore),
+ containerReplicaPendingOps);
serviceManager.notifyStatusChanged();
scmLogs.clearOutput();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
new file mode 100644
index 0000000000..98f20f38fb
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.ozone.test.TestClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+
+/**
+ * Tests for ContainerReplicaPendingOps.
+ */
+public class TestContainerReplicaPendingOps {
+
+ private ContainerReplicaPendingOps pendingOps;
+ private TestClock clock;
+ private ConfigurationSource config;
+ private DatanodeDetails dn1;
+ private DatanodeDetails dn2;
+ private DatanodeDetails dn3;
+
+ @Before
+ public void setup() {
+ config = new OzoneConfiguration();
+ clock = new TestClock(Instant.now(), ZoneOffset.UTC);
+ pendingOps = new ContainerReplicaPendingOps(config, clock);
+ dn1 = MockDatanodeDetails.randomDatanodeDetails();
+ dn2 = MockDatanodeDetails.randomDatanodeDetails();
+ dn3 = MockDatanodeDetails.randomDatanodeDetails();
+ }
+
+ @Test
+ public void testGetPendingOpsReturnsEmptyList() {
+ List<ContainerReplicaOp> ops =
+ pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(0, ops.size());
+ }
+
+ @Test
+ public void testCanAddReplicasForAdd() {
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0);
+ pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1);
+
+ List<ContainerReplicaOp> ops =
+ pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(3, ops.size());
+ for (ContainerReplicaOp op : ops) {
+ Assert.assertEquals(0, op.getReplicaIndex());
+ Assert.assertEquals(ADD, op.getOpType());
+ }
+ List<DatanodeDetails> allDns = ops.stream()
+ .map(s -> s.getTarget()).collect(Collectors.toList());
+ Assert.assertTrue(allDns.contains(dn1));
+ Assert.assertTrue(allDns.contains(dn2));
+ Assert.assertTrue(allDns.contains(dn3));
+
+ ops = pendingOps.getPendingOps(new ContainerID(2));
+ Assert.assertEquals(1, ops.size());
+ Assert.assertEquals(1, ops.get(0).getReplicaIndex());
+ Assert.assertEquals(ADD, ops.get(0).getOpType());
+ Assert.assertEquals(dn1, ops.get(0).getTarget());
+ }
+
+ @Test
+ public void testCanAddReplicasForDelete() {
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn3, 0);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1);
+
+ List<ContainerReplicaOp> ops =
+ pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(3, ops.size());
+ for (ContainerReplicaOp op : ops) {
+ Assert.assertEquals(0, op.getReplicaIndex());
+ Assert.assertEquals(DELETE, op.getOpType());
+ }
+ List<DatanodeDetails> allDns = ops.stream()
+ .map(s -> s.getTarget()).collect(Collectors.toList());
+ Assert.assertTrue(allDns.contains(dn1));
+ Assert.assertTrue(allDns.contains(dn2));
+ Assert.assertTrue(allDns.contains(dn3));
+
+ ops = pendingOps.getPendingOps(new ContainerID(2));
+ Assert.assertEquals(1, ops.size());
+ Assert.assertEquals(1, ops.get(0).getReplicaIndex());
+ Assert.assertEquals(DELETE, ops.get(0).getOpType());
+ Assert.assertEquals(dn1, ops.get(0).getTarget());
+ }
+
+ @Test
+ public void testCompletingOps() {
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1);
+
+ List<ContainerReplicaOp> ops =
+ pendingOps.getPendingOps(new ContainerID(1));
+
+ // We expect 4 entries - 2 add and 2 delete.
+ Assert.assertEquals(4, ops.size());
+
+ Assert.assertTrue(pendingOps
+ .completeAddReplica(new ContainerID(1), dn1, 0));
+ ops = pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(3, ops.size());
+
+ // Complete one that does not exist:
+ Assert.assertFalse(pendingOps
+ .completeAddReplica(new ContainerID(1), dn1, 0));
+ ops = pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(3, ops.size());
+
+ // Complete the remaining ones
+ pendingOps.completeDeleteReplica(new ContainerID(1), dn1, 0);
+ pendingOps.completeDeleteReplica(new ContainerID(1), dn2, 0);
+ pendingOps.completeAddReplica(new ContainerID(1), dn3, 0);
+ ops = pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(0, ops.size());
+ }
+
+ @Test
+ public void testRemoveSpecificOp() {
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1);
+
+ ContainerID cid = new ContainerID(1);
+ List<ContainerReplicaOp> ops = pendingOps.getPendingOps(cid);
+ Assert.assertEquals(4, ops.size());
+ for (ContainerReplicaOp op : ops) {
+ Assert.assertTrue(pendingOps.removeOp(cid, op));
+ }
+ // Attempt to remove one that no longer exists
+ Assert.assertFalse(pendingOps.removeOp(cid, ops.get(0)));
+ ops = pendingOps.getPendingOps(cid);
+ Assert.assertEquals(0, ops.size());
+ }
+
+ @Test
+ public void testRemoveExpiredEntries() {
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0);
+ clock.fastForward(1000);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0);
+ clock.fastForward(1000);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1);
+
+ List<ContainerReplicaOp> ops =
+ pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(4, ops.size());
+ ops = pendingOps.getPendingOps(new ContainerID(2));
+ Assert.assertEquals(1, ops.size());
+
+ // Some entries at "start" some at start + 1000 and start + 2000.
+ // Clock is currently at +2000.
+ pendingOps.removeExpiredEntries(2500);
+ // Nothing is remove as nothing is older than the current clock time.
+ ops = pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(4, ops.size());
+
+ clock.fastForward(1000);
+ pendingOps.removeExpiredEntries(2500);
+ // Nothing is remove as nothing is older than the current clock time.
+ ops = pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(2, ops.size());
+ // We should lose the entries for DN1
+ List<DatanodeDetails> dns = ops.stream()
+ .map(s -> s.getTarget())
+ .collect(Collectors.toList());
+ Assert.assertFalse(dns.contains(dn1));
+ Assert.assertTrue(dns.contains(dn2));
+ Assert.assertTrue(dns.contains(dn3));
+
+ clock.fastForward(1000);
+ pendingOps.removeExpiredEntries(2500);
+
+ // Now should only have entries for container 2
+ ops = pendingOps.getPendingOps(new ContainerID(1));
+ Assert.assertEquals(0, ops.size());
+ ops = pendingOps.getPendingOps(new ContainerID(2));
+ Assert.assertEquals(1, ops.size());
+
+ // Advance the clock again and all should be removed
+ clock.fastForward(1000);
+ pendingOps.removeExpiredEntries(2500);
+ ops = pendingOps.getPendingOps(new ContainerID(2));
+ Assert.assertEquals(0, ops.size());
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 3b25b72187..cf705d3c84 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.node;
import java.io.File;
import java.io.IOException;
+import java.time.ZoneId;
import java.util.List;
import java.util.UUID;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
import org.apache.hadoop.test.PathUtils;
@@ -162,7 +165,9 @@ public class TestContainerPlacement {
throws IOException {
return new ContainerManagerImpl(conf,
scmhaManager, sequenceIdGen, pipelineManager,
- SCMDBDefinition.CONTAINERS.getTable(dbStore));
+ SCMDBDefinition.CONTAINERS.getTable(dbStore),
+ new ContainerReplicaPendingOps(
+ conf, new MonotonicClock(ZoneId.systemDefault())));
}
/**
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index b8d7be0480..7e5f8d828f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -93,9 +94,11 @@ public class ReconContainerManager extends ContainerManagerImpl {
ContainerHealthSchemaManager containerHealthSchemaManager,
ReconContainerMetadataManager reconContainerMetadataManager,
SCMHAManager scmhaManager,
- SequenceIdGenerator sequenceIdGen)
+ SequenceIdGenerator sequenceIdGen,
+ ContainerReplicaPendingOps pendingOps)
throws IOException {
- super(conf, scmhaManager, sequenceIdGen, pipelineManager, containerStore);
+ super(conf, scmhaManager, sequenceIdGen, pipelineManager, containerStore,
+ pendingOps);
this.scmClient = scm;
this.pipelineManager = pipelineManager;
this.containerHealthSchemaManager = containerHealthSchemaManager;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 07d9f3fd19..941795e551 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.recon.scm;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
@@ -71,6 +73,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.fsck.ContainerHealthTask;
@@ -162,12 +165,14 @@ public class ReconStorageContainerManagerFacade
eventQueue,
scmhaManager,
scmContext);
+ ContainerReplicaPendingOps pendingOps = new ContainerReplicaPendingOps(
+ conf, new MonotonicClock(ZoneId.systemDefault()));
this.containerManager = new ReconContainerManager(conf,
dbStore,
ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),
pipelineManager, scmServiceProvider,
containerHealthSchemaManager, reconContainerMetadataManager,
- scmhaManager, sequenceIdGen);
+ scmhaManager, sequenceIdGen, pendingOps);
this.scmServiceProvider = scmServiceProvider;
NodeReportHandler nodeReportHandler =
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index f1d342624c..d6a7a13c3e 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.recon.scm;
import java.io.IOException;
+import java.time.ZoneId;
import java.util.LinkedList;
import java.util.List;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBufferStub;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
@@ -111,6 +114,8 @@ public class AbstractReconContainerManagerTest {
eventQueue,
scmhaManager,
scmContext);
+ ContainerReplicaPendingOps pendingOps = new ContainerReplicaPendingOps(
+ conf, new MonotonicClock(ZoneId.systemDefault()));
containerManager = new ReconContainerManager(
conf,
@@ -121,7 +126,8 @@ public class AbstractReconContainerManagerTest {
mock(ContainerHealthSchemaManager.class),
mock(ReconContainerMetadataManager.class),
scmhaManager,
- sequenceIdGen);
+ sequenceIdGen,
+ pendingOps);
}
@After
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org