You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/06/16 16:06:30 UTC

[ozone] branch master updated: HDDS-6829. Limit the no of inflight replication tasks in SCM. (#3482)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 94945aed40 HDDS-6829. Limit the no of inflight replication tasks in SCM. (#3482)
94945aed40 is described below

commit 94945aed404216e17cd4560566100d5a90497fd8
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Jun 16 09:06:24 2022 -0700

    HDDS-6829. Limit the no of inflight replication tasks in SCM. (#3482)
---
 .../scm/container/replication/InflightType.java    |  23 ++
 .../replication/LegacyReplicationManager.java      | 314 ++++++++++++++++-----
 .../container/replication/ReplicationManager.java  |   8 -
 .../replication/ReplicationManagerMetrics.java     |  48 +++-
 .../{ => replication}/TestReplicationManager.java  | 151 +++++++---
 .../replication/TestReplicationManagerMetrics.java |   6 +
 6 files changed, 423 insertions(+), 127 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightType.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightType.java
new file mode 100644
index 0000000000..acd859f67a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InflightType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+enum InflightType {
+  REPLICATION,
+  DELETION
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 47ac3d3541..37d01736b9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -81,10 +81,12 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -102,6 +104,110 @@ public class LegacyReplicationManager {
   public static final Logger LOG =
       LoggerFactory.getLogger(LegacyReplicationManager.class);
 
+  static class InflightMap {
+    private final Map<ContainerID, List<InflightAction>> map
+        = new ConcurrentHashMap<>();
+    private final InflightType type;
+    private final int sizeLimit;
+    private final AtomicInteger inflightCount = new AtomicInteger();
+
+    InflightMap(InflightType type, int sizeLimit) {
+      this.type = type;
+      this.sizeLimit = sizeLimit > 0 ? sizeLimit : Integer.MAX_VALUE;
+    }
+
+    boolean isReplication() {
+      return type == InflightType.REPLICATION;
+    }
+
+    private List<InflightAction> get(ContainerID id) {
+      return map.get(id);
+    }
+
+    boolean containsKey(ContainerID id) {
+      return map.containsKey(id);
+    }
+
+    int inflightActionCount(ContainerID id) {
+      return Optional.ofNullable(map.get(id)).map(List::size).orElse(0);
+    }
+
+    int containerCount() {
+      return map.size();
+    }
+
+    boolean isFull() {
+      return inflightCount.get() >= sizeLimit;
+    }
+
+    void clear() {
+      map.clear();
+    }
+
+    void iterate(ContainerID id, Predicate<InflightAction> processor) {
+      for (; ;) {
+        final List<InflightAction> actions = get(id);
+        if (actions == null) {
+          return;
+        }
+        synchronized (actions) {
+          if (get(id) != actions) {
+            continue; //actions is changed, retry
+          }
+          for (Iterator<InflightAction> i = actions.iterator(); i.hasNext();) {
+            final boolean remove = processor.test(i.next());
+            if (remove) {
+              i.remove();
+              inflightCount.decrementAndGet();
+            }
+          }
+          map.computeIfPresent(id,
+              (k, v) -> v == actions && v.isEmpty() ? null : v);
+          return;
+        }
+      }
+    }
+
+    boolean add(ContainerID id, InflightAction a) {
+      final int previous = inflightCount.getAndUpdate(
+          n -> n < sizeLimit ? n + 1 : n);
+      if (previous >= sizeLimit) {
+        return false;
+      }
+      for (; ;) {
+        final List<InflightAction> actions = map.computeIfAbsent(id,
+            key -> new LinkedList<>());
+        synchronized (actions) {
+          if (get(id) != actions) {
+            continue; //actions is changed, retry
+          }
+          final boolean added = actions.add(a);
+          if (!added) {
+            inflightCount.decrementAndGet();
+          }
+          return added;
+        }
+      }
+    }
+
+    List<DatanodeDetails> getDatanodeDetails(ContainerID id) {
+      for (; ;) {
+        final List<InflightAction> actions = get(id);
+        if (actions == null) {
+          return Collections.emptyList();
+        }
+        synchronized (actions) {
+          if (get(id) != actions) {
+            continue; //actions is changed, retry
+          }
+          return actions.stream()
+              .map(InflightAction::getDatanode)
+              .collect(Collectors.toList());
+        }
+      }
+    }
+  }
+
   /**
    * Reference to the ContainerManager.
    */
@@ -132,13 +238,13 @@ public class LegacyReplicationManager {
    * This is used for tracking container replication commands which are issued
    * by ReplicationManager and not yet complete.
    */
-  private final Map<ContainerID, List<InflightAction>> inflightReplication;
+  private final InflightMap inflightReplication;
 
   /**
    * This is used for tracking container deletion commands which are issued
    * by ReplicationManager and not yet complete.
    */
-  private final Map<ContainerID, List<InflightAction>> inflightDeletion;
+  private final InflightMap inflightDeletion;
 
 
   /**
@@ -252,8 +358,10 @@ public class LegacyReplicationManager {
     this.scmContext = scmContext;
     this.nodeManager = nodeManager;
     this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
-    this.inflightReplication = new ConcurrentHashMap<>();
-    this.inflightDeletion = new ConcurrentHashMap<>();
+    this.inflightReplication = new InflightMap(InflightType.REPLICATION,
+        rmConf.getContainerInflightReplicationLimit());
+    this.inflightDeletion = new InflightMap(InflightType.DELETION,
+        rmConf.getContainerInflightDeletionLimit());
     this.inflightMoveFuture = new ConcurrentHashMap<>();
     this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
     this.clock = clock;
@@ -427,8 +535,10 @@ public class LegacyReplicationManager {
                 container.containerID());
 
           }
-          handleUnderReplicatedContainer(container,
-              replicaSet, placementStatus);
+          if (!inflightReplication.isFull() || !inflightDeletion.isFull()) {
+            handleUnderReplicatedContainer(container,
+                replicaSet, placementStatus);
+          }
           return;
         }
 
@@ -485,47 +595,49 @@ public class LegacyReplicationManager {
    * @param completedCounter update completed metrics
    */
   private void updateInflightAction(final ContainerInfo container,
-      final Map<ContainerID, List<InflightAction>> inflightActions,
+      final InflightMap inflightActions,
       final Predicate<InflightAction> filter,
       final Runnable timeoutCounter,
       final Consumer<InflightAction> completedCounter) {
     final ContainerID id = container.containerID();
     final long deadline = clock.millis() - rmConf.getEventTimeout();
-    if (inflightActions.containsKey(id)) {
-      final List<InflightAction> actions = inflightActions.get(id);
-
-      Iterator<InflightAction> iter = actions.iterator();
-      while (iter.hasNext()) {
-        try {
-          InflightAction a = iter.next();
-          NodeStatus status = nodeManager.getNodeStatus(a.getDatanode());
-          boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
-          boolean isCompleted = filter.test(a);
-          boolean isTimeout = a.getTime() < deadline;
-          boolean isNotInService = status.getOperationalState() !=
-              NodeOperationalState.IN_SERVICE;
-          if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
-            iter.remove();
-
-            if (isTimeout) {
-              timeoutCounter.run();
-            } else if (isCompleted) {
-              completedCounter.accept(a);
-            }
+    inflightActions.iterate(id, a -> updateInflightAction(
+        container, a, filter, timeoutCounter, completedCounter,
+        deadline, inflightActions.isReplication()));
+  }
 
-            updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
-                isNotInService, container, a.getDatanode(), inflightActions);
-          }
-        } catch (NodeNotFoundException | ContainerNotFoundException e) {
-          // Should not happen, but if it does, just remove the action as the
-          // node somehow does not exist;
-          iter.remove();
+  private boolean updateInflightAction(final ContainerInfo container,
+      final InflightAction a,
+      final Predicate<InflightAction> filter,
+      final Runnable timeoutCounter,
+      final Consumer<InflightAction> completedCounter,
+      final long deadline,
+      final boolean isReplication) {
+    boolean remove = false;
+    try {
+      final NodeStatus status = nodeManager.getNodeStatus(a.getDatanode());
+      final boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
+      final boolean isCompleted = filter.test(a);
+      final boolean isTimeout = a.getTime() < deadline;
+      final boolean isNotInService = status.getOperationalState() !=
+          NodeOperationalState.IN_SERVICE;
+      if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
+        if (isTimeout) {
+          timeoutCounter.run();
+        } else if (isCompleted) {
+          completedCounter.accept(a);
         }
+
+        updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
+            isNotInService, container, a.getDatanode(), isReplication);
+        remove = true;
       }
-      if (actions.isEmpty()) {
-        inflightActions.remove(id);
-      }
+    } catch (NodeNotFoundException | ContainerNotFoundException e) {
+      // Should not happen, but if it does, just remove the action as the
+      // node somehow does not exist;
+      remove = true;
     }
+    return remove;
   }
 
   /**
@@ -536,14 +648,13 @@ public class LegacyReplicationManager {
    * @param isTimeout is the action timeout
    * @param container Container to update
    * @param dn datanode which is removed from the inflightActions
-   * @param inflightActions inflightReplication (or) inflightDeletion
+   * @param isInflightReplication is inflightReplication?
    */
   private void updateMoveIfNeeded(final boolean isUnhealthy,
                    final boolean isCompleted, final boolean isTimeout,
                    final boolean isNotInService,
                    final ContainerInfo container, final DatanodeDetails dn,
-                   final Map<ContainerID,
-                       List<InflightAction>> inflightActions)
+                   final boolean isInflightReplication)
       throws ContainerNotFoundException {
     // make sure inflightMove contains the container
     ContainerID id = container.containerID();
@@ -559,8 +670,6 @@ public class LegacyReplicationManager {
     if (!isSource && !isTarget) {
       return;
     }
-    final boolean isInflightReplication =
-        inflightActions.equals(inflightReplication);
 
     /*
      * there are some case:
@@ -806,7 +915,7 @@ public class LegacyReplicationManager {
    * @return The number of inflight additions or zero if none
    */
   private int getInflightAdd(final ContainerID id) {
-    return inflightReplication.getOrDefault(id, Collections.emptyList()).size();
+    return inflightReplication.inflightActionCount(id);
   }
 
   /**
@@ -816,7 +925,7 @@ public class LegacyReplicationManager {
    * @return The number of inflight deletes or zero if none
    */
   private int getInflightDel(final ContainerID id) {
-    return inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
+    return inflightDeletion.inflightActionCount(id);
   }
 
   /**
@@ -953,11 +1062,8 @@ public class LegacyReplicationManager {
       LOG.debug("Container {} state changes to DELETED", container);
     } else {
       // Check whether to resend the delete replica command
-      final List<DatanodeDetails> deletionInFlight = inflightDeletion
-          .getOrDefault(container.containerID(), Collections.emptyList())
-          .stream()
-          .map(action -> action.getDatanode())
-          .collect(Collectors.toList());
+      final List<DatanodeDetails> deletionInFlight
+          = inflightDeletion.getDatanodeDetails(container.containerID());
       Set<ContainerReplica> filteredReplicas = replicas.stream().filter(
           r -> !deletionInFlight.contains(r.getDatanodeDetails()))
           .collect(Collectors.toSet());
@@ -1032,16 +1138,10 @@ public class LegacyReplicationManager {
       }
       int repDelta = replicaSet.additionalReplicaNeeded();
       final ContainerID id = container.containerID();
-      final List<DatanodeDetails> deletionInFlight = inflightDeletion
-          .getOrDefault(id, Collections.emptyList())
-          .stream()
-          .map(action -> action.getDatanode())
-          .collect(Collectors.toList());
-      final List<DatanodeDetails> replicationInFlight = inflightReplication
-          .getOrDefault(id, Collections.emptyList())
-          .stream()
-          .map(action -> action.getDatanode())
-          .collect(Collectors.toList());
+      final List<DatanodeDetails> deletionInFlight
+          = inflightDeletion.getDatanodeDetails(id);
+      final List<DatanodeDetails> replicationInFlight
+          = inflightReplication.getDatanodeDetails(id);
       final List<DatanodeDetails> source = replicas.stream()
           .filter(r ->
               r.getState() == State.QUASI_CLOSED ||
@@ -1431,6 +1531,15 @@ public class LegacyReplicationManager {
     return ""; // unit test
   }
 
+  private boolean addInflight(InflightType type, ContainerID id,
+      InflightAction action) {
+    final boolean added = getInflightMap(type).add(id, action);
+    if (!added) {
+      metrics.incrInflightSkipped(type);
+    }
+    return added;
+  }
+
   /**
    * Sends replicate container command for the given container to the given
    * datanode.
@@ -1450,12 +1559,13 @@ public class LegacyReplicationManager {
     final ContainerID id = container.containerID();
     final ReplicateContainerCommand replicateCommand =
         new ReplicateContainerCommand(id.getId(), sources);
-    inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
-    sendAndTrackDatanodeCommand(datanode, replicateCommand,
-        action -> inflightReplication.get(id).add(action));
+    final boolean sent = sendAndTrackDatanodeCommand(datanode, replicateCommand,
+        action -> addInflight(InflightType.REPLICATION, id, action));
 
-    metrics.incrNumReplicationCmdsSent();
-    metrics.incrNumReplicationBytesTotal(container.getUsedBytes());
+    if (sent) {
+      metrics.incrNumReplicationCmdsSent();
+      metrics.incrNumReplicationBytesTotal(container.getUsedBytes());
+    }
   }
 
   /**
@@ -1476,12 +1586,13 @@ public class LegacyReplicationManager {
     final ContainerID id = container.containerID();
     final DeleteContainerCommand deleteCommand =
         new DeleteContainerCommand(id.getId(), force);
-    inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
-    sendAndTrackDatanodeCommand(datanode, deleteCommand,
-        action -> inflightDeletion.get(id).add(action));
+    final boolean sent = sendAndTrackDatanodeCommand(datanode, deleteCommand,
+        action -> addInflight(InflightType.DELETION, id, action));
 
-    metrics.incrNumDeletionCmdsSent();
-    metrics.incrNumDeletionBytesTotal(container.getUsedBytes());
+    if (sent) {
+      metrics.incrNumDeletionCmdsSent();
+      metrics.incrNumDeletionBytesTotal(container.getUsedBytes());
+    }
   }
 
   /**
@@ -1495,21 +1606,26 @@ public class LegacyReplicationManager {
    * @param tracker Tracker which tracks the inflight actions
    * @param <T> Type of SCMCommand
    */
-  private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(
+  private <T extends GeneratedMessage> boolean sendAndTrackDatanodeCommand(
       final DatanodeDetails datanode,
       final SCMCommand<T> command,
-      final Consumer<InflightAction> tracker) {
+      final Predicate<InflightAction> tracker) {
     try {
       command.setTerm(scmContext.getTermOfLeader());
     } catch (NotLeaderException nle) {
       LOG.warn("Skip sending datanode command,"
           + " since current SCM is not leader.", nle);
-      return;
+      return false;
+    }
+    final boolean allowed = tracker.test(
+        new InflightAction(datanode, clock.millis()));
+    if (!allowed) {
+      return false;
     }
     final CommandForDatanode<T> datanodeCommand =
         new CommandForDatanode<>(datanode.getUuid(), command);
     eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
-    tracker.accept(new InflightAction(datanode, clock.millis()));
+    return true;
   }
 
   /**
@@ -1626,10 +1742,44 @@ public class LegacyReplicationManager {
             " entering maintenance state until a new replica is created.")
     private int maintenanceReplicaMinimum = 2;
 
+    @Config(key = "container.inflight.replication.limit",
+        type = ConfigType.INT,
+        defaultValue = "0", // 0 means unlimited.
+        tags = {SCM, OZONE},
+        description = "This property is used to limit" +
+            " the maximum number of inflight replication."
+    )
+    private int containerInflightReplicationLimit = 0;
+
+    @Config(key = "container.inflight.deletion.limit",
+        type = ConfigType.INT,
+        defaultValue = "0", // 0 means unlimited.
+        tags = {SCM, OZONE},
+        description = "This property is used to limit" +
+            " the maximum number of inflight deletion."
+    )
+    private int containerInflightDeletionLimit = 0;
+
+    public void setContainerInflightReplicationLimit(int replicationLimit) {
+      this.containerInflightReplicationLimit = replicationLimit;
+    }
+
+    public void setContainerInflightDeletionLimit(int deletionLimit) {
+      this.containerInflightDeletionLimit = deletionLimit;
+    }
+
     public void setMaintenanceReplicaMinimum(int replicaCount) {
       this.maintenanceReplicaMinimum = replicaCount;
     }
 
+    public int getContainerInflightReplicationLimit() {
+      return containerInflightReplicationLimit;
+    }
+
+    public int getContainerInflightDeletionLimit() {
+      return containerInflightDeletionLimit;
+    }
+
     public long getInterval() {
       return interval;
     }
@@ -1649,12 +1799,20 @@ public class LegacyReplicationManager {
     onLeaderReadyAndOutOfSafeMode();
   }
 
-  public Map<ContainerID, List<InflightAction>> getInflightReplication() {
-    return inflightReplication;
+  private InflightMap getInflightMap(InflightType type) {
+    switch (type) {
+    case REPLICATION: return inflightReplication;
+    case DELETION: return inflightDeletion;
+    default: throw new IllegalStateException("Unexpected type " + type);
+    }
+  }
+
+  int getInflightCount(InflightType type) {
+    return getInflightMap(type).containerCount();
   }
 
-  public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
-    return inflightDeletion;
+  DatanodeDetails getFirstDatanode(InflightType type, ContainerID id) {
+    return getInflightMap(type).get(id).get(0).getDatanode();
   }
 
   public Map<ContainerID, CompletableFuture<MoveResult>> getInflightMove() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 7ae7d64478..d0811ce2e2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -422,14 +422,6 @@ public class ReplicationManager implements SCMService {
     return legacyReplicationManager.move(cid, src, tgt);
   }
 
-  public Map<ContainerID, List<InflightAction>>  getInflightReplication() {
-    return legacyReplicationManager.getInflightReplication();
-  }
-
-  public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
-    return legacyReplicationManager.getInflightDeletion();
-  }
-
   public Map<ContainerID,
       CompletableFuture<LegacyReplicationManager.MoveResult>>
       getInflightMove() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
index ef17b587d7..42ba4a0cdf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
@@ -52,10 +52,21 @@ public final class ReplicationManagerMetrics implements MetricsSource {
       "InflightReplication",
       "Tracked inflight container replication requests.");
 
+  private static final MetricsInfo INFLIGHT_REPLICATION_SKIPPED = Interns.info(
+      "InflightReplicationSkipped",
+      "Tracked inflight container replication requests skipped" +
+          " due to the configured limit.");
+
   private static final MetricsInfo INFLIGHT_DELETION = Interns.info(
       "InflightDeletion",
       "Tracked inflight container deletion requests.");
 
+  private static final MetricsInfo INFLIGHT_DELETION_SKIPPED = Interns.info(
+      "InflightDeletionSkipped",
+      "Tracked inflight container deletion requests skipped" +
+          " due to the configured limit.");
+
+
   private static final MetricsInfo INFLIGHT_MOVE = Interns.info(
       "InflightMove",
       "Tracked inflight container move requests.");
@@ -118,6 +129,14 @@ public final class ReplicationManagerMetrics implements MetricsSource {
   @Metric("Time elapsed for deletion")
   private MutableRate deletionTime;
 
+  @Metric("Number of inflight replication skipped" +
+      " due to the configured limit.")
+  private MutableCounterLong numInflightReplicationSkipped;
+
+  @Metric("Number of inflight replication skipped" +
+      " due to the configured limit.")
+  private MutableCounterLong numInflightDeletionSkipped;
+
   private MetricsRegistry registry;
 
   private ReplicationManager replicationManager;
@@ -138,7 +157,9 @@ public final class ReplicationManagerMetrics implements MetricsSource {
   public void getMetrics(MetricsCollector collector, boolean all) {
     MetricsRecordBuilder builder = collector.addRecord(METRICS_SOURCE_NAME)
         .addGauge(INFLIGHT_REPLICATION, getInflightReplication())
+        .addGauge(INFLIGHT_REPLICATION_SKIPPED, getInflightReplicationSkipped())
         .addGauge(INFLIGHT_DELETION, getInflightDeletion())
+        .addGauge(INFLIGHT_DELETION_SKIPPED, getInflightDeletionSkipped())
         .addGauge(INFLIGHT_MOVE, getInflightMove());
 
     ReplicationManagerReport report = replicationManager.getContainerReport();
@@ -217,12 +238,35 @@ public final class ReplicationManagerMetrics implements MetricsSource {
     this.deletionTime.add(millis);
   }
 
+  public void incrInflightSkipped(InflightType type) {
+    switch (type) {
+    case REPLICATION:
+      this.numInflightReplicationSkipped.incr();
+      return;
+    case DELETION:
+      this.numInflightDeletionSkipped.incr();
+      return;
+    default:
+      throw new IllegalArgumentException("Unexpected type " + type);
+    }
+  }
+
   public long getInflightReplication() {
-    return replicationManager.getInflightReplication().size();
+    return replicationManager.getLegacyReplicationManager()
+        .getInflightCount(InflightType.REPLICATION);
+  }
+
+  public long getInflightReplicationSkipped() {
+    return this.numInflightReplicationSkipped.value();
   }
 
   public long getInflightDeletion() {
-    return replicationManager.getInflightDeletion().size();
+    return replicationManager.getLegacyReplicationManager()
+        .getInflightCount(InflightType.DELETION);
+  }
+
+  public long getInflightDeletionSkipped() {
+    return this.numInflightDeletionSkipped.value();
   }
 
   public long getInflightMove() {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
similarity index 94%
rename from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 0cc5feb634..38ece4992d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdds.scm.container;
+package org.apache.hadoop.hdds.scm.container.replication;
 
 import com.google.common.primitives.Longs;
 import org.apache.commons.io.FileUtils;
@@ -31,9 +31,17 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
-import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManagerImpl;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
     .ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -77,6 +85,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -122,6 +131,11 @@ public class TestReplicationManager {
   private SCMHAManager scmhaManager;
   private ContainerReplicaPendingOps containerReplicaPendingOps;
 
+  int getInflightCount(InflightType type) {
+    return replicationManager.getLegacyReplicationManager()
+        .getInflightCount(type);
+  }
+
   @BeforeEach
   public void setup()
       throws IOException, InterruptedException,
@@ -201,8 +215,31 @@ public class TestReplicationManager {
     createReplicationManager(new ReplicationManagerConfiguration());
   }
 
+  void createReplicationManager(int replicationLimit, int deletionLimit)
+      throws Exception {
+    replicationManager.stop();
+    dbStore.close();
+    final LegacyReplicationManager.ReplicationManagerConfiguration conf
+        = new LegacyReplicationManager.ReplicationManagerConfiguration();
+    conf.setContainerInflightReplicationLimit(replicationLimit);
+    conf.setContainerInflightDeletionLimit(deletionLimit);
+    createReplicationManager(conf);
+  }
+
+  void createReplicationManager(
+      LegacyReplicationManager.ReplicationManagerConfiguration conf)
+      throws Exception {
+    createReplicationManager(null, conf);
+  }
+
   private void createReplicationManager(ReplicationManagerConfiguration rmConf)
       throws InterruptedException, IOException {
+    createReplicationManager(rmConf, null);
+  }
+
+  void createReplicationManager(ReplicationManagerConfiguration rmConf,
+      LegacyReplicationManager.ReplicationManagerConfiguration lrmConf)
+      throws InterruptedException, IOException {
     OzoneConfiguration config = new OzoneConfiguration();
     testDir = GenericTestUtils
       .getTestDir(TestContainerManagerImpl.class.getSimpleName());
@@ -211,7 +248,8 @@ public class TestReplicationManager {
     config.setTimeDuration(
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         0, TimeUnit.SECONDS);
-    config.setFromObject(rmConf);
+    Optional.ofNullable(rmConf).ifPresent(config::setFromObject);
+    Optional.ofNullable(lrmConf).ifPresent(config::setFromObject);
 
     SCMHAManager scmHAManager = SCMHAManagerStub
         .getInstance(true, new SCMDBTransactionBufferImpl());
@@ -474,8 +512,7 @@ public class TestReplicationManager {
         replicationManager.getMetrics().getNumReplicationCmdsSent());
     Assertions.assertEquals(currentBytesToReplicate + 100L,
         replicationManager.getMetrics().getNumReplicationBytesTotal());
-    Assertions.assertEquals(1,
-        replicationManager.getInflightReplication().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightReplication());
 
@@ -488,8 +525,8 @@ public class TestReplicationManager {
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
 
     // Now we add the missing replica back
-    DatanodeDetails targetDn = replicationManager.getInflightReplication()
-        .get(id).get(0).getDatanode();
+    DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
+        .getFirstDatanode(InflightType.REPLICATION, id);
     final ContainerReplica replicatedReplicaOne = getReplicas(
         id, State.CLOSED, 1000L, originNodeId, targetDn);
     containerStateManager.updateContainerReplica(
@@ -503,8 +540,7 @@ public class TestReplicationManager {
     replicationManager.processAll();
     eventQueue.processAll(1000);
 
-    Assertions.assertEquals(0,
-        replicationManager.getInflightReplication().size());
+    Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
     Assertions.assertEquals(0, replicationManager.getMetrics()
         .getInflightReplication());
     Assertions.assertEquals(currentReplicationCommandCompleted + 1,
@@ -556,7 +592,7 @@ public class TestReplicationManager {
             .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assertions.assertEquals(currentDeleteCommandCount + 1,
         replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightDeletion());
 
@@ -568,8 +604,8 @@ public class TestReplicationManager {
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
 
     // Now we remove the replica according to inflight
-    DatanodeDetails targetDn = replicationManager.getInflightDeletion()
-        .get(id).get(0).getDatanode();
+    DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
+        .getFirstDatanode(InflightType.DELETION, id);
     if (targetDn.equals(replicaOne.getDatanodeDetails())) {
       containerStateManager.removeContainerReplica(
           id, replicaOne);
@@ -591,7 +627,7 @@ public class TestReplicationManager {
 
     replicationManager.processAll();
     eventQueue.processAll(1000);
-    Assertions.assertEquals(0, replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(0, replicationManager.getMetrics()
         .getInflightDeletion());
     Assertions.assertEquals(currentDeleteCommandCompleted + 1,
@@ -648,7 +684,7 @@ public class TestReplicationManager {
         replicaOne.getDatanodeDetails()));
     Assertions.assertEquals(currentDeleteCommandCount + 1,
         replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightDeletion());
 
@@ -669,7 +705,7 @@ public class TestReplicationManager {
 
     Assertions.assertEquals(currentDeleteCommandCompleted + 1,
         replicationManager.getMetrics().getNumDeletionCmdsCompleted());
-    Assertions.assertEquals(0, replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(0, replicationManager.getMetrics()
         .getInflightDeletion());
 
@@ -714,8 +750,7 @@ public class TestReplicationManager {
         replicationManager.getMetrics().getNumReplicationCmdsSent());
     Assertions.assertEquals(currentBytesToReplicate + 100,
         replicationManager.getMetrics().getNumReplicationBytesTotal());
-    Assertions.assertEquals(1,
-        replicationManager.getInflightReplication().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightReplication());
 
@@ -732,8 +767,8 @@ public class TestReplicationManager {
         .getMetrics().getNumReplicationBytesCompleted();
 
     // Now we add the replicated new replica
-    DatanodeDetails targetDn = replicationManager.getInflightReplication()
-        .get(id).get(0).getDatanode();
+    DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
+        .getFirstDatanode(InflightType.REPLICATION, id);
     final ContainerReplica replicatedReplicaThree = getReplicas(
         id, State.CLOSED, 1000L, originNodeId, targetDn);
     containerStateManager.updateContainerReplica(
@@ -746,8 +781,7 @@ public class TestReplicationManager {
         replicationManager.getMetrics().getNumReplicationCmdsCompleted());
     Assertions.assertEquals(currentReplicateBytesCompleted + 100,
         replicationManager.getMetrics().getNumReplicationBytesCompleted());
-    Assertions.assertEquals(0,
-        replicationManager.getInflightReplication().size());
+    Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
     Assertions.assertEquals(0, replicationManager.getMetrics()
         .getInflightReplication());
 
@@ -849,8 +883,7 @@ public class TestReplicationManager {
         replicationManager.getMetrics().getNumDeletionCmdsSent());
     Assertions.assertEquals(currentBytesToDelete + 99,
         replicationManager.getMetrics().getNumDeletionBytesTotal());
-    Assertions.assertEquals(1,
-        replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightDeletion());
 
@@ -876,7 +909,7 @@ public class TestReplicationManager {
     replicationManager.processAll();
     eventQueue.processAll(1000);
 
-    Assertions.assertEquals(0, replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(0, replicationManager.getMetrics()
         .getInflightDeletion());
     Assertions.assertEquals(currentDeleteCommandCompleted + 1,
@@ -887,8 +920,7 @@ public class TestReplicationManager {
             SCMCommandProto.Type.replicateContainerCommand));
     Assertions.assertEquals(currentReplicateCommandCount + 2,
         replicationManager.getMetrics().getNumReplicationCmdsSent());
-    Assertions.assertEquals(1,
-        replicationManager.getInflightReplication().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightReplication());
 
@@ -1082,8 +1114,7 @@ public class TestReplicationManager {
         replicationManager.getMetrics().getNumReplicationCmdsSent());
     Assertions.assertEquals(currentBytesToReplicate + 100,
         replicationManager.getMetrics().getNumReplicationBytesTotal());
-    Assertions.assertEquals(1,
-        replicationManager.getInflightReplication().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightReplication());
 
@@ -1114,8 +1145,7 @@ public class TestReplicationManager {
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
     Assertions.assertEquals(currentReplicateCommandCount,
         replicationManager.getMetrics().getNumReplicationCmdsSent());
-    Assertions.assertEquals(1,
-        replicationManager.getInflightReplication().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightReplication());
   }
@@ -1170,7 +1200,7 @@ public class TestReplicationManager {
     Assertions.assertTrue(datanodeCommandHandler.received(
         SCMCommandProto.Type.deleteContainerCommand,
         replicaFive.getDatanodeDetails()));
-    Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightDeletion());
     assertOverReplicatedCount(1);
@@ -1213,7 +1243,7 @@ public class TestReplicationManager {
             .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assertions.assertEquals(currentDeleteCommandCount + 1,
         replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightDeletion());
 
@@ -1261,7 +1291,7 @@ public class TestReplicationManager {
             .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assertions.assertEquals(currentDeleteCommandCount + 2,
         replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightDeletion());
   }
@@ -1286,12 +1316,44 @@ public class TestReplicationManager {
    */
   @Test
   public void testUnderReplicatedDueToAllDecommission() throws IOException {
+    runTestUnderReplicatedDueToAllDecommission(3); // expect 3 scheduled
+  }
+
+  Void runTestUnderReplicatedDueToAllDecommission(int expectedReplication)
+      throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
-    assertReplicaScheduled(3);
+    assertReplicaScheduled(expectedReplication);
     assertUnderReplicatedCount(1);
+    return null; // Void return so it can be used as a Callable<Void>
+  }
+
+  @Test
+  public void testReplicationLimit() throws Exception {
+    runTestLimit(1, 0, 2, 0, // replication limit 1: 1 scheduled, 2 skipped
+        () -> runTestUnderReplicatedDueToAllDecommission(1));
+  }
+
+  void runTestLimit(int replicationLimit, int deletionLimit,
+      int expectedReplicationSkipped, int expectedDeletionSkipped,
+      Callable<Void> testcase) throws Exception {
+    createReplicationManager(replicationLimit, deletionLimit);
+    final ReplicationManagerMetrics metrics = replicationManager.getMetrics();
+    final long replicationSkipped = metrics.getInflightReplicationSkipped();
+    final long deletionSkipped = metrics.getInflightDeletionSkipped();
+    try {
+      testcase.call();
+      Assertions.assertEquals(replicationSkipped + expectedReplicationSkipped,
+          metrics.getInflightReplicationSkipped());
+      Assertions.assertEquals(deletionSkipped + expectedDeletionSkipped,
+          metrics.getInflightDeletionSkipped());
+    } finally {
+      // Reset the limits to 0 for other tests; done in a finally block so
+      // it also happens when the testcase or an assertion fails.
+      createReplicationManager(0, 0);
+    }
   }
 
   /**
@@ -1450,7 +1512,7 @@ public class TestReplicationManager {
             .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assertions.assertEquals(currentDeleteCommandCount + 2,
         replicationManager.getMetrics().getNumDeletionCmdsSent());
-    Assertions.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
     Assertions.assertEquals(1, replicationManager.getMetrics()
         .getInflightDeletion());
     // Get the DECOM and Maint replica and ensure none of them are scheduled
@@ -1880,6 +1942,10 @@ public class TestReplicationManager {
    */
   @Test
   public void testDeleteEmptyContainer() throws Exception {
+    runTestDeleteEmptyContainer(3); // expect 3 deletes scheduled
+  }
+
+  Void runTestDeleteEmptyContainer(int expectedDelete) throws Exception {
     // Create container with usedBytes = 1000 and keyCount = 0
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED, 1000,
         0);
@@ -1888,7 +1954,14 @@ public class TestReplicationManager {
     // Create a replica with usedBytes != 0 and keyCount = 0
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED, 100, 0);
 
-    assertDeleteScheduled(3);
+    assertDeleteScheduled(expectedDelete);
+    return null; // Void return so it can be used as a Callable<Void>
+  }
+
+  @Test
+  public void testDeletionLimit() throws Exception {
+    // Deletion limit 2: only 2 of the 3 deletes are scheduled, 1 is skipped.
+    runTestLimit(0, 2, 0, 1, () -> runTestDeleteEmptyContainer(2));
   }
 
   /**
@@ -1994,7 +2067,7 @@ public class TestReplicationManager {
         replicationManager.getMetrics().getNumReplicationCmdsSent());
   }
 
-  private void assertDeleteScheduled(int delta) throws InterruptedException {
+  private void assertDeleteScheduled(int delta) {
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
index 50f4945fc4..9d70b190dc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java
@@ -54,7 +54,13 @@ public class TestReplicationManagerMetrics {
         report.increment(s);
       }
     }
+    final LegacyReplicationManager lrm = Mockito.mock(
+        LegacyReplicationManager.class);
+    Mockito.when(lrm.getInflightCount(Mockito.any(InflightType.class)))
+        .thenReturn(0);
     replicationManager = Mockito.mock(ReplicationManager.class);
+    Mockito.when(replicationManager.getLegacyReplicationManager())
+        .thenReturn(lrm);
     Mockito.when(replicationManager.getContainerReport()).thenReturn(report);
     metrics = ReplicationManagerMetrics.create(replicationManager);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org