You are viewing a plain-text version of this content. The canonical version is available online; its hyperlink was not preserved in this plain-text copy.
Posted to commits@ozone.apache.org by ad...@apache.org on 2023/04/01 16:14:25 UTC
[ozone] branch master updated: HDDS-8334. ReplicationManager: Add nodes to exclude list if they are overloaded (#4510)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 3ecd87cd7a HDDS-8334. ReplicationManager: Add nodes to exclude list if they are overloaded (#4510)
3ecd87cd7a is described below
commit 3ecd87cd7a8ca8e645c7552e0c15d63848fb54e2
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Sat Apr 1 17:14:18 2023 +0100
HDDS-8334. ReplicationManager: Add nodes to exclude list if they are overloaded (#4510)
---
.../container/replication/ReplicationManager.java | 102 ++++++++++++++++-----
.../replication/TestReplicationManager.java | 86 +++++++++++++++--
2 files changed, 155 insertions(+), 33 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 23b226d978..62851a325b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -79,6 +79,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
@@ -147,6 +148,16 @@ public class ReplicationManager implements SCMService {
*/
private LegacyReplicationManager legacyReplicationManager;
+ /**
+ * Set of nodes which have been excluded for replication commands due to the
+ * number of commands queued on a datanode. This can be used when generating
+ * reconstruction commands to avoid nodes which are already overloaded. When
+ * the datanode heartbeat is received, the node is removed from this set if
+ * the command count has dropped below the limit.
+ */
+ private final Map<DatanodeDetails, Integer> excludedNodes =
+ new ConcurrentHashMap<>();
+
/**
* SCMService related variables.
* After leaving safe mode, replicationMonitor needs to wait for a while
@@ -526,13 +537,13 @@ public class ReplicationManager implements SCMService {
"available for replication of container " + containerID + " to " +
target);
}
- // Put the least loaded source first
- sourceWithCmds.sort(Comparator.comparingInt(Pair::getLeft));
+ DatanodeDetails source = selectAndOptionallyExcludeDatanode(
+ 1, sourceWithCmds);
ReplicateContainerCommand cmd =
ReplicateContainerCommand.toTarget(containerID, target);
cmd.setReplicaIndex(replicaIndex);
- sendDatanodeCommand(cmd, containerInfo, sourceWithCmds.get(0).getRight());
+ sendDatanodeCommand(cmd, containerInfo, source);
}
public void sendThrottledReconstructionCommand(ContainerInfo containerInfo,
@@ -545,10 +556,24 @@ public class ReplicationManager implements SCMService {
throw new CommandTargetOverloadedException("No target with capacity " +
"available for reconstruction of " + containerInfo.getContainerID());
}
- // Put the least loaded target first
- targetWithCmds.sort(Comparator.comparingInt(Pair::getLeft));
- sendDatanodeCommand(command, containerInfo,
- targetWithCmds.get(0).getRight());
+ DatanodeDetails target = selectAndOptionallyExcludeDatanode(
+ reconstructionCommandWeight, targetWithCmds);
+ sendDatanodeCommand(command, containerInfo, target);
+ }
+
+ private DatanodeDetails selectAndOptionallyExcludeDatanode(
+ int additionalCmdCount, List<Pair<Integer, DatanodeDetails>> datanodes) {
+ if (datanodes.isEmpty()) {
+ return null;
+ }
+ // Put the least loaded datanode first
+ datanodes.sort(Comparator.comparingInt(Pair::getLeft));
+ DatanodeDetails datanode = datanodes.get(0).getRight();
+ int currentCount = datanodes.get(0).getLeft();
+ if (currentCount + additionalCmdCount >= datanodeReplicationLimit) {
+ addExcludedNode(datanode);
+ }
+ return datanode;
}
/**
@@ -567,18 +592,12 @@ public class ReplicationManager implements SCMService {
= new ArrayList<>();
for (DatanodeDetails dn : datanodes) {
try {
- Map<Type, Integer> counts = nodeManager.getTotalDatanodeCommandCounts(
- dn, Type.replicateContainerCommand,
- Type.reconstructECContainersCommand);
- int replicateCount = counts.get(Type.replicateContainerCommand);
- int reconstructCount = counts.get(Type.reconstructECContainersCommand);
- int totalCount = replicateCount
- + reconstructCount * reconstructionCommandWeight;
+ int totalCount = getDatanodeQueuedReplicationCount(dn);
if (totalCount >= datanodeReplicationLimit) {
LOG.debug("Datanode {} has reached the maximum number of queued " +
- "commands, replication: {}, reconstruction: {} * {})",
- dn, replicateCount, reconstructCount,
- reconstructionCommandWeight);
+ "commands, replication + reconstruction * {}: {})",
+ dn, reconstructionCommandWeight, totalCount);
+ addExcludedNode(dn);
continue;
}
datanodeWithCommandCount.add(Pair.of(totalCount, dn));
@@ -590,6 +609,16 @@ public class ReplicationManager implements SCMService {
return datanodeWithCommandCount;
}
+ private int getDatanodeQueuedReplicationCount(DatanodeDetails datanode)
+ throws NodeNotFoundException {
+ Map<Type, Integer> counts = nodeManager.getTotalDatanodeCommandCounts(
+ datanode, Type.replicateContainerCommand,
+ Type.reconstructECContainersCommand);
+ int replicateCount = counts.get(Type.replicateContainerCommand);
+ int reconstructCount = counts.get(Type.reconstructECContainersCommand);
+ return replicateCount + reconstructCount * reconstructionCommandWeight;
+ }
+
/**
* Send a push replication command to the given source datanode, instructing
* it to copy the given container to the target. The command is sent as a low
@@ -804,15 +833,40 @@ public class ReplicationManager implements SCMService {
* Notify ReplicationManager that the command counts on a datanode have been
* updated via a heartbeat received. This will allow RM to consider the node
* for container operations if it was previously excluded due to load.
- * @param datanodeDetails The datanode for which the commands have been
- * updated.
+ * @param datanode The datanode for which the commands have been updated.
*/
- public void datanodeCommandCountUpdated(DatanodeDetails datanodeDetails) {
- // For now this is a NOOP, as the plan is to use this notification in a
- // future change to limit the number of commands scheduled against a DN by
- // RM.
+ public void datanodeCommandCountUpdated(DatanodeDetails datanode) {
LOG.debug("Received a notification that the DN command count " +
- "has been updated for {}", datanodeDetails);
+ "has been updated for {}", datanode);
+ // If there is an existing mapping, we may need to remove it
+ excludedNodes.computeIfPresent(datanode, (k, v) -> {
+ try {
+ if (getDatanodeQueuedReplicationCount(datanode)
+ < datanodeReplicationLimit) {
+ // Returning null removes the entry from the map
+ return null;
+ } else {
+ return 1;
+ }
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Unable to find datanode {} in nodeManager. " +
+ "Should not happen.", datanode);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Returns the list of datanodes that are currently excluded from being
+ * targets for container replication due to queued commands.
+ * @return Set of excluded DatanodeDetails.
+ */
+ public Set<DatanodeDetails> getExcludedNodes() {
+ return excludedNodes.keySet();
+ }
+
+ private void addExcludedNode(DatanodeDetails dn) {
+ excludedNodes.put(dn, 1);
}
protected void processContainer(ContainerInfo containerInfo,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index b48532d0fe..0e6ab3c17a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -59,6 +59,7 @@ import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -957,7 +958,7 @@ public class TestReplicationManager {
repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
ReconstructECContainersCommand command = createReconstructionCommand(
- container, new ArrayList<>(targetNodes.keySet()));
+ container, targetNodes.keySet().toArray(new DatanodeDetails[0]));
replicationManager.sendThrottledReconstructionCommand(container, command);
@@ -980,10 +981,6 @@ public class TestReplicationManager {
int reconstructionCount = 2;
int replicationCount = limit - reconstructionCount * reconstructionWeight;
- List<DatanodeDetails> targets = new ArrayList<>();
- targets.add(MockDatanodeDetails.randomDatanodeDetails());
- targets.add(MockDatanodeDetails.randomDatanodeDetails());
-
Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
eq(SCMCommandProto.Type.replicateContainerCommand),
eq(SCMCommandProto.Type.reconstructECContainersCommand)))
@@ -999,12 +996,13 @@ public class TestReplicationManager {
ContainerInfo container = ReplicationTestUtil.createContainerInfo(
repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
ReconstructECContainersCommand command = createReconstructionCommand(
- container, targets);
+ container, MockDatanodeDetails.randomDatanodeDetails(),
+ MockDatanodeDetails.randomDatanodeDetails());
replicationManager.sendThrottledReconstructionCommand(container, command);
}
private ReconstructECContainersCommand createReconstructionCommand(
- ContainerInfo containerInfo, List<DatanodeDetails> targets) {
+ ContainerInfo containerInfo, DatanodeDetails... targets) {
List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex> sources
= new ArrayList<>();
for (int i = 1; i <= 3; i++) {
@@ -1013,10 +1011,10 @@ public class TestReplicationManager {
MockDatanodeDetails.randomDatanodeDetails(), i));
}
byte[] missingIndexes = new byte[]{4, 5};
-
return new ReconstructECContainersCommand(
containerInfo.getContainerID(), sources,
- targets, missingIndexes, (ECReplicationConfig) repConfig);
+ new ArrayList<>(Arrays.asList(targets)), missingIndexes,
+ (ECReplicationConfig) repConfig);
}
@Test
@@ -1050,6 +1048,76 @@ public class TestReplicationManager {
replicationManager.sendThrottledDeleteCommand(container, 1, target, true);
}
+ @Test
+ public void testExcludedNodes() throws NodeNotFoundException,
+ NotLeaderException, CommandTargetOverloadedException {
+ int repLimit = replicationManager.getConfig().getDatanodeReplicationLimit();
+ int reconstructionWeight = replicationManager.getConfig()
+ .getReconstructionCommandWeight();
+ ContainerInfo container = ReplicationTestUtil.createContainerInfo(
+ repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
+ Map<DatanodeDetails, Integer> commandCounts = new HashMap<>();
+ DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails dn3 = MockDatanodeDetails.randomDatanodeDetails();
+
+ commandCounts.put(dn1, repLimit - 1);
+ commandCounts.put(dn2, repLimit - reconstructionWeight);
+ commandCounts.put(dn3, repLimit);
+
+ Mockito.when(nodeManager.getTotalDatanodeCommandCounts(any(),
+ eq(SCMCommandProto.Type.replicateContainerCommand),
+ eq(SCMCommandProto.Type.reconstructECContainersCommand)))
+ .thenAnswer(invocation -> {
+ DatanodeDetails dn = invocation.getArgument(0);
+ Map<SCMCommandProto.Type, Integer> counts = new HashMap<>();
+ counts.put(SCMCommandProto.Type.replicateContainerCommand,
+ commandCounts.get(dn));
+ counts.put(SCMCommandProto.Type.reconstructECContainersCommand, 0);
+ return counts;
+ });
+
+ replicationManager.sendThrottledReplicationCommand(container,
+ new ArrayList<>(commandCounts.keySet()),
+ MockDatanodeDetails.randomDatanodeDetails(), 1);
+
+ Set<DatanodeDetails> excluded = replicationManager.getExcludedNodes();
+ Assert.assertEquals(excluded.size(), 1);
+ // dn 3 was at the limit already, so should be added when filtering the
+ // nodes
+ Assert.assertTrue(excluded.contains(dn3));
+
+ // Trigger an update for dn3, but it should stay in the excluded list as its
+ // count is still at the limit.
+ replicationManager.datanodeCommandCountUpdated(dn3);
+ Assert.assertEquals(replicationManager.getExcludedNodes().size(), 1);
+
+ // now sent a reconstruction command. It should be sent to dn2, which is
+ // at the lowest count, but this command should push it to the limit and
+ // cause it to be excluded.
+ ReconstructECContainersCommand command = createReconstructionCommand(
+ container, dn1, dn2);
+ replicationManager.sendThrottledReconstructionCommand(container, command);
+ excluded = replicationManager.getExcludedNodes();
+ Assert.assertEquals(excluded.size(), 2);
+ // dn 3 was already in the excluded list
+ Assert.assertTrue(excluded.contains(dn3));
+ // dn 2 reached the limit from the reconstruction command
+ Assert.assertTrue(excluded.contains(dn2));
+
+ // Update received for DN2, it should be cleared from the excluded list.
+ replicationManager.datanodeCommandCountUpdated(dn2);
+ excluded = replicationManager.getExcludedNodes();
+ Assert.assertEquals(excluded.size(), 1);
+ Assert.assertFalse(excluded.contains(dn2));
+
+ // Finally, update received for DN1 - it is not excluded and should not
+ // be added or cause any problems by not being there
+ replicationManager.datanodeCommandCountUpdated(dn1);
+ Assert.assertEquals(excluded.size(), 1);
+ Assert.assertFalse(excluded.contains(dn1));
+ }
+
@SafeVarargs
private final Set<ContainerReplica> addReplicas(ContainerInfo container,
ContainerReplicaProto.State replicaState,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org