You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/10/27 01:21:08 UTC
[ozone] branch master updated: HDDS-7402. Adapt CommandQueue to track the count of each queued command type (#3891)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 557d7f84ff HDDS-7402. Adapt CommandQueue to track the count of each queued command type (#3891)
557d7f84ff is described below
commit 557d7f84ff107aa3b075fe9ed274efdcdf62a9e3
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Thu Oct 27 02:21:02 2022 +0100
HDDS-7402. Adapt CommandQueue to track the count of each queued command type (#3891)
---
.../apache/hadoop/hdds/scm/node/CommandQueue.java | 51 +++++++----
.../apache/hadoop/hdds/scm/node/NodeManager.java | 9 ++
.../hadoop/hdds/scm/node/SCMNodeManager.java | 13 +++
.../hadoop/hdds/scm/container/MockNodeManager.java | 12 +++
.../hdds/scm/container/SimpleMockNodeManager.java | 12 +++
.../hadoop/hdds/scm/node/TestCommandQueue.java | 101 +++++++++++++++++++++
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 36 ++++++++
.../testutils/ReplicationNodeManagerMock.java | 12 +++
8 files changed, 229 insertions(+), 17 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
index aa930251c4..d17411c517 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,9 +40,8 @@ import java.util.concurrent.locks.ReentrantLock;
* there where queued.
*/
public class CommandQueue {
- // This list is used as default return value.
- private static final List<SCMCommand> DEFAULT_LIST = new ArrayList<>();
private final Map<UUID, Commands> commandMap;
+ private final Map<UUID, Map<SCMCommandProto.Type, Integer>> summaryMap;
private final Lock lock;
private long commandsInQueue;
@@ -59,6 +60,7 @@ public class CommandQueue {
*/
public CommandQueue() {
commandMap = new HashMap<>();
+ summaryMap = new HashMap<>();
lock = new ReentrantLock();
commandsInQueue = 0;
}
@@ -71,6 +73,7 @@ public class CommandQueue {
lock.lock();
try {
commandMap.clear();
+ summaryMap.clear();
commandsInQueue = 0;
} finally {
lock.unlock();
@@ -90,6 +93,7 @@ public class CommandQueue {
lock.lock();
try {
Commands cmds = commandMap.remove(datanodeUuid);
+ summaryMap.remove(datanodeUuid);
List<SCMCommand> cmdList = null;
if (cmds != null) {
cmdList = cmds.getCommands();
@@ -97,7 +101,28 @@ public class CommandQueue {
// A post condition really.
Preconditions.checkState(commandsInQueue >= 0);
}
- return cmds == null ? DEFAULT_LIST : cmdList;
+ return cmds == null ? Collections.emptyList() : cmdList;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the count of commands of the given type currently queued for the
+ * given datanode.
+ * @param datanodeUuid Datanode UUID.
+ * @param commandType The type of command for which to get the count.
+ * @return The currently queued command count, or zero if none are queued.
+ */
+ public int getDatanodeCommandCount(
+ final UUID datanodeUuid, SCMCommandProto.Type commandType) {
+ lock.lock();
+ try {
+ Map<SCMCommandProto.Type, Integer> summary = summaryMap.get(datanodeUuid);
+ if (summary == null) {
+ return 0;
+ }
+ return summary.getOrDefault(commandType, 0);
} finally {
lock.unlock();
}
@@ -113,11 +138,12 @@ public class CommandQueue {
command) {
lock.lock();
try {
- if (commandMap.containsKey(datanodeUuid)) {
- commandMap.get(datanodeUuid).add(command);
- } else {
- commandMap.put(datanodeUuid, new Commands(command));
- }
+ commandMap.computeIfAbsent(datanodeUuid, s -> new Commands())
+ .add(command);
+ Map<SCMCommandProto.Type, Integer> summary =
+ summaryMap.computeIfAbsent(datanodeUuid, s -> new HashMap<>());
+ summary.put(command.getType(),
+ summary.getOrDefault(command.getType(), 0) + 1);
commandsInQueue++;
} finally {
lock.unlock();
@@ -141,15 +167,6 @@ public class CommandQueue {
readTime = 0;
}
- /**
- * Creates the object and populates with the command.
- * @param command command to add to queue.
- */
- Commands(SCMCommand command) {
- this();
- this.add(command);
- }
-
/**
* Gets the last time the commands for this node was updated.
* @return Time stamp
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index e2f69f77db..5514db8ecf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -317,6 +317,15 @@ public interface NodeManager extends StorageContainerNodeProtocol,
int getNodeQueuedCommandCount(DatanodeDetails datanodeDetails,
SCMCommandProto.Type cmdType) throws NodeNotFoundException;
+ /**
+ * Get the number of commands of the given type queued in the SCM CommandQueue
+ * for the given datanode.
+ * @param dnID The UUID of the datanode.
+ * @param cmdType The Type of command to query the current count for.
+ * @return The count of commands queued, or zero if none.
+ */
+ int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType);
+
/**
* Get list of SCMCommands in the Command Queue for a particular Datanode.
* @param dnID - Datanode uuid.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 62ef30d27c..dea4f0605b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -729,12 +729,25 @@ public class SCMNodeManager implements NodeManager {
* @param cmdType
* @return The queued count or -1 if no data has been received from the DN.
*/
+ @Override
public int getNodeQueuedCommandCount(DatanodeDetails datanodeDetails,
SCMCommandProto.Type cmdType) throws NodeNotFoundException {
DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
return datanodeInfo.getCommandCount(cmdType);
}
+ /**
+ * Get the number of commands of the given type queued in the SCM CommandQueue
+ * for the given datanode.
+ * @param dnID The UUID of the datanode.
+ * @param cmdType The Type of command to query the current count for.
+ * @return The count of commands queued, or zero if none.
+ */
+ @Override
+ public int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType) {
+ return commandQueue.getDatanodeCommandCount(dnID, cmdType);
+ }
+
/**
* Returns the aggregated node stats.
*
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 9083125f47..3fa14f7dd0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -615,6 +615,18 @@ public class MockNodeManager implements NodeManager {
return -1;
}
+ /**
+ * Get the number of commands of the given type queued in the SCM CommandQueue
+ * for the given datanode.
+ * @param dnID The UUID of the datanode.
+ * @param cmdType The Type of command to query the current count for.
+ * @return The count of commands queued, or zero if none.
+ */
+ @Override
+ public int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType) {
+ return 0;
+ }
+
/**
* Update set of containers available on a datanode.
* @param uuid - DatanodeID
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 35ce3cb8a9..e14383f2da 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -309,6 +309,18 @@ public class SimpleMockNodeManager implements NodeManager {
return -1;
}
+ /**
+ * Get the number of commands of the given type queued in the SCM CommandQueue
+ * for the given datanode.
+ * @param dnID The UUID of the datanode.
+ * @param cmdType The Type of command to query the current count for.
+ * @return The count of commands queued, or zero if none.
+ */
+ @Override
+ public int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType) {
+ return 0;
+ }
+
@Override
public List<SCMCommand> getCommandQueue(UUID dnID) {
return null;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java
new file mode 100644
index 0000000000..5c76799236
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Test for the CommandQueue class.
+ */
+
+public class TestCommandQueue {
+
+ @Test
+ public void testSummaryUpdated() {
+ CommandQueue commandQueue = new CommandQueue();
+ long containerID = 1;
+ SCMCommand<?> closeContainerCommand =
+ new CloseContainerCommand(containerID, PipelineID.randomId());
+ SCMCommand<?> createPipelineCommand =
+ new CreatePipelineCommand(PipelineID.randomId(),
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE, Collections.emptyList());
+
+ UUID datanode1UUID = UUID.randomUUID();
+ UUID datanode2UUID = UUID.randomUUID();
+
+ commandQueue.addCommand(datanode1UUID, closeContainerCommand);
+ commandQueue.addCommand(datanode1UUID, closeContainerCommand);
+ commandQueue.addCommand(datanode1UUID, createPipelineCommand);
+
+ commandQueue.addCommand(datanode2UUID, closeContainerCommand);
+ commandQueue.addCommand(datanode2UUID, createPipelineCommand);
+ commandQueue.addCommand(datanode2UUID, createPipelineCommand);
+
+ // Check zero returned for unknown DN
+ Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+ UUID.randomUUID(), SCMCommandProto.Type.closeContainerCommand));
+
+ Assert.assertEquals(2, commandQueue.getDatanodeCommandCount(
+ datanode1UUID, SCMCommandProto.Type.closeContainerCommand));
+ Assert.assertEquals(1, commandQueue.getDatanodeCommandCount(
+ datanode1UUID, SCMCommandProto.Type.createPipelineCommand));
+ Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+ datanode1UUID, SCMCommandProto.Type.closePipelineCommand));
+
+ Assert.assertEquals(1, commandQueue.getDatanodeCommandCount(
+ datanode2UUID, SCMCommandProto.Type.closeContainerCommand));
+ Assert.assertEquals(2, commandQueue.getDatanodeCommandCount(
+ datanode2UUID, SCMCommandProto.Type.createPipelineCommand));
+
+ // Ensure the counts are cleared when the commands are retrieved
+ List<SCMCommand> cmds = commandQueue.getCommand(datanode1UUID);
+ Assert.assertEquals(3, cmds.size());
+
+ Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+ datanode1UUID, SCMCommandProto.Type.closeContainerCommand));
+ Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+ datanode1UUID, SCMCommandProto.Type.createPipelineCommand));
+ Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+ datanode1UUID, SCMCommandProto.Type.closePipelineCommand));
+
+ Assert.assertEquals(1, commandQueue.getDatanodeCommandCount(
+ datanode2UUID, SCMCommandProto.Type.closeContainerCommand));
+ Assert.assertEquals(2, commandQueue.getDatanodeCommandCount(
+ datanode2UUID, SCMCommandProto.Type.createPipelineCommand));
+
+ // Ensure the commands are zeroed when the queue is cleared
+ commandQueue.clear();
+ Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+ datanode2UUID, SCMCommandProto.Type.closeContainerCommand));
+ Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+ datanode2UUID, SCMCommandProto.Type.createPipelineCommand));
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index a936e5652f..66120f92fc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
@@ -76,6 +77,7 @@ import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
+import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -977,6 +979,40 @@ public class TestSCMNodeManager {
node1, SCMCommandProto.Type.closeContainerCommand));
}
+ @Test
+ public void testCommandCount()
+ throws AuthenticationException, IOException {
+ SCMNodeManager nodeManager = createNodeManager(getConf());
+
+ UUID datanode1 = UUID.randomUUID();
+ UUID datanode2 = UUID.randomUUID();
+ long containerID = 1;
+
+ SCMCommand<?> closeContainerCommand =
+ new CloseContainerCommand(containerID, PipelineID.randomId());
+ SCMCommand<?> createPipelineCommand =
+ new CreatePipelineCommand(PipelineID.randomId(),
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE, Collections.emptyList());
+
+ nodeManager.onMessage(
+ new CommandForDatanode<>(datanode1, closeContainerCommand), null);
+ nodeManager.onMessage(
+ new CommandForDatanode<>(datanode1, closeContainerCommand), null);
+ nodeManager.onMessage(
+ new CommandForDatanode<>(datanode1, createPipelineCommand), null);
+
+ Assert.assertEquals(2, nodeManager.getCommandQueueCount(
+ datanode1, SCMCommandProto.Type.closeContainerCommand));
+ Assert.assertEquals(1, nodeManager.getCommandQueueCount(
+ datanode1, SCMCommandProto.Type.createPipelineCommand));
+ Assert.assertEquals(0, nodeManager.getCommandQueueCount(
+ datanode1, SCMCommandProto.Type.closePipelineCommand));
+
+ Assert.assertEquals(0, nodeManager.getCommandQueueCount(
+ datanode2, SCMCommandProto.Type.closeContainerCommand));
+ }
+
/**
* Check for NPE when datanodeDetails is passed null for sendHeartbeat.
*
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 19b5d1a4b9..a93792b2c9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -451,6 +451,18 @@ public class ReplicationNodeManagerMock implements NodeManager {
return -1;
}
+ /**
+ * Get the number of commands of the given type queued in the SCM CommandQueue
+ * for the given datanode.
+ * @param dnID The UUID of the datanode.
+ * @param cmdType The Type of command to query the current count for.
+ * @return The count of commands queued, or zero if none.
+ */
+ @Override
+ public int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType) {
+ return commandQueue.getDatanodeCommandCount(dnID, cmdType);
+ }
+
@Override
public void onMessage(CommandForDatanode commandForDatanode,
EventPublisher publisher) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org