You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2023/04/01 16:14:25 UTC

[ozone] branch master updated: HDDS-8334. ReplicationManager: Add nodes to exclude list if they are overloaded (#4510)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ecd87cd7a HDDS-8334. ReplicationManager: Add nodes to exclude list if they are overloaded (#4510)
3ecd87cd7a is described below

commit 3ecd87cd7a8ca8e645c7552e0c15d63848fb54e2
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Sat Apr 1 17:14:18 2023 +0100

    HDDS-8334. ReplicationManager: Add nodes to exclude list if they are overloaded (#4510)
---
 .../container/replication/ReplicationManager.java  | 102 ++++++++++++++++-----
 .../replication/TestReplicationManager.java        |  86 +++++++++++++++--
 2 files changed, 155 insertions(+), 33 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 23b226d978..62851a325b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -79,6 +79,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
@@ -147,6 +148,16 @@ public class ReplicationManager implements SCMService {
    */
   private LegacyReplicationManager legacyReplicationManager;
 
+  /**
+   * Set of nodes which have been excluded for replication commands due to the
+   * number of commands queued on a datanode. This can be used when generating
+   * reconstruction commands to avoid nodes which are already overloaded. When
+   * the datanode heartbeat is received, the node is removed from this set if
+   * the command count has dropped below the limit.
+   */
+  private final Map<DatanodeDetails, Integer> excludedNodes =
+      new ConcurrentHashMap<>();
+
   /**
    * SCMService related variables.
    * After leaving safe mode, replicationMonitor needs to wait for a while
@@ -526,13 +537,13 @@ public class ReplicationManager implements SCMService {
           "available for replication of container " + containerID + " to " +
           target);
     }
-    // Put the least loaded source first
-    sourceWithCmds.sort(Comparator.comparingInt(Pair::getLeft));
+    DatanodeDetails source = selectAndOptionallyExcludeDatanode(
+        1, sourceWithCmds);
 
     ReplicateContainerCommand cmd =
         ReplicateContainerCommand.toTarget(containerID, target);
     cmd.setReplicaIndex(replicaIndex);
-    sendDatanodeCommand(cmd, containerInfo, sourceWithCmds.get(0).getRight());
+    sendDatanodeCommand(cmd, containerInfo, source);
   }
 
   public void sendThrottledReconstructionCommand(ContainerInfo containerInfo,
@@ -545,10 +556,24 @@ public class ReplicationManager implements SCMService {
       throw new CommandTargetOverloadedException("No target with capacity " +
           "available for reconstruction of " + containerInfo.getContainerID());
     }
-    // Put the least loaded target first
-    targetWithCmds.sort(Comparator.comparingInt(Pair::getLeft));
-    sendDatanodeCommand(command, containerInfo,
-        targetWithCmds.get(0).getRight());
+    DatanodeDetails target = selectAndOptionallyExcludeDatanode(
+        reconstructionCommandWeight, targetWithCmds);
+    sendDatanodeCommand(command, containerInfo, target);
+  }
+
+  private DatanodeDetails selectAndOptionallyExcludeDatanode(
+      int additionalCmdCount, List<Pair<Integer, DatanodeDetails>> datanodes) {
+    if (datanodes.isEmpty()) {
+      return null;
+    }
+    // Put the least loaded datanode first
+    datanodes.sort(Comparator.comparingInt(Pair::getLeft));
+    DatanodeDetails datanode = datanodes.get(0).getRight();
+    int currentCount = datanodes.get(0).getLeft();
+    if (currentCount + additionalCmdCount >= datanodeReplicationLimit) {
+      addExcludedNode(datanode);
+    }
+    return datanode;
   }
 
   /**
@@ -567,18 +592,12 @@ public class ReplicationManager implements SCMService {
         = new ArrayList<>();
     for (DatanodeDetails dn : datanodes) {
       try {
-        Map<Type, Integer> counts = nodeManager.getTotalDatanodeCommandCounts(
-            dn, Type.replicateContainerCommand,
-            Type.reconstructECContainersCommand);
-        int replicateCount = counts.get(Type.replicateContainerCommand);
-        int reconstructCount = counts.get(Type.reconstructECContainersCommand);
-        int totalCount = replicateCount
-            + reconstructCount * reconstructionCommandWeight;
+        int totalCount = getDatanodeQueuedReplicationCount(dn);
         if (totalCount >= datanodeReplicationLimit) {
           LOG.debug("Datanode {} has reached the maximum number of queued " +
-              "commands, replication: {}, reconstruction: {} * {})",
-              dn, replicateCount, reconstructCount,
-              reconstructionCommandWeight);
+              "commands, replication + reconstruction * {}: {})",
+              dn, reconstructionCommandWeight, totalCount);
+          addExcludedNode(dn);
           continue;
         }
         datanodeWithCommandCount.add(Pair.of(totalCount, dn));
@@ -590,6 +609,16 @@ public class ReplicationManager implements SCMService {
     return datanodeWithCommandCount;
   }
 
+  private int getDatanodeQueuedReplicationCount(DatanodeDetails datanode)
+      throws NodeNotFoundException {
+    Map<Type, Integer> counts = nodeManager.getTotalDatanodeCommandCounts(
+        datanode, Type.replicateContainerCommand,
+        Type.reconstructECContainersCommand);
+    int replicateCount = counts.get(Type.replicateContainerCommand);
+    int reconstructCount = counts.get(Type.reconstructECContainersCommand);
+    return replicateCount + reconstructCount * reconstructionCommandWeight;
+  }
+
   /**
    * Send a push replication command to the given source datanode, instructing
    * it to copy the given container to the target. The command is sent as a low
@@ -804,15 +833,40 @@ public class ReplicationManager implements SCMService {
    * Notify ReplicationManager that the command counts on a datanode have been
    * updated via a heartbeat received. This will allow RM to consider the node
    * for container operations if it was previously excluded due to load.
-   * @param datanodeDetails The datanode for which the commands have been
-   *                        updated.
+   * @param datanode The datanode for which the commands have been updated.
    */
-  public void datanodeCommandCountUpdated(DatanodeDetails datanodeDetails) {
-    // For now this is a NOOP, as the plan is to use this notification in a
-    // future change to limit the number of commands scheduled against a DN by
-    // RM.
+  public void datanodeCommandCountUpdated(DatanodeDetails datanode) {
     LOG.debug("Received a notification that the DN command count " +
-        "has been updated for {}", datanodeDetails);
+        "has been updated for {}", datanode);
+    // If there is an existing mapping, we may need to remove it
+    excludedNodes.computeIfPresent(datanode, (k, v) -> {
+      try {
+        if (getDatanodeQueuedReplicationCount(datanode)
+            < datanodeReplicationLimit) {
+          // Returning null removes the entry from the map
+          return null;
+        } else {
+          return 1;
+        }
+      } catch (NodeNotFoundException e) {
+        LOG.warn("Unable to find datanode {} in nodeManager. " +
+            "Should not happen.", datanode);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Returns the set of datanodes that are currently excluded from being
+   * targets for container replication due to queued commands.
+   * @return Set of excluded DatanodeDetails.
+   */
+  public Set<DatanodeDetails> getExcludedNodes() {
+    return excludedNodes.keySet();
+  }
+
+  private void addExcludedNode(DatanodeDetails dn) {
+    excludedNodes.put(dn, 1);
   }
 
   protected void processContainer(ContainerInfo containerInfo,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index b48532d0fe..0e6ab3c17a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -59,6 +59,7 @@ import java.io.IOException;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -957,7 +958,7 @@ public class TestReplicationManager {
         repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
 
     ReconstructECContainersCommand command = createReconstructionCommand(
-        container, new ArrayList<>(targetNodes.keySet()));
+        container, targetNodes.keySet().toArray(new DatanodeDetails[0]));
 
     replicationManager.sendThrottledReconstructionCommand(container, command);
 
@@ -980,10 +981,6 @@ public class TestReplicationManager {
     int reconstructionCount = 2;
     int replicationCount = limit - reconstructionCount * reconstructionWeight;
 
-    List<DatanodeDetails> targets = new ArrayList<>();
-    targets.add(MockDatanodeDetails.randomDatanodeDetails());
-    targets.add(MockDatanodeDetails.randomDatanodeDetails());
-
     Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
             eq(SCMCommandProto.Type.replicateContainerCommand),
             eq(SCMCommandProto.Type.reconstructECContainersCommand)))
@@ -999,12 +996,13 @@ public class TestReplicationManager {
     ContainerInfo container = ReplicationTestUtil.createContainerInfo(
         repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
     ReconstructECContainersCommand command = createReconstructionCommand(
-        container, targets);
+        container, MockDatanodeDetails.randomDatanodeDetails(),
+        MockDatanodeDetails.randomDatanodeDetails());
     replicationManager.sendThrottledReconstructionCommand(container, command);
   }
 
   private ReconstructECContainersCommand createReconstructionCommand(
-      ContainerInfo containerInfo, List<DatanodeDetails> targets) {
+      ContainerInfo containerInfo, DatanodeDetails... targets) {
     List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex> sources
         = new ArrayList<>();
     for (int i = 1; i <= 3; i++) {
@@ -1013,10 +1011,10 @@ public class TestReplicationManager {
               MockDatanodeDetails.randomDatanodeDetails(), i));
     }
     byte[] missingIndexes = new byte[]{4, 5};
-
     return new ReconstructECContainersCommand(
         containerInfo.getContainerID(), sources,
-        targets, missingIndexes, (ECReplicationConfig) repConfig);
+        new ArrayList<>(Arrays.asList(targets)), missingIndexes,
+        (ECReplicationConfig) repConfig);
   }
 
   @Test
@@ -1050,6 +1048,76 @@ public class TestReplicationManager {
     replicationManager.sendThrottledDeleteCommand(container, 1, target, true);
   }
 
+  @Test
+  public void testExcludedNodes() throws NodeNotFoundException,
+      NotLeaderException, CommandTargetOverloadedException {
+    int repLimit = replicationManager.getConfig().getDatanodeReplicationLimit();
+    int reconstructionWeight = replicationManager.getConfig()
+        .getReconstructionCommandWeight();
+    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+    Map<DatanodeDetails, Integer> commandCounts = new HashMap<>();
+    DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails dn3 = MockDatanodeDetails.randomDatanodeDetails();
+
+    commandCounts.put(dn1, repLimit - 1);
+    commandCounts.put(dn2, repLimit - reconstructionWeight);
+    commandCounts.put(dn3, repLimit);
+
+    Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+        eq(SCMCommandProto.Type.replicateContainerCommand),
+        eq(SCMCommandProto.Type.reconstructECContainersCommand)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dn = invocation.getArgument(0);
+          Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+          counts.put(SCMCommandProto.Type.replicateContainerCommand,
+              commandCounts.get(dn));
+          counts.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
+          return counts;
+        });
+
+    replicationManager.sendThrottledReplicationCommand(container,
+        new ArrayList<>(commandCounts.keySet()),
+        MockDatanodeDetails.randomDatanodeDetails(), 1);
+
+    Set<DatanodeDetails> excluded = replicationManager.getExcludedNodes();
+    Assert.assertEquals(excluded.size(), 1);
+    // dn 3 was at the limit already, so should be added when filtering the
+    // nodes
+    Assert.assertTrue(excluded.contains(dn3));
+
+    // Trigger an update for dn3, but it should stay in the excluded list as its
+    // count is still at the limit.
+    replicationManager.datanodeCommandCountUpdated(dn3);
+    Assert.assertEquals(replicationManager.getExcludedNodes().size(), 1);
+
+    // Now send a reconstruction command. It should be sent to dn2, which is
+    // at the lowest count, but this command should push it to the limit and
+    // cause it to be excluded.
+    ReconstructECContainersCommand command = createReconstructionCommand(
+        container, dn1, dn2);
+    replicationManager.sendThrottledReconstructionCommand(container, command);
+    excluded = replicationManager.getExcludedNodes();
+    Assert.assertEquals(excluded.size(), 2);
+    // dn 3 was already in the excluded list
+    Assert.assertTrue(excluded.contains(dn3));
+    // dn 2 reached the limit from the reconstruction command
+    Assert.assertTrue(excluded.contains(dn2));
+
+    // Update received for DN2, it should be cleared from the excluded list.
+    replicationManager.datanodeCommandCountUpdated(dn2);
+    excluded = replicationManager.getExcludedNodes();
+    Assert.assertEquals(excluded.size(), 1);
+    Assert.assertFalse(excluded.contains(dn2));
+
+    // Finally, update received for DN1 - it is not excluded and should not
+    // be added or cause any problems by not being there
+    replicationManager.datanodeCommandCountUpdated(dn1);
+    Assert.assertEquals(excluded.size(), 1);
+    Assert.assertFalse(excluded.contains(dn1));
+  }
+
   @SafeVarargs
   private final Set<ContainerReplica>  addReplicas(ContainerInfo container,
       ContainerReplicaProto.State replicaState,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org