You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/11/11 09:42:56 UTC

[ozone] branch master updated: HDDS-7471. EC: Notify ReplicationManager when a heartbeat updates datanode command counts (#3945)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a13c62b605 HDDS-7471. EC: Notify ReplicationManager when a heartbeat updates datanode command counts (#3945)
a13c62b605 is described below

commit a13c62b60556cd003ee2149179f72029d9e35756
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri Nov 11 09:42:50 2022 +0000

    HDDS-7471. EC: Notify ReplicationManager when a heartbeat updates datanode command counts (#3945)
---
 .../DatanodeCommandCountUpdatedHandler.java        | 48 +++++++++++++++++++++
 .../container/replication/ReplicationManager.java  | 15 +++++++
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |  9 ++++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  2 +
 .../hdds/scm/server/StorageContainerManager.java   |  5 +++
 .../TestDatanodeCommandCountUpdatedHandler.java    | 49 ++++++++++++++++++++++
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   | 13 ++++++
 7 files changed, 141 insertions(+)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/DatanodeCommandCountUpdatedHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/DatanodeCommandCountUpdatedHandler.java
new file mode 100644
index 0000000000..87e5928f40
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/DatanodeCommandCountUpdatedHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles DATANODE_COMMAND_COUNT_UPDATED by notifying ReplicationManager.
+ */
+public class DatanodeCommandCountUpdatedHandler implements
+    EventHandler<DatanodeDetails> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeCommandCountUpdatedHandler.class);
+  private final ReplicationManager replicationManager;
+
+  public DatanodeCommandCountUpdatedHandler(
+      ReplicationManager replicationManager) {
+    this.replicationManager = replicationManager;
+  }
+
+  @Override
+  public void onMessage(DatanodeDetails datanodeDetails,
+      EventPublisher publisher) {
+    LOG.debug("DatanodeCommandCountUpdatedHandler called with datanode {}",
+        datanodeDetails);
+    replicationManager.datanodeCommandCountUpdated(datanodeDetails);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 6aceab13d1..686ed74f50 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -488,6 +488,21 @@ public class ReplicationManager implements SCMService {
     return scmContext.getTermOfLeader();
   }
 
+  /**
+   * Notify ReplicationManager that the command counts on a datanode have been
+   * updated via a received heartbeat. This is intended to let RM reconsider a
+   * node for container operations if it was previously excluded due to load.
+   * @param datanodeDetails The datanode for which the command counts have
+   *                        been updated.
+   */
+  public void datanodeCommandCountUpdated(DatanodeDetails datanodeDetails) {
+    // Currently a no-op apart from the debug log below: a future change is
+    // planned to use this notification to limit the number of commands RM
+    // schedules against a DN.
+    LOG.debug("Received a notification that the DN command count " +
+        "has been updated for {}", datanodeDetails);
+  }
+
   protected void processContainer(ContainerInfo containerInfo,
       ReplicationQueue repQueue, ReplicationManagerReport report)
       throws ContainerNotFoundException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index c51d792d4c..5575760a15 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -59,6 +59,15 @@ public final class SCMEvents {
       COMMAND_QUEUE_REPORT = new TypedEvent<>(
           CommandQueueReportFromDatanode.class, "Command_Queue_Report");
 
+  /**
+   * Fired by the node manager after it processes a COMMAND_QUEUE_REPORT so
+   * that anything which depends upon a datanode's command counts can be
+   * notified they have been updated. The payload is the affected datanode.
+   */
+  public static final TypedEvent<DatanodeDetails>
+      DATANODE_COMMAND_COUNT_UPDATED = new TypedEvent<>(
+          DatanodeDetails.class, "Datanode_Command_Count_Updated");
+
   /**
    * Event generated on DataNode registration.
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index c5ec348f2a..b9bc13d5a5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -717,6 +717,8 @@ public class SCMNodeManager implements NodeManager {
         datanodeInfo.setCommandCounts(commandQueueReportProto,
             commandsToBeSent);
         metrics.incNumNodeCommandQueueReportProcessed();
+        scmNodeEventPublisher.fireEvent(
+            SCMEvents.DATANODE_COMMAND_COUNT_UPDATED, datanodeDetails);
       }
     } catch (NodeNotFoundException e) {
       metrics.incNumNodeCommandQueueReportProcessingFailed();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index a3b748e547..0f84b624b1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
 import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
+import org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler;
 import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
 import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
 import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
@@ -466,12 +467,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new PipelineActionHandler(pipelineManager, scmContext, configuration);
     CRLStatusReportHandler crlStatusReportHandler =
         new CRLStatusReportHandler(certificateStore, configuration);
+    DatanodeCommandCountUpdatedHandler datanodeCommandCountUpdatedHandler =
+        new DatanodeCommandCountUpdatedHandler(replicationManager);
 
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
     eventQueue.addHandler(SCMEvents.COMMAND_QUEUE_REPORT,
         commandQueueReportHandler);
+    eventQueue.addHandler(SCMEvents.DATANODE_COMMAND_COUNT_UPDATED,
+        datanodeCommandCountUpdatedHandler);
 
     // Use the same executor for both ICR and FCR.
     // The Executor maps the event to a thread for DN.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestDatanodeCommandCountUpdatedHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestDatanodeCommandCountUpdatedHandler.java
new file mode 100644
index 0000000000..9fb9d1f692
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestDatanodeCommandCountUpdatedHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.eq;
+
+/**
+ * Verifies DatanodeCommandCountUpdatedHandler notifies ReplicationManager.
+ */
+public class TestDatanodeCommandCountUpdatedHandler {
+
+  private ReplicationManager mockManager;
+  private DatanodeCommandCountUpdatedHandler handlerUnderTest;
+
+  @BeforeEach
+  public void setup() {
+    mockManager = Mockito.mock(ReplicationManager.class);
+    handlerUnderTest = new DatanodeCommandCountUpdatedHandler(mockManager);
+  }
+
+  @Test
+  public void testReplicationManagerNotified() {
+    // The publisher is unused by the handler, so null is passed for it.
+    DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+    handlerUnderTest.onMessage(dn, null);
+    Mockito.verify(mockManager).datanodeCommandCountUpdated(eq(dn));
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 73dc7b6175..72c2873f72 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -106,6 +106,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_C
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND_COUNT_UPDATED;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NEW_NODE;
 import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toLayoutVersionProto;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -986,6 +987,13 @@ public class TestSCMNodeManager {
     assertEquals(5, nodeManager.getNodeQueuedCommandCount(
         node1, SCMCommandProto.Type.deleteBlocksCommand));
 
+    ArgumentCaptor<DatanodeDetails> captor =
+        ArgumentCaptor.forClass(DatanodeDetails.class);
+    verify(eventPublisher, times(1))
+        .fireEvent(Mockito.eq(DATANODE_COMMAND_COUNT_UPDATED),
+            captor.capture());
+    assertEquals(node1, captor.getValue());
+
     // Send another report missing an earlier entry, and ensure it is not
     // still reported as a stale value.
     nodeManager.processNodeCommandQueueReport(node1,
@@ -998,6 +1006,11 @@ public class TestSCMNodeManager {
         node1, SCMCommandProto.Type.replicateContainerCommand));
     assertEquals(11, nodeManager.getNodeQueuedCommandCount(
         node1, SCMCommandProto.Type.closeContainerCommand));
+
+    verify(eventPublisher, times(2))
+        .fireEvent(Mockito.eq(DATANODE_COMMAND_COUNT_UPDATED),
+            captor.capture());
+    assertEquals(node1, captor.getValue());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org