You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/06/16 16:06:30 UTC
[ozone] branch master updated: HDDS-6829. Limit the no of inflight replication tasks in SCM. (#3482)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 94945aed40 HDDS-6829. Limit the no of inflight replication tasks in SCM. (#3482)
94945aed40 is described below
commit 94945aed404216e17cd4560566100d5a90497fd8
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Jun 16 09:06:24 2022 -0700
HDDS-6829. Limit the no of inflight replication tasks in SCM. (#3482)
---
.../scm/container/replication/InflightType.java | 23 ++
.../replication/LegacyReplicationManager.java | 314 ++++++++++++++++-----
.../container/replication/ReplicationManager.java | 8 -
.../replication/ReplicationManagerMetrics.java | 48 +++-
.../{ => replication}/TestReplicationManager.java | 151 +++++++---
.../replication/TestReplicationManagerMetrics.java | 6 +
6 files changed, 423 insertions(+), 127 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightType.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightType.java
new file mode 100644
index 0000000000..acd859f67a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+enum InflightType {
+ REPLICATION,
+ DELETION
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 47ac3d3541..37d01736b9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -81,10 +81,12 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -102,6 +104,110 @@ public class LegacyReplicationManager {
public static final Logger LOG =
LoggerFactory.getLogger(LegacyReplicationManager.class);
+ /**
+  * Tracks the inflight actions of a single {@link InflightType}, grouped by
+  * container, and enforces an optional upper bound on the total number of
+  * tracked actions.
+  *
+  * Each per-container list is guarded by its own monitor. After locking a
+  * list, every method re-checks that the list is still the one mapped to the
+  * container and retries otherwise, so a list that was concurrently removed
+  * from the map is never read or modified as if it were live.
+  */
+ static class InflightMap {
+   private final Map<ContainerID, List<InflightAction>> map
+       = new ConcurrentHashMap<>();
+   private final InflightType type;
+   private final int sizeLimit;
+   /** Total number of actions currently held in all lists of the map. */
+   private final AtomicInteger inflightCount = new AtomicInteger();
+
+   /**
+    * @param type the kind of action tracked by this map
+    * @param sizeLimit maximum number of tracked actions;
+    *                  a non-positive value is treated as
+    *                  {@link Integer#MAX_VALUE}
+    */
+   InflightMap(InflightType type, int sizeLimit) {
+     this.type = type;
+     this.sizeLimit = sizeLimit > 0 ? sizeLimit : Integer.MAX_VALUE;
+   }
+
+   /** @return true iff this map tracks {@link InflightType#REPLICATION}. */
+   boolean isReplication() {
+     return type == InflightType.REPLICATION;
+   }
+
+   private List<InflightAction> get(ContainerID id) {
+     return map.get(id);
+   }
+
+   /** @return true iff the given container has a list in this map. */
+   boolean containsKey(ContainerID id) {
+     return map.containsKey(id);
+   }
+
+   /** @return the number of actions tracked for the container, or zero. */
+   int inflightActionCount(ContainerID id) {
+     return Optional.ofNullable(map.get(id)).map(List::size).orElse(0);
+   }
+
+   /** @return the number of containers which have a list in this map. */
+   int containerCount() {
+     return map.size();
+   }
+
+   /** @return true iff the number of tracked actions reached the limit. */
+   boolean isFull() {
+     return inflightCount.get() >= sizeLimit;
+   }
+
+   /**
+    * Remove the tracked actions of all the containers.
+    *
+    * The actions are removed through {@link #iterate} so that
+    * {@link #inflightCount} is decremented once for each removed action.
+    * Clearing only the underlying map would leave the counter at its old
+    * value and {@link #isFull()} / {@link #add} would keep counting the
+    * discarded actions against the limit.
+    */
+   void clear() {
+     for (ContainerID id : map.keySet()) {
+       iterate(id, a -> true);
+     }
+   }
+
+   /**
+    * Apply the processor to each action of the given container.
+    * An action is removed, and the counter decremented, when the processor
+    * returns true for it. The list is removed from the map once it is empty.
+    *
+    * @param id the container whose actions are processed
+    * @param processor returns true for the actions to be removed
+    */
+   void iterate(ContainerID id, Predicate<InflightAction> processor) {
+     for (; ;) {
+       final List<InflightAction> actions = get(id);
+       if (actions == null) {
+         return;
+       }
+       synchronized (actions) {
+         if (get(id) != actions) {
+           continue; //actions is changed, retry
+         }
+         for (Iterator<InflightAction> i = actions.iterator(); i.hasNext();) {
+           final boolean remove = processor.test(i.next());
+           if (remove) {
+             i.remove();
+             inflightCount.decrementAndGet();
+           }
+         }
+         // drop the mapping only if it still points to this, now empty, list
+         map.computeIfPresent(id,
+             (k, v) -> v == actions && v.isEmpty() ? null : v);
+         return;
+       }
+     }
+   }
+
+   /**
+    * Track the given action for the given container
+    * unless the limit is already reached.
+    *
+    * @return true iff the action is added
+    */
+   boolean add(ContainerID id, InflightAction a) {
+     // reserve a slot first; the counter is not changed when it is full
+     final int previous = inflightCount.getAndUpdate(
+         n -> n < sizeLimit ? n + 1 : n);
+     if (previous >= sizeLimit) {
+       return false;
+     }
+     for (; ;) {
+       final List<InflightAction> actions = map.computeIfAbsent(id,
+           key -> new LinkedList<>());
+       synchronized (actions) {
+         if (get(id) != actions) {
+           continue; //actions is changed, retry
+         }
+         final boolean added = actions.add(a);
+         if (!added) {
+           // give the reserved slot back
+           inflightCount.decrementAndGet();
+         }
+         return added;
+       }
+     }
+   }
+
+   /**
+    * @return a new list of the datanodes of the actions tracked for the
+    *         given container, or an empty list if there is none.
+    */
+   List<DatanodeDetails> getDatanodeDetails(ContainerID id) {
+     for (; ;) {
+       final List<InflightAction> actions = get(id);
+       if (actions == null) {
+         return Collections.emptyList();
+       }
+       synchronized (actions) {
+         if (get(id) != actions) {
+           continue; //actions is changed, retry
+         }
+         return actions.stream()
+             .map(InflightAction::getDatanode)
+             .collect(Collectors.toList());
+       }
+     }
+   }
+ }
+
/**
* Reference to the ContainerManager.
*/
@@ -132,13 +238,13 @@ public class LegacyReplicationManager {
* This is used for tracking container replication commands which are issued
* by ReplicationManager and not yet complete.
*/
- private final Map<ContainerID, List<InflightAction>> inflightReplication;
+ private final InflightMap inflightReplication;
/**
* This is used for tracking container deletion commands which are issued
* by ReplicationManager and not yet complete.
*/
- private final Map<ContainerID, List<InflightAction>> inflightDeletion;
+ private final InflightMap inflightDeletion;
/**
@@ -252,8 +358,10 @@ public class LegacyReplicationManager {
this.scmContext = scmContext;
this.nodeManager = nodeManager;
this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
- this.inflightReplication = new ConcurrentHashMap<>();
- this.inflightDeletion = new ConcurrentHashMap<>();
+ this.inflightReplication = new InflightMap(InflightType.REPLICATION,
+ rmConf.getContainerInflightReplicationLimit());
+ this.inflightDeletion = new InflightMap(InflightType.DELETION,
+ rmConf.getContainerInflightDeletionLimit());
this.inflightMoveFuture = new ConcurrentHashMap<>();
this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
this.clock = clock;
@@ -427,8 +535,10 @@ public class LegacyReplicationManager {
container.containerID());
}
- handleUnderReplicatedContainer(container,
- replicaSet, placementStatus);
+ if (!inflightReplication.isFull() || !inflightDeletion.isFull()) {
+ handleUnderReplicatedContainer(container,
+ replicaSet, placementStatus);
+ }
return;
}
@@ -485,47 +595,49 @@ public class LegacyReplicationManager {
* @param completedCounter update completed metrics
*/
private void updateInflightAction(final ContainerInfo container,
- final Map<ContainerID, List<InflightAction>> inflightActions,
+ final InflightMap inflightActions,
final Predicate<InflightAction> filter,
final Runnable timeoutCounter,
final Consumer<InflightAction> completedCounter) {
final ContainerID id = container.containerID();
final long deadline = clock.millis() - rmConf.getEventTimeout();
- if (inflightActions.containsKey(id)) {
- final List<InflightAction> actions = inflightActions.get(id);
-
- Iterator<InflightAction> iter = actions.iterator();
- while (iter.hasNext()) {
- try {
- InflightAction a = iter.next();
- NodeStatus status = nodeManager.getNodeStatus(a.getDatanode());
- boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
- boolean isCompleted = filter.test(a);
- boolean isTimeout = a.getTime() < deadline;
- boolean isNotInService = status.getOperationalState() !=
- NodeOperationalState.IN_SERVICE;
- if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
- iter.remove();
-
- if (isTimeout) {
- timeoutCounter.run();
- } else if (isCompleted) {
- completedCounter.accept(a);
- }
+ inflightActions.iterate(id, a -> updateInflightAction(
+ container, a, filter, timeoutCounter, completedCounter,
+ deadline, inflightActions.isReplication()));
+ }
- updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
- isNotInService, container, a.getDatanode(), inflightActions);
- }
- } catch (NodeNotFoundException | ContainerNotFoundException e) {
- // Should not happen, but if it does, just remove the action as the
- // node somehow does not exist;
- iter.remove();
+ private boolean updateInflightAction(final ContainerInfo container,
+ final InflightAction a,
+ final Predicate<InflightAction> filter,
+ final Runnable timeoutCounter,
+ final Consumer<InflightAction> completedCounter,
+ final long deadline,
+ final boolean isReplication) {
+ boolean remove = false;
+ try {
+ final NodeStatus status = nodeManager.getNodeStatus(a.getDatanode());
+ final boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
+ final boolean isCompleted = filter.test(a);
+ final boolean isTimeout = a.getTime() < deadline;
+ final boolean isNotInService = status.getOperationalState() !=
+ NodeOperationalState.IN_SERVICE;
+ if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
+ if (isTimeout) {
+ timeoutCounter.run();
+ } else if (isCompleted) {
+ completedCounter.accept(a);
}
+
+ updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
+ isNotInService, container, a.getDatanode(), isReplication);
+ remove = true;
}
- if (actions.isEmpty()) {
- inflightActions.remove(id);
- }
+ } catch (NodeNotFoundException | ContainerNotFoundException e) {
+ // Should not happen, but if it does, just remove the action as the
+ // node somehow does not exist;
+ remove = true;
}
+ return remove;
}
/**
@@ -536,14 +648,13 @@ public class LegacyReplicationManager {
* @param isTimeout is the action timeout
* @param container Container to update
* @param dn datanode which is removed from the inflightActions
- * @param inflightActions inflightReplication (or) inflightDeletion
+ * @param isInflightReplication is inflightReplication?
*/
private void updateMoveIfNeeded(final boolean isUnhealthy,
final boolean isCompleted, final boolean isTimeout,
final boolean isNotInService,
final ContainerInfo container, final DatanodeDetails dn,
- final Map<ContainerID,
- List<InflightAction>> inflightActions)
+ final boolean isInflightReplication)
throws ContainerNotFoundException {
// make sure inflightMove contains the container
ContainerID id = container.containerID();
@@ -559,8 +670,6 @@ public class LegacyReplicationManager {
if (!isSource && !isTarget) {
return;
}
- final boolean isInflightReplication =
- inflightActions.equals(inflightReplication);
/*
* there are some case:
@@ -806,7 +915,7 @@ public class LegacyReplicationManager {
* @return The number of inflight additions or zero if none
*/
private int getInflightAdd(final ContainerID id) {
- return inflightReplication.getOrDefault(id, Collections.emptyList()).size();
+ return inflightReplication.inflightActionCount(id);
}
/**
@@ -816,7 +925,7 @@ public class LegacyReplicationManager {
* @return The number of inflight deletes or zero if none
*/
private int getInflightDel(final ContainerID id) {
- return inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
+ return inflightDeletion.inflightActionCount(id);
}
/**
@@ -953,11 +1062,8 @@ public class LegacyReplicationManager {
LOG.debug("Container {} state changes to DELETED", container);
} else {
// Check whether to resend the delete replica command
- final List<DatanodeDetails> deletionInFlight = inflightDeletion
- .getOrDefault(container.containerID(), Collections.emptyList())
- .stream()
- .map(action -> action.getDatanode())
- .collect(Collectors.toList());
+ final List<DatanodeDetails> deletionInFlight
+ = inflightDeletion.getDatanodeDetails(container.containerID());
Set<ContainerReplica> filteredReplicas = replicas.stream().filter(
r -> !deletionInFlight.contains(r.getDatanodeDetails()))
.collect(Collectors.toSet());
@@ -1032,16 +1138,10 @@ public class LegacyReplicationManager {
}
int repDelta = replicaSet.additionalReplicaNeeded();
final ContainerID id = container.containerID();
- final List<DatanodeDetails> deletionInFlight = inflightDeletion
- .getOrDefault(id, Collections.emptyList())
- .stream()
- .map(action -> action.getDatanode())
- .collect(Collectors.toList());
- final List<DatanodeDetails> replicationInFlight = inflightReplication
- .getOrDefault(id, Collections.emptyList())
- .stream()
- .map(action -> action.getDatanode())
- .collect(Collectors.toList());
+ final List<DatanodeDetails> deletionInFlight
+ = inflightDeletion.getDatanodeDetails(id);
+ final List<DatanodeDetails> replicationInFlight
+ = inflightReplication.getDatanodeDetails(id);
final List<DatanodeDetails> source = replicas.stream()
.filter(r ->
r.getState() == State.QUASI_CLOSED ||
@@ -1431,6 +1531,15 @@ public class LegacyReplicationManager {
return ""; // unit test
}
+ private boolean addInflight(InflightType type, ContainerID id,
+ InflightAction action) {
+ final boolean added = getInflightMap(type).add(id, action);
+ if (!added) {
+ metrics.incrInflightSkipped(type);
+ }
+ return added;
+ }
+
/**
* Sends replicate container command for the given container to the given
* datanode.
@@ -1450,12 +1559,13 @@ public class LegacyReplicationManager {
final ContainerID id = container.containerID();
final ReplicateContainerCommand replicateCommand =
new ReplicateContainerCommand(id.getId(), sources);
- inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
- sendAndTrackDatanodeCommand(datanode, replicateCommand,
- action -> inflightReplication.get(id).add(action));
+ final boolean sent = sendAndTrackDatanodeCommand(datanode, replicateCommand,
+ action -> addInflight(InflightType.REPLICATION, id, action));
- metrics.incrNumReplicationCmdsSent();
- metrics.incrNumReplicationBytesTotal(container.getUsedBytes());
+ if (sent) {
+ metrics.incrNumReplicationCmdsSent();
+ metrics.incrNumReplicationBytesTotal(container.getUsedBytes());
+ }
}
/**
@@ -1476,12 +1586,13 @@ public class LegacyReplicationManager {
final ContainerID id = container.containerID();
final DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(id.getId(), force);
- inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
- sendAndTrackDatanodeCommand(datanode, deleteCommand,
- action -> inflightDeletion.get(id).add(action));
+ final boolean sent = sendAndTrackDatanodeCommand(datanode, deleteCommand,
+ action -> addInflight(InflightType.DELETION, id, action));
- metrics.incrNumDeletionCmdsSent();
- metrics.incrNumDeletionBytesTotal(container.getUsedBytes());
+ if (sent) {
+ metrics.incrNumDeletionCmdsSent();
+ metrics.incrNumDeletionBytesTotal(container.getUsedBytes());
+ }
}
/**
@@ -1495,21 +1606,26 @@ public class LegacyReplicationManager {
* @param tracker Tracker which tracks the inflight actions
* @param <T> Type of SCMCommand
*/
- private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(
+ private <T extends GeneratedMessage> boolean sendAndTrackDatanodeCommand(
final DatanodeDetails datanode,
final SCMCommand<T> command,
- final Consumer<InflightAction> tracker) {
+ final Predicate<InflightAction> tracker) {
try {
command.setTerm(scmContext.getTermOfLeader());
} catch (NotLeaderException nle) {
LOG.warn("Skip sending datanode command,"
+ " since current SCM is not leader.", nle);
- return;
+ return false;
+ }
+ final boolean allowed = tracker.test(
+ new InflightAction(datanode, clock.millis()));
+ if (!allowed) {
+ return false;
}
final CommandForDatanode<T> datanodeCommand =
new CommandForDatanode<>(datanode.getUuid(), command);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
- tracker.accept(new InflightAction(datanode, clock.millis()));
+ return true;
}
/**
@@ -1626,10 +1742,44 @@ public class LegacyReplicationManager {
" entering maintenance state until a new replica is created.")
private int maintenanceReplicaMinimum = 2;
+ @Config(key = "container.inflight.replication.limit",
+ type = ConfigType.INT,
+ defaultValue = "0", // 0 means unlimited.
+ tags = {SCM, OZONE},
+ description = "This property is used to limit" +
+ " the maximum number of inflight replication."
+ )
+ private int containerInflightReplicationLimit = 0;
+
+ @Config(key = "container.inflight.deletion.limit",
+ type = ConfigType.INT,
+ defaultValue = "0", // 0 means unlimited.
+ tags = {SCM, OZONE},
+ description = "This property is used to limit" +
+ " the maximum number of inflight deletion."
+ )
+ private int containerInflightDeletionLimit = 0;
+
+ public void setContainerInflightReplicationLimit(int replicationLimit) {
+ this.containerInflightReplicationLimit = replicationLimit;
+ }
+
+ public void setContainerInflightDeletionLimit(int deletionLimit) {
+ this.containerInflightDeletionLimit = deletionLimit;
+ }
+
public void setMaintenanceReplicaMinimum(int replicaCount) {
this.maintenanceReplicaMinimum = replicaCount;
}
+ public int getContainerInflightReplicationLimit() {
+ return containerInflightReplicationLimit;
+ }
+
+ public int getContainerInflightDeletionLimit() {
+ return containerInflightDeletionLimit;
+ }
+
public long getInterval() {
return interval;
}
@@ -1649,12 +1799,20 @@ public class LegacyReplicationManager {
onLeaderReadyAndOutOfSafeMode();
}
- public Map<ContainerID, List<InflightAction>> getInflightReplication() {
- return inflightReplication;
+ private InflightMap getInflightMap(InflightType type) {
+ switch (type) {
+ case REPLICATION: return inflightReplication;
+ case DELETION: return inflightDeletion;
+ default: throw new IllegalStateException("Unexpected type " + type);
+ }
+ }
+
+ int getInflightCount(InflightType type) {
+ return getInflightMap(type).containerCount();
}
- public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
- return inflightDeletion;
+ DatanodeDetails getFirstDatanode(InflightType type, ContainerID id) {
+ return getInflightMap(type).get(id).get(0).getDatanode();
}
public Map<ContainerID, CompletableFuture<MoveResult>> getInflightMove() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 7ae7d64478..d0811ce2e2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -422,14 +422,6 @@ public class ReplicationManager implements SCMService {
return legacyReplicationManager.move(cid, src, tgt);
}
- public Map<ContainerID, List<InflightAction>> getInflightReplication() {
- return legacyReplicationManager.getInflightReplication();
- }
-
- public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
- return legacyReplicationManager.getInflightDeletion();
- }
-
public Map<ContainerID,
CompletableFuture<LegacyReplicationManager.MoveResult>>
getInflightMove() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
index ef17b587d7..42ba4a0cdf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
@@ -52,10 +52,21 @@ public final class ReplicationManagerMetrics implements MetricsSource {
"InflightReplication",
"Tracked inflight container replication requests.");
+ private static final MetricsInfo INFLIGHT_REPLICATION_SKIPPED = Interns.info(
+ "InflightReplicationSkipped",
+ "Tracked inflight container replication requests skipped" +
+ " due to the configured limit.");
+
private static final MetricsInfo INFLIGHT_DELETION = Interns.info(
"InflightDeletion",
"Tracked inflight container deletion requests.");
+ private static final MetricsInfo INFLIGHT_DELETION_SKIPPED = Interns.info(
+ "InflightDeletionSkipped",
+ "Tracked inflight container deletion requests skipped" +
+ " due to the configured limit.");
+
+
private static final MetricsInfo INFLIGHT_MOVE = Interns.info(
"InflightMove",
"Tracked inflight container move requests.");
@@ -118,6 +129,14 @@ public final class ReplicationManagerMetrics implements MetricsSource {
@Metric("Time elapsed for deletion")
private MutableRate deletionTime;
+ @Metric("Number of inflight replication skipped" +
+ " due to the configured limit.")
+ private MutableCounterLong numInflightReplicationSkipped;
+
+ // Counts the deletion requests skipped by incrInflightSkipped(DELETION).
+ @Metric("Number of inflight deletion skipped" +
+ " due to the configured limit.")
+ private MutableCounterLong numInflightDeletionSkipped;
+
private MetricsRegistry registry;
private ReplicationManager replicationManager;
@@ -138,7 +157,9 @@ public final class ReplicationManagerMetrics implements MetricsSource {
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder builder = collector.addRecord(METRICS_SOURCE_NAME)
.addGauge(INFLIGHT_REPLICATION, getInflightReplication())
+ .addGauge(INFLIGHT_REPLICATION_SKIPPED, getInflightReplicationSkipped())
.addGauge(INFLIGHT_DELETION, getInflightDeletion())
+ .addGauge(INFLIGHT_DELETION_SKIPPED, getInflightDeletionSkipped())
.addGauge(INFLIGHT_MOVE, getInflightMove());
ReplicationManagerReport report = replicationManager.getContainerReport();
@@ -217,12 +238,35 @@ public final class ReplicationManagerMetrics implements MetricsSource {
this.deletionTime.add(millis);
}
+ public void incrInflightSkipped(InflightType type) {
+ switch (type) {
+ case REPLICATION:
+ this.numInflightReplicationSkipped.incr();
+ return;
+ case DELETION:
+ this.numInflightDeletionSkipped.incr();
+ return;
+ default:
+ throw new IllegalArgumentException("Unexpected type " + type);
+ }
+ }
+
public long getInflightReplication() {
- return replicationManager.getInflightReplication().size();
+ return replicationManager.getLegacyReplicationManager()
+ .getInflightCount(InflightType.REPLICATION);
+ }
+
+ public long getInflightReplicationSkipped() {
+ return this.numInflightReplicationSkipped.value();
}
public long getInflightDeletion() {
- return replicationManager.getInflightDeletion().size();
+ return replicationManager.getLegacyReplicationManager()
+ .getInflightCount(InflightType.DELETION);
+ }
+
+ public long getInflightDeletionSkipped() {
+ return this.numInflightDeletionSkipped.value();
}
public long getInflightMove() {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
similarity index 94%
rename from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 0cc5feb634..38ece4992d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdds.scm.container;
+package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.primitives.Longs;
import org.apache.commons.io.FileUtils;
@@ -31,9 +31,17 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
-import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManagerImpl;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -77,6 +85,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -122,6 +131,11 @@ public class TestReplicationManager {
private SCMHAManager scmhaManager;
private ContainerReplicaPendingOps containerReplicaPendingOps;
+ int getInflightCount(InflightType type) {
+ return replicationManager.getLegacyReplicationManager()
+ .getInflightCount(type);
+ }
+
@BeforeEach
public void setup()
throws IOException, InterruptedException,
@@ -201,8 +215,31 @@ public class TestReplicationManager {
createReplicationManager(new ReplicationManagerConfiguration());
}
+ void createReplicationManager(int replicationLimit, int deletionLimit)
+ throws Exception {
+ replicationManager.stop();
+ dbStore.close();
+ final LegacyReplicationManager.ReplicationManagerConfiguration conf
+ = new LegacyReplicationManager.ReplicationManagerConfiguration();
+ conf.setContainerInflightReplicationLimit(replicationLimit);
+ conf.setContainerInflightDeletionLimit(deletionLimit);
+ createReplicationManager(conf);
+ }
+
+ void createReplicationManager(
+ LegacyReplicationManager.ReplicationManagerConfiguration conf)
+ throws Exception {
+ createReplicationManager(null, conf);
+ }
+
private void createReplicationManager(ReplicationManagerConfiguration rmConf)
throws InterruptedException, IOException {
+ createReplicationManager(rmConf, null);
+ }
+
+ void createReplicationManager(ReplicationManagerConfiguration rmConf,
+ LegacyReplicationManager.ReplicationManagerConfiguration lrmConf)
+ throws InterruptedException, IOException {
OzoneConfiguration config = new OzoneConfiguration();
testDir = GenericTestUtils
.getTestDir(TestContainerManagerImpl.class.getSimpleName());
@@ -211,7 +248,8 @@ public class TestReplicationManager {
config.setTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
0, TimeUnit.SECONDS);
- config.setFromObject(rmConf);
+ Optional.ofNullable(rmConf).ifPresent(config::setFromObject);
+ Optional.ofNullable(lrmConf).ifPresent(config::setFromObject);
SCMHAManager scmHAManager = SCMHAManagerStub
.getInstance(true, new SCMDBTransactionBufferImpl());
@@ -474,8 +512,7 @@ public class TestReplicationManager {
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(currentBytesToReplicate + 100L,
replicationManager.getMetrics().getNumReplicationBytesTotal());
- Assertions.assertEquals(1,
- replicationManager.getInflightReplication().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
@@ -488,8 +525,8 @@ public class TestReplicationManager {
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
// Now we add the missing replica back
- DatanodeDetails targetDn = replicationManager.getInflightReplication()
- .get(id).get(0).getDatanode();
+ DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
+ .getFirstDatanode(InflightType.REPLICATION, id);
final ContainerReplica replicatedReplicaOne = getReplicas(
id, State.CLOSED, 1000L, originNodeId, targetDn);
containerStateManager.updateContainerReplica(
@@ -503,8 +540,7 @@ public class TestReplicationManager {
replicationManager.processAll();
eventQueue.processAll(1000);
- Assertions.assertEquals(0,
- replicationManager.getInflightReplication().size());
+ Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightReplication());
Assertions.assertEquals(currentReplicationCommandCompleted + 1,
@@ -556,7 +592,7 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
@@ -568,8 +604,8 @@ public class TestReplicationManager {
ReplicationManagerReport.HealthState.OVER_REPLICATED));
// Now we remove the replica according to inflight
- DatanodeDetails targetDn = replicationManager.getInflightDeletion()
- .get(id).get(0).getDatanode();
+ DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
+ .getFirstDatanode(InflightType.DELETION, id);
if (targetDn.equals(replicaOne.getDatanodeDetails())) {
containerStateManager.removeContainerReplica(
id, replicaOne);
@@ -591,7 +627,7 @@ public class TestReplicationManager {
replicationManager.processAll();
eventQueue.processAll(1000);
- Assertions.assertEquals(0, replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightDeletion());
Assertions.assertEquals(currentDeleteCommandCompleted + 1,
@@ -648,7 +684,7 @@ public class TestReplicationManager {
replicaOne.getDatanodeDetails()));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
@@ -669,7 +705,7 @@ public class TestReplicationManager {
Assertions.assertEquals(currentDeleteCommandCompleted + 1,
replicationManager.getMetrics().getNumDeletionCmdsCompleted());
- Assertions.assertEquals(0, replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightDeletion());
@@ -714,8 +750,7 @@ public class TestReplicationManager {
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(currentBytesToReplicate + 100,
replicationManager.getMetrics().getNumReplicationBytesTotal());
- Assertions.assertEquals(1,
- replicationManager.getInflightReplication().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
@@ -732,8 +767,8 @@ public class TestReplicationManager {
.getMetrics().getNumReplicationBytesCompleted();
// Now we add the replicated new replica
- DatanodeDetails targetDn = replicationManager.getInflightReplication()
- .get(id).get(0).getDatanode();
+ DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
+ .getFirstDatanode(InflightType.REPLICATION, id);
final ContainerReplica replicatedReplicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, targetDn);
containerStateManager.updateContainerReplica(
@@ -746,8 +781,7 @@ public class TestReplicationManager {
replicationManager.getMetrics().getNumReplicationCmdsCompleted());
Assertions.assertEquals(currentReplicateBytesCompleted + 100,
replicationManager.getMetrics().getNumReplicationBytesCompleted());
- Assertions.assertEquals(0,
- replicationManager.getInflightReplication().size());
+ Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightReplication());
@@ -849,8 +883,7 @@ public class TestReplicationManager {
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(currentBytesToDelete + 99,
replicationManager.getMetrics().getNumDeletionBytesTotal());
- Assertions.assertEquals(1,
- replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
@@ -876,7 +909,7 @@ public class TestReplicationManager {
replicationManager.processAll();
eventQueue.processAll(1000);
- Assertions.assertEquals(0, replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightDeletion());
Assertions.assertEquals(currentDeleteCommandCompleted + 1,
@@ -887,8 +920,7 @@ public class TestReplicationManager {
SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount + 2,
replicationManager.getMetrics().getNumReplicationCmdsSent());
- Assertions.assertEquals(1,
- replicationManager.getInflightReplication().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
@@ -1082,8 +1114,7 @@ public class TestReplicationManager {
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(currentBytesToReplicate + 100,
replicationManager.getMetrics().getNumReplicationBytesTotal());
- Assertions.assertEquals(1,
- replicationManager.getInflightReplication().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
@@ -1114,8 +1145,7 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount,
replicationManager.getMetrics().getNumReplicationCmdsSent());
- Assertions.assertEquals(1,
- replicationManager.getInflightReplication().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
}
@@ -1170,7 +1200,7 @@ public class TestReplicationManager {
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaFive.getDatanodeDetails()));
- Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
assertOverReplicatedCount(1);
@@ -1213,7 +1243,7 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
@@ -1261,7 +1291,7 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 2,
replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
}
@@ -1286,12 +1316,44 @@ public class TestReplicationManager {
*/
@Test
public void testUnderReplicatedDueToAllDecommission() throws IOException {
+ runTestUnderReplicatedDueToAllDecommission(3);
+ }
+
+ Void runTestUnderReplicatedDueToAllDecommission(int expectedReplication)
+ throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
- assertReplicaScheduled(3);
+ assertReplicaScheduled(expectedReplication);
assertUnderReplicatedCount(1);
+ return null;
+ }
+
+ @Test
+ public void testReplicationLimit() throws Exception {
+ runTestLimit(1, 0, 2, 0,
+ () -> runTestUnderReplicatedDueToAllDecommission(1));
+ }
+
+ void runTestLimit(int replicationLimit, int deletionLimit,
+ int expectedReplicationSkipped, int expectedDeletionSkipped,
+ Callable<Void> testcase) throws Exception {
+ createReplicationManager(replicationLimit, deletionLimit);
+
+ final ReplicationManagerMetrics metrics = replicationManager.getMetrics();
+ final long replicationSkipped = metrics.getInflightReplicationSkipped();
+ final long deletionSkipped = metrics.getInflightDeletionSkipped();
+
+ testcase.call();
+
+ Assertions.assertEquals(replicationSkipped + expectedReplicationSkipped,
+ metrics.getInflightReplicationSkipped());
+ Assertions.assertEquals(deletionSkipped + expectedDeletionSkipped,
+ metrics.getInflightDeletionSkipped());
+
+ //reset limits for other tests.
+ createReplicationManager(0, 0);
}
/**
@@ -1450,7 +1512,7 @@ public class TestReplicationManager {
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 2,
replicationManager.getMetrics().getNumDeletionCmdsSent());
- Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+ Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
// Get the DECOM and Maint replica and ensure none of them are scheduled
@@ -1880,6 +1942,10 @@ public class TestReplicationManager {
*/
@Test
public void testDeleteEmptyContainer() throws Exception {
+ runTestDeleteEmptyContainer(3);
+ }
+
+ Void runTestDeleteEmptyContainer(int expectedDelete) throws Exception {
// Create container with usedBytes = 1000 and keyCount = 0
final ContainerInfo container = createContainer(LifeCycleState.CLOSED, 1000,
0);
@@ -1888,7 +1954,14 @@ public class TestReplicationManager {
// Create a replica with usedBytes != 0 and keyCount = 0
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED, 100, 0);
- assertDeleteScheduled(3);
+ assertDeleteScheduled(expectedDelete);
+ return null;
+ }
+
+ @Test
+ public void testDeletionLimit() throws Exception {
+ runTestLimit(0, 2, 0, 1,
+ () -> runTestDeleteEmptyContainer(2));
}
/**
@@ -1994,7 +2067,7 @@ public class TestReplicationManager {
replicationManager.getMetrics().getNumReplicationCmdsSent());
}
- private void assertDeleteScheduled(int delta) throws InterruptedException {
+ private void assertDeleteScheduled(int delta) {
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
index 50f4945fc4..9d70b190dc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
@@ -54,7 +54,13 @@ public class TestReplicationManagerMetrics {
report.increment(s);
}
}
+ final LegacyReplicationManager lrm = Mockito.mock(
+ LegacyReplicationManager.class);
+ Mockito.when(lrm.getInflightCount(Mockito.any(InflightType.class)))
+ .thenReturn(0);
replicationManager = Mockito.mock(ReplicationManager.class);
+ Mockito.when(replicationManager.getLegacyReplicationManager())
+ .thenReturn(lrm);
Mockito.when(replicationManager.getContainerReport()).thenReturn(report);
metrics = ReplicationManagerMetrics.create(replicationManager);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org