You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/10/27 01:21:08 UTC

[ozone] branch master updated: HDDS-7402. Adapt CommandQueue to track the count of each queued command type (#3891)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 557d7f84ff HDDS-7402. Adapt CommandQueue to track the count of each queued command type (#3891)
557d7f84ff is described below

commit 557d7f84ff107aa3b075fe9ed274efdcdf62a9e3
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Thu Oct 27 02:21:02 2022 +0100

    HDDS-7402. Adapt CommandQueue to track the count of each queued command type (#3891)
---
 .../apache/hadoop/hdds/scm/node/CommandQueue.java  |  51 +++++++----
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |   9 ++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  13 +++
 .../hadoop/hdds/scm/container/MockNodeManager.java |  12 +++
 .../hdds/scm/container/SimpleMockNodeManager.java  |  12 +++
 .../hadoop/hdds/scm/node/TestCommandQueue.java     | 101 +++++++++++++++++++++
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  36 ++++++++
 .../testutils/ReplicationNodeManagerMock.java      |  12 +++
 8 files changed, 229 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
index aa930251c4..d17411c517 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hdds.scm.node;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,9 +40,8 @@ import java.util.concurrent.locks.ReentrantLock;
  * there where queued.
  */
 public class CommandQueue {
-  // This list is used as default return value.
-  private static final List<SCMCommand> DEFAULT_LIST = new ArrayList<>();
   private final Map<UUID, Commands> commandMap;
+  private final Map<UUID, Map<SCMCommandProto.Type, Integer>> summaryMap;
   private final Lock lock;
   private long commandsInQueue;
 
@@ -59,6 +60,7 @@ public class CommandQueue {
    */
   public CommandQueue() {
     commandMap = new HashMap<>();
+    summaryMap = new HashMap<>();
     lock = new ReentrantLock();
     commandsInQueue = 0;
   }
@@ -71,6 +73,7 @@ public class CommandQueue {
     lock.lock();
     try {
       commandMap.clear();
+      summaryMap.clear();
       commandsInQueue = 0;
     } finally {
       lock.unlock();
@@ -90,6 +93,7 @@ public class CommandQueue {
     lock.lock();
     try {
       Commands cmds = commandMap.remove(datanodeUuid);
+      summaryMap.remove(datanodeUuid);
       List<SCMCommand> cmdList = null;
       if (cmds != null) {
         cmdList = cmds.getCommands();
@@ -97,7 +101,28 @@ public class CommandQueue {
         // A post condition really.
         Preconditions.checkState(commandsInQueue >= 0);
       }
-      return cmds == null ? DEFAULT_LIST : cmdList;
+      return cmds == null ? Collections.emptyList() : cmdList;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns the count of commands of the given type currently queued for the
+   * given datanode.
+   * @param datanodeUuid Datanode UUID.
+   * @param commandType The type of command for which to get the count.
+   * @return The currently queued command count, or zero if none are queued.
+   */
+  public int getDatanodeCommandCount(
+      final UUID datanodeUuid, SCMCommandProto.Type commandType) {
+    lock.lock();
+    try {
+      Map<SCMCommandProto.Type, Integer> summary = summaryMap.get(datanodeUuid);
+      if (summary == null) {
+        return 0;
+      }
+      return summary.getOrDefault(commandType, 0);
     } finally {
       lock.unlock();
     }
@@ -113,11 +138,12 @@ public class CommandQueue {
       command) {
     lock.lock();
     try {
-      if (commandMap.containsKey(datanodeUuid)) {
-        commandMap.get(datanodeUuid).add(command);
-      } else {
-        commandMap.put(datanodeUuid, new Commands(command));
-      }
+      commandMap.computeIfAbsent(datanodeUuid, s -> new Commands())
+          .add(command);
+      Map<SCMCommandProto.Type, Integer> summary =
+          summaryMap.computeIfAbsent(datanodeUuid, s -> new HashMap<>());
+      summary.put(command.getType(),
+          summary.getOrDefault(command.getType(), 0) + 1);
       commandsInQueue++;
     } finally {
       lock.unlock();
@@ -141,15 +167,6 @@ public class CommandQueue {
       readTime = 0;
     }
 
-    /**
-     * Creates the object and populates with the command.
-     * @param command command to add to queue.
-     */
-    Commands(SCMCommand command) {
-      this();
-      this.add(command);
-    }
-
     /**
      * Gets the last time the commands for this node was updated.
      * @return Time stamp
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index e2f69f77db..5514db8ecf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -317,6 +317,15 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   int getNodeQueuedCommandCount(DatanodeDetails datanodeDetails,
       SCMCommandProto.Type cmdType) throws NodeNotFoundException;
 
+  /**
+   * Get the number of commands of the given type queued in the SCM CommandQueue
+   * for the given datanode.
+   * @param dnID The UUID of the datanode.
+   * @param cmdType The Type of command to query the current count for.
+   * @return The count of commands queued, or zero if none.
+   */
+  int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType);
+
   /**
    * Get list of SCMCommands in the Command Queue for a particular Datanode.
    * @param dnID - Datanode uuid.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 62ef30d27c..dea4f0605b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -729,12 +729,25 @@ public class SCMNodeManager implements NodeManager {
    * @param cmdType
    * @return The queued count or -1 if no data has been received from the DN.
    */
+  @Override
   public int getNodeQueuedCommandCount(DatanodeDetails datanodeDetails,
       SCMCommandProto.Type cmdType) throws NodeNotFoundException {
     DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
     return datanodeInfo.getCommandCount(cmdType);
   }
 
+  /**
+   * Get the number of commands of the given type queued in the SCM CommandQueue
+   * for the given datanode.
+   * @param dnID The UUID of the datanode.
+   * @param cmdType The Type of command to query the current count for.
+   * @return The count of commands queued, or zero if none.
+   */
+  @Override
+  public int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType) {
+    return commandQueue.getDatanodeCommandCount(dnID, cmdType);
+  }
+
   /**
    * Returns the aggregated node stats.
    *
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 9083125f47..3fa14f7dd0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -615,6 +615,18 @@ public class MockNodeManager implements NodeManager {
     return -1;
   }
 
+  /**
+   * Get the number of commands of the given type queued in the SCM CommandQueue
+   * for the given datanode.
+   * @param dnID The UUID of the datanode.
+   * @param cmdType The Type of command to query the current count for.
+   * @return The count of commands queued, or zero if none.
+   */
+  @Override
+  public int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType) {
+    return 0;
+  }
+
   /**
    * Update set of containers available on a datanode.
    * @param uuid - DatanodeID
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 35ce3cb8a9..e14383f2da 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -309,6 +309,18 @@ public class SimpleMockNodeManager implements NodeManager {
     return -1;
   }
 
+  /**
+   * Get the number of commands of the given type queued in the SCM CommandQueue
+   * for the given datanode.
+   * @param dnID The UUID of the datanode.
+   * @param cmdType The Type of command to query the current count for.
+   * @return The count of commands queued, or zero if none.
+   */
+  @Override
+  public int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType) {
+    return 0;
+  }
+
   @Override
   public List<SCMCommand> getCommandQueue(UUID dnID) {
     return null;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java
new file mode 100644
index 0000000000..5c76799236
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Test for the CommandQueue class.
+ */
+
+public class TestCommandQueue {
+
+  @Test
+  public void testSummaryUpdated() {
+    CommandQueue commandQueue = new CommandQueue();
+    long containerID = 1;
+    SCMCommand<?> closeContainerCommand =
+        new CloseContainerCommand(containerID, PipelineID.randomId());
+    SCMCommand<?> createPipelineCommand =
+        new CreatePipelineCommand(PipelineID.randomId(),
+            HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Collections.emptyList());
+
+    UUID datanode1UUID = UUID.randomUUID();
+    UUID datanode2UUID = UUID.randomUUID();
+
+    commandQueue.addCommand(datanode1UUID, closeContainerCommand);
+    commandQueue.addCommand(datanode1UUID, closeContainerCommand);
+    commandQueue.addCommand(datanode1UUID, createPipelineCommand);
+
+    commandQueue.addCommand(datanode2UUID, closeContainerCommand);
+    commandQueue.addCommand(datanode2UUID, createPipelineCommand);
+    commandQueue.addCommand(datanode2UUID, createPipelineCommand);
+
+    // Check zero returned for unknown DN
+    Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+        UUID.randomUUID(), SCMCommandProto.Type.closeContainerCommand));
+
+    Assert.assertEquals(2, commandQueue.getDatanodeCommandCount(
+        datanode1UUID, SCMCommandProto.Type.closeContainerCommand));
+    Assert.assertEquals(1, commandQueue.getDatanodeCommandCount(
+        datanode1UUID, SCMCommandProto.Type.createPipelineCommand));
+    Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+        datanode1UUID, SCMCommandProto.Type.closePipelineCommand));
+
+    Assert.assertEquals(1, commandQueue.getDatanodeCommandCount(
+        datanode2UUID, SCMCommandProto.Type.closeContainerCommand));
+    Assert.assertEquals(2, commandQueue.getDatanodeCommandCount(
+        datanode2UUID, SCMCommandProto.Type.createPipelineCommand));
+
+    // Ensure the counts are cleared when the commands are retrieved
+    List<SCMCommand> cmds = commandQueue.getCommand(datanode1UUID);
+    Assert.assertEquals(3, cmds.size());
+
+    Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+        datanode1UUID, SCMCommandProto.Type.closeContainerCommand));
+    Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+        datanode1UUID, SCMCommandProto.Type.createPipelineCommand));
+    Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+        datanode1UUID, SCMCommandProto.Type.closePipelineCommand));
+
+    Assert.assertEquals(1, commandQueue.getDatanodeCommandCount(
+        datanode2UUID, SCMCommandProto.Type.closeContainerCommand));
+    Assert.assertEquals(2, commandQueue.getDatanodeCommandCount(
+        datanode2UUID, SCMCommandProto.Type.createPipelineCommand));
+
+    // Ensure the counts are zeroed when the queue is cleared
+    commandQueue.clear();
+    Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+        datanode2UUID, SCMCommandProto.Type.closeContainerCommand));
+    Assert.assertEquals(0, commandQueue.getDatanodeCommandCount(
+        datanode2UUID, SCMCommandProto.Type.createPipelineCommand));
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index a936e5652f..66120f92fc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
@@ -76,6 +77,7 @@ import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
+import org.junit.Assert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -977,6 +979,40 @@ public class TestSCMNodeManager {
         node1, SCMCommandProto.Type.closeContainerCommand));
   }
 
+  @Test
+  public void testCommandCount()
+      throws AuthenticationException, IOException {
+    SCMNodeManager nodeManager = createNodeManager(getConf());
+
+    UUID datanode1 = UUID.randomUUID();
+    UUID datanode2 = UUID.randomUUID();
+    long containerID = 1;
+
+    SCMCommand<?> closeContainerCommand =
+        new CloseContainerCommand(containerID, PipelineID.randomId());
+    SCMCommand<?> createPipelineCommand =
+        new CreatePipelineCommand(PipelineID.randomId(),
+            HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Collections.emptyList());
+
+    nodeManager.onMessage(
+        new CommandForDatanode<>(datanode1, closeContainerCommand), null);
+    nodeManager.onMessage(
+        new CommandForDatanode<>(datanode1, closeContainerCommand), null);
+    nodeManager.onMessage(
+        new CommandForDatanode<>(datanode1, createPipelineCommand), null);
+
+    Assert.assertEquals(2, nodeManager.getCommandQueueCount(
+        datanode1, SCMCommandProto.Type.closeContainerCommand));
+    Assert.assertEquals(1, nodeManager.getCommandQueueCount(
+        datanode1, SCMCommandProto.Type.createPipelineCommand));
+    Assert.assertEquals(0, nodeManager.getCommandQueueCount(
+        datanode1, SCMCommandProto.Type.closePipelineCommand));
+
+    Assert.assertEquals(0, nodeManager.getCommandQueueCount(
+        datanode2, SCMCommandProto.Type.closeContainerCommand));
+  }
+
   /**
    * Check for NPE when datanodeDetails is passed null for sendHeartbeat.
    *
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 19b5d1a4b9..a93792b2c9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -451,6 +451,18 @@ public class ReplicationNodeManagerMock implements NodeManager {
     return -1;
   }
 
+  /**
+   * Get the number of commands of the given type queued in the SCM CommandQueue
+   * for the given datanode.
+   * @param dnID The UUID of the datanode.
+   * @param cmdType The Type of command to query the current count for.
+   * @return The count of commands queued, or zero if none.
+   */
+  @Override
+  public int getCommandQueueCount(UUID dnID, SCMCommandProto.Type cmdType) {
+    return commandQueue.getDatanodeCommandCount(dnID, cmdType);
+  }
+
   @Override
   public void onMessage(CommandForDatanode commandForDatanode,
                         EventPublisher publisher) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org